Snowflake Connector for Kafka : installer et configurer

Cette rubrique décrit les étapes pour installer et configurer le Snowflake Connector for Kafka.

Installation du connecteur Kafka

Le connecteur Kafka est fourni sous forme de fichier JAR (exécutable Java).

Snowflake fournit deux versions du connecteur :

Les instructions de ce chapitre spécifient les étapes qui ne s’appliquent qu’à l’une ou l’autre version du connecteur.

Conditions préalables à l’installation

  • Le connecteur Kafka prend en charge les versions de paquet suivantes :

    Paquet

    Version du connecteur Snowflake Kafka

    Prise en charge des paquets (testé par Snowflake)

    Apache Kafka

    2.0.0 (ou versions ultérieures)

    Apache Kafka 2.8.2, 3.7.2, 4.1.1

    Confluent

    2.0.0 (ou versions ultérieures)

    Confluent 6.2.15, 7.8.2, 8.2.0

  • Le connecteur Kafka est conçu pour être utilisé avec l’API Kafka Connect 3.9.0. Les versions ultérieures de l’API Kafka Connect ne sont pas testés. Les versions antérieures à la version 3.9.0 sont compatibles avec le connecteur. Pour plus d’informations, voir Compatibilité Kafka.

  • Lorsque vous avez à la fois le connecteur Kafka et les fichiers jar du pilote JDBC dans votre environnement, assurez-vous que votre version de JDBC correspond à la version de snowflake-jdbc spécifiée dans le fichier pom.xml de la version du connecteur Kafka attendue. Vous pouvez accéder à la version du connecteur Kafka de votre choix, par exemple v4.0.0. Parcourez ensuite le fichier pom.xml pour connaître la version de snowflake-jdbc.

  • Si vous utilisez le format Avro pour l’ingestion de données :

  • Configurez Kafka avec la durée de conservation des données ou la limite de stockage de votre choix.

  • Installez et configurez le cluster Kafka Connect.

    Chaque nœud de cluster Kafka Connect doit inclure suffisamment de RAM pour le connecteur Kafka. La quantité minimale recommandée est de 5 MB par partition Kafka. Cela s’ajoute aux RAM requises pour toute autre tâche effectuée par Kafka Connect.

    Important

    Le connecteur v4 utilise un SDK Snowpipe Streaming basé sur Rust qui alloue de la mémoire hors tas (système) pour la mise en mémoire tampon. Limitez la taille du tas de la JVM à environ 50 % de la mémoire disponible pour laisser de la place pour le SDK. Par exemple, pour un processus avec 8 GB de RAM, définissez -Xmx4g.

  • Snowflake recommande d’utiliser les mêmes versions sur Kafka Broker et Kafka Connect Runtime.

  • Snowflake vous recommande vivement d’exécuter votre instance Kafka Connect dans la même région de fournisseur de cloud que votre compte Snowflake. Cela n’est pas strictement nécessaire, mais améliore généralement le débit.

Pour obtenir la liste des systèmes d’exploitation pris en charge par les clients Snowflake, voir Prise en charge par les systèmes d’exploitation.

Installation du connecteur

Cette section fournit des instructions pour l’installation et la configuration du connecteur Kafka pour Confluent. Le tableau suivant décrit les versions de connecteurs prises en charge.

Série de versions

Statut

Remarques

4.x.x

Disponible de manière générale

Dernière version. La migration depuis 3.x et 2.x doit être effectuée manuellement.

3.x.x

Officiellement pris en charge

Mise à niveau vers v4 recommandée.

2.x.x

Officiellement pris en charge

Mise à niveau recommandée.

1.x.x

Non pris en charge

Installation du connecteur pour Confluent

Téléchargement des fichiers du connecteur Kafka

Téléchargez le fichier JAR du connecteur Kafka à l’un des emplacements suivants :

Hub Confluent:

https://www.confluent.io/hub/

Le paquet inclut toutes les dépendances requises pour utiliser une clé privée chiffrée ou non chiffrée pour l’authentification par paire de clés. Pour plus d’informations, voir Utilisation de l’authentification par paire de clés et rotation de clés plus loin dans cette rubrique.

Maven Central Repository:

https://mvnrepository.com/artifact/com.snowflake

Lorsque vous utilisez cette version, vous devez télécharger les bibliothèques de cryptographie Bouncy Castle (fichiers JAR) :

Téléchargez ces fichiers dans le même dossier local que le fichier JAR du connecteur Kafka.

Le code source du connecteur est disponible sur https://github.com/snowflakedb/snowflake-kafka-connector.

Installation du connecteur Kafka

Installez le connecteur Kafka en suivant les instructions fournies pour installer d’autres connecteurs :

Installation du connecteur pour Apache Kafka open source

Cette section fournit des instructions pour l’installation et la configuration du connecteur Kafka pour Apache Kafka Open Source.

Installer Apache Kafka

  1. Téléchargez le paquet Kafka à partir du site officiel de Kafka.

  2. Dans une fenêtre de terminal, accédez au répertoire dans lequel vous avez téléchargé le fichier de paquet.

  3. Exécutez la commande suivante pour décompresser le fichier kafka_<scala_version>-<kafka_version>.tgz :

    tar xzvf kafka_<scala_version>-<kafka_version>.tgz
    

Installer le JDK

Installez et configurez le kit de développement Java (JDK) version 11 ou supérieure. Snowflake teste avec l’édition standard (SE) de JDK. L’édition entreprise (EE) devrait être compatible mais n’a pas été testée.

Si vous avez déjà installé le JDK, vous pouvez ignorer cette section.

  1. Téléchargez le JDK du site web Oracle JDK.

  2. Installez ou décompressez le JDK.

  3. En suivant les instructions pour votre système d’exploitation, définissez la variable d’environnement JAVA_HOME pour qu’elle pointe vers le répertoire contenant le JDK.

Téléchargement des fichiers JAR du connecteur Kafka

  1. Téléchargez le fichier JAR du connecteur Kafka à partir du référentiel central Maven :

    https://mvnrepository.com/artifact/com.snowflake

  2. Téléchargez les fichiers jar de la bibliothèque de cryptographie Bouncy Castle :

  3. Si vos données Kafka sont diffusées au format Apache Avro, téléchargez le fichier JAR (1.11.4) :

Le code source du connecteur est disponible sur https://github.com/snowflakedb/snowflake-kafka-connector.

Installation du connecteur Kafka

Copiez les fichiers JAR que vous avez téléchargés dans Installation du connecteur pour Apache Kafka open source dans le dossier <kafka_dir>/libs.

Configuration du connecteur Kafka

Lorsqu’il est déployé en mode autonome, le connecteur est configuré en créant un fichier spécifiant des paramètres tels que les identifiants de connexion Snowflake, les noms des sujets, les noms des tables Snowflake, etc. Lorsqu’il est déployé en mode distribué, le connecteur est configuré en appelant le point de terminaison de l’API REST du cluster Kafka Connect.

Important

L’infrastructure Kafka Connect diffuse les paramètres de configuration du connecteur Kafka du nœud primaire aux nœuds de travail. Les paramètres de configuration incluent des informations sensibles (en particulier le nom d’utilisateur et la clé privée Snowflake). Assurez-vous de sécuriser le canal de communication entre les nœuds Kafka Connect. Pour plus d’informations, consultez la documentation de votre logiciel Apache Kafka.

Chaque configuration spécifie les sujets et les tables correspondantes pour une base de données et un schéma dans cette base de données. Remarque : un connecteur peut intégrer des messages de n’importe quel nombre de sujets, mais les tables correspondantes doivent toutes figurer dans une base de données et un schéma uniques.

Cette section fournit des instructions pour les modes distribué et autonome.

Pour une description des champs de configuration, voir Propriétés de configuration du connecteur.

Important

Comme le fichier de configuration contient généralement des informations relatives à la sécurité, telles que la clé privée, définissez les privilèges de lecture / écriture de manière appropriée sur le fichier pour limiter l’accès.

En outre, envisagez de stocker le fichier de configuration dans un emplacement externe sécurisé ou dans un service de gestion de clés. Pour plus d’informations, voir Externaliser les secrets (dans ce chapitre).

Mode distribué

Créez le fichier de configuration Kafka, par exemple <path>/<config_file>.json. Remplissez le fichier avec toutes les informations de configuration du connecteur. Le fichier doit être au format JSON.

Exemple de fichier de configuration

{
  "name":"XYZCompanySensorData",
  "config":{
      "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
      "snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
      "snowflake.url.name": "myorganization-myaccount.snowflakecomputing.com:443",
      "snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
      "snowflake.schema.name": "MY_SCHEMA",
      "snowflake.database.name": "MY_DATABASE",
      "snowflake.role.name": "MY_ROLE",
      "snowflake.user.name": "MY_USER",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "errors.log.enable": "true",
      "topics": "topic1,topic2",
      "value.converter.schemas.enable": "false",
      "errors.tolerance": "none",
      "snowflake.streaming.validate.compatibility.with.classic": "false"
      }
}

Mode autonome

Créez un fichier de configuration, par exemple <kafka_dir>/config/SF_connect.properties. Remplissez le fichier avec toutes les informations de configuration du connecteur.

Exemple de fichier de configuration

connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=none
snowflake.streaming.validate.compatibility.with.classic=false

Considérations relatives au cache pour les tests et le prototypage

Le connecteur met en cache les contrôles de recherche des tables et des canaux pour améliorer les performances lors des rééquilibrages des partitions. Cependant, lors des tests et du prototypage, ce comportement de mise en cache peut empêcher le connecteur de détecter immédiatement les tables ou les canaux créés manuellement.

Problème : lorsque vous créez manuellement une table ou un canal pendant que le connecteur est en cours d’exécution, le connecteur peut continuer à utiliser les résultats de la vérification de l’existence en cache (qui peuvent indiquer que l’objet n’existe pas) pendant 5 minutes par défaut. Cela peut conduire à des erreurs ou à des comportements inattendus pendant le test.

Recommandation pour les tests : pour éviter les problèmes liés au cache lors des tests et du prototypage, désactivez la mise en cache :

snowflake.cache.table.exists=false
snowflake.cache.pipe.exists=false

Cette configuration garantit que le connecteur effectue de nouveaux contrôles d’existence à chaque rééquilibrage de partition, ce qui vous permet de voir immédiatement les effets des tables et des canaux créés manuellement.

Important

Ces paramètres de cache minimaux sont recommandés uniquement pour les tests et le prototypage. Dans les environnements de production, utilisez les valeurs d’expiration du cache par défaut (5 minutes ou plus) pour réduire les requêtes de métadonnées vers Snowflake et améliorer les performances de rééquilibrage, surtout si vous gérez de nombreuses partitions.

Propriétés de configuration du connecteur

Configuration minimale pour les nouvelles installations

La configuration suivante est une configuration minimale pour que le connecteur fonctionne avec les valeurs v4 par défaut. Cet exemple utilise le format JSON pour le mode distribué :

{
  "name": "my_kafka_connector",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
    "topics": "my_topic",
    "snowflake.url.name": "https://myaccount.snowflakecomputing.com",
    "snowflake.user.name": "my_user",
    "snowflake.private.key": "<base64-encoded-private-key>",
    "snowflake.database.name": "MY_DB",
    "snowflake.schema.name": "MY_SCHEMA",
    "snowflake.role.name": "MY_ROLE",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "snowflake.streaming.validate.compatibility.with.classic": "false"
  }
}

Cette configuration utilise toutes les valeurs v4 par défaut : validation côté serveur, colonnes schématisées et identificateurs sensibles à la casse. Le connecteur crée automatiquement des tables et des canaux selon les besoins.

Note

Définissez snowflake.streaming.validate.compatibility.with.classic sur false pour les nouvelles installations. Ce paramètre n’est nécessaire que lors de la migration depuis v3.

Propriétés requises

name

Nom de l’application. Cela doit être unique pour tous les connecteurs Kafka utilisés par le client. Ce nom doit être un identificateur non délimité Snowflake et valide. Pour plus d’informations sur les identificateurs valides, voir Exigences relatives à l’identificateur.

connector.class

com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

topics

Liste de sujets séparés par des virgules. Par défaut, Snowflake suppose que le nom de la table est identique à celui du sujet. Si le nom de la table est différent du nom du sujet, utilisez le paramètre facultatif topic2table.map (ci-dessous) pour spécifier le mappage entre le nom du sujet et le nom de la table. Ce nom de table doit être un identificateur non spécifié Snowflake et valide. Pour plus d’informations sur les noms de table valides, voir Exigences relatives à l’identificateur.

Note

:emph:`` topics ou topics.regex est requis ; pas les deux.

topics.regex

Il s’agit d’une expression régulière (« regex ») qui spécifie les sujets contenant les messages à charger dans les tables Snowflake. Le connecteur charge les données de tout nom de sujet correspondant à l’expression régulière. L’expression régulière doit respecter les règles applicables aux expressions régulières Java (c’est-à-dire être compatible avec java.util.regex.Pattern). Le fichier de configuration doit contenir :emph:` ou `, pas les deux.

snowflake.url.name

L’URL pour accéder à votre compte Snowflake. Cette URL doit inclure votre identificateur de compte. Notez que le protocole (https://) et le numéro de port sont facultatifs.

snowflake.user.name

Nom de connexion de l’utilisateur pour le compte Snowflake.

snowflake.role.name

Nom du rôle que le connecteur utilisera pour insérer des données dans la table.

snowflake.private.key

Clé privée pour authentifier l’utilisateur. Incluez uniquement la clé, pas l’en-tête ni le pied de page. Si la clé est divisée sur plusieurs lignes, supprimez les sauts de ligne. Vous pouvez fournir une clé non chiffrée ou une clé chiffrée et fournir le paramètre snowflake.private.key.passphrase pour permettre à Snowflake de déchiffrer la clé. Utilisez ce paramètre si et seulement si la valeur du paramètre snowflake.private.key est chiffrée. Celui-ci déchiffre les clés privées qui ont été chiffrées conformément aux instructions fournies dans la section Authentification par paire de clés et rotation de paires de clés.

Note

Consultez également snowflake.private.key.passphrase dans les Propriétés facultatives.

snowflake.database.name

Nom de la base de données contenant la table dans laquelle insérer des lignes.

snowflake.schema.name

Nom du schéma contenant la table dans laquelle insérer des lignes.

header.converter

Obligatoire uniquement si les enregistrements sont formatés en Avro et incluent un en-tête. La valeur par défaut est "org.apache.kafka.connect.storage.StringConverter".

key.converter

Le convertisseur de clé de l’enregistrement Kafka (par exemple, "org.apache.kafka.connect.storage.StringConverter"). Ce connecteur n’est pas utilisé par le connecteur Kafka, mais il est requis par la plateforme Kafka Connect.

value.converter

Le connecteur prend en charge les convertisseurs communautaires Kafka standard. Choisissez le convertisseur approprié en fonction de votre format de données :

  • Pour les enregistrements JSON : "org.apache.kafka.connect.json.JsonConverter"

  • Pour les enregistrements Avro avec le registre de schéma : "io.confluent.connect.avro.AvroConverter"

Note

Lorsque snowflake.enable.schematization=true (par défaut),``StringConverter`` et ByteArrayConverter ne sont pas pris en charge en tant que convertisseurs de valeurs. Pour plus d’informations, voir Dépanner le Snowflake Connector for Kafka.

Propriétés facultatives

Propriétés de schématisation et de validation

Ces propriétés contrôlent la manière dont le connecteur traite et valide les données. Pour les nouvelles installations, les valeurs par défaut fonctionnent bien. Si vous migrez depuis v3, examinez Migrer du connecteur Kafka v3 vers v4 pour obtenir des conseils sur les valeurs à utiliser.

snowflake.enable.schematization

Contrôle si les enregistrements entrants sont schématisés en colonnes de tables individuelles ou inclus dans des colonnes VARIANT héritées.

Lorsque true (par défaut), les champs d’enregistrement sont mappés aux colonnes de tables individuelles par leur nom. Lorsque false, le connecteur stocke les enregistrements dans deux colonnes VARIANT (RECORD_CONTENT et RECORD_METADATA), correspondant au comportement v3.

Par défaut:

true

snowflake.validation

Contrôles où la validation des données et l’évolution des schémas sont effectuées.

server_side (par défaut) : La validation est effectuée par le backend Snowflake, aligné avec le comportement COPY et Snowpipe. Les enregistrements invalides sont capturés dans les tables d’erreurs. Prend en charge à la fois les modes de canal par défaut et les modes de canal définis par l’utilisateur.

client_side : Le connecteur valide les types de données et la compatibilité des schémas avant d’envoyer les lignes à Snowflake. Prend en charge la file d’attente de lettres mortes (DLQ) pour les enregistrements invalides. Ne fonctionne qu’avec le mode de canal par défaut.

Pour plus de détails, voir Validation et traitement des erreurs.

Par défaut:

server_side

Propriétés de migration et de compatibilité

Ces propriétés sont pertinentes lors de la migration depuis v3. Pour les nouvelles installations, vous pouvez les ignorer et définir snowflake.streaming.validate.compatibility.with.classic=false.

snowflake.compatibility.enable.autogenerated.table.name.sanitization

Contrôle la manière dont les noms de tables automatiquement générés sont dérivés des noms de rubriques.

Lorsque false (par défaut), les noms de sujets sont utilisés tels quels pour les noms de tables, en préservant la casse et les caractères spéciaux. Les noms de tables sont créés sous forme d’identificateurs cités.

Lorsque true, les caractères d’identificateurs Snowflake invalides sont remplacés par des traits de soulignement, les noms sont en majuscules, et un code de hachage est ajouté pour l’unicité. Cela correspond au comportement v3.

Par défaut:

false

snowflake.compatibility.enable.column.identifier.normalization

Contrôle la manière dont les identificateurs de colonnes sont traités.

Lorsque false (par défaut), les identificateurs de colonnes conservent les majuscules et les caractères spéciaux tels quels.

Lorsque true, les identificateurs de colonnes sont normalisés en majuscules, correspondant au comportement v3.

Par défaut:

false

snowflake.streaming.validate.compatibility.with.classic

Active la validation du démarrage qui vérifie si toutes les configurations liées à la migration sont explicitement définies. Lorsque true, le connecteur échoue au démarrage avec une erreur descriptive si l’une des configurations suivantes est manquante ou incompatible avec le comportement v3 :

  • snowflake.validation doit être client_side

  • snowflake.compatibility.enable.column.identifier.normalization doit être true

  • snowflake.compatibility.enable.autogenerated.table.name.sanitization doit être true

  • snowflake.enable.schematization doit être explicitement définie sur true ou false (la valeur par défaut a changé entre v3 et v4)

  • snowflake.streaming.classic.offset.migration doit être explicitement définie

  • snowflake.streaming.classic.offset.migration.include.connector.name doit être explicitement définie (lorsque la migration de décalage est strict ou best_effort)

Cela empêche l’exécution accidentelle de v4 avec une configuration v3 copiée sans examiner les modifications de rupture. Définissez cette valeur sur false pour ignorer cette vérification après avoir confirmé votre configuration.

Note

Pour les nouvelles installations (qui ne migrent pas depuis v3), définissez cette valeur sur false. Le validateur de compatibilité n’est nécessaire que lors de la mise à niveau à partir d’un déploiement v3 existant.

Pour plus d’informations, voir Migrer du connecteur Kafka v3 vers v4.

Par défaut:

true

Propriétés de migration de décalage

Ces propriétés contrôlent la manière dont v4 migre les décalages validés des canaux Snowpipe Streaming (SSv1) v3. Elles ne sont pertinentes que lors de la migration à partir d’un connecteur v3 qui utilisait snowflake.ingestion.method=SNOWPIPE_STREAMING. Si vous migrez à partir du mode Snowpipe v3 (ingestion basée sur des fichiers), définissez snowflake.streaming.classic.offset.migration sur skip.

snowflake.streaming.classic.offset.migration

Contrôle la manière dont v4 migre les décalages des canaux Snowpipe Streaming (SSv1) v3.

strict : v4 recherche le décalage validé à partir du canal SSv1 v3 et reprend à partir de là. Si le canal SSv1 n’est pas trouvé, le connecteur échoue avec une erreur.

best_effort : v4 tente de rechercher le décalage validé à partir du canal SSv1 v3. Si le canal n’est pas trouvé, v4 revient au décalage du groupe de consommateurs Kafka.

skip (par défaut) : Aucune migration de décalage SSv1 n’est effectuée. v4 utilise le décalage du groupe de consommateurs Kafka. Utilisez cette option lorsque vous migrez à partir du mode Snowpipe v3 (et non de Snowpipe Streaming).

Par défaut:

skip

snowflake.streaming.classic.offset.migration.include.connector.name

Contrôle si la recherche du nom du canal SSv1 inclut le nom du connecteur. Cela doit correspondre à la manière dont votre connecteur v3 a été configuré. Dans la v3, la propriété snowflake.streaming.channel.name.include.connector.name contrôlait si le nom du connecteur était inclus dans le nom du canal.

Définissez-la sur true si votre connecteur v3 disposait de snowflake.streaming.channel.name.include.connector.name=true ou si vous exécutiez la version 2.1.0 ou 2.1.1 du connecteur Kafka (ces versions incluaient le nom du connecteur par défaut). Définissez-la sur false sinon.

Nécessaire uniquement lorsque snowflake.streaming.classic.offset.migration est strict ou best_effort.

Par défaut:

none (doit être explicitement défini lorsque la migration de décalage est active)

Propriétés de traitement des erreurs

errors.tolerance

Contrôle la manière dont le connecteur répond aux erreurs lors de l’ingestion.

none (par défaut) : La tâche du connecteur échoue à la première erreur. Avec la validation côté serveur, la détection des erreurs est asynchrone, de sorte que quelques enregistrements après celui qui est cassé peuvent encore être ingérés avant l’échec de la tâche.

all : Le connecteur poursuit l’ingestion des données. Avec la validation côté client, les enregistrements invalides sont acheminés vers la DLQ (si configurée) ou supprimés de manière silencieuse.

Avertissement

Définir errors.tolerance=all sans configurer de sujet DLQ entraîne la suppression silencieuse des enregistrements invalides lors de l’utilisation de la validation côté client. Cela peut entraîner une perte de données.

Par défaut:

none

errors.deadletterqueue.topic.name

Nom du sujet Kafka pour la file d’attente de lettres mortes. Effectif uniquement lorsque snowflake.validation=client_side et errors.tolerance=all.

Par défaut:

vide (DLQ désactivée)

errors.log.enable

Lorsque true, les erreurs sont journalisées avec les détails de l’opération qui a échoué et les propriétés de l’enregistrement.

Par défaut:

false

enable.task.fail.on.authorization.errors

Lorsque true, la tâche du connecteur échoue immédiatement en cas d’erreurs d’autorisation de Snowflake. Lorsque false, le connecteur effectue une nouvelle tentative.

Par défaut:

false

Mise en cache des propriétés

snowflake.cache.table.exists

Active la mise en cache pour les contrôles d’existence des tables, réduisant le nombre de requêtes de métadonnées vers Snowflake.

Par défaut:

true

snowflake.cache.table.exists.expire.ms

Temps d’expiration du cache en millisecondes pour les contrôles d’existence des tables.

Par défaut:

300000 (5 minutes)

snowflake.cache.pipe.exists

Active la mise en cache pour les contrôles d’existence des canaux.

Par défaut:

true

snowflake.cache.pipe.exists.expire.ms

Temps d’expiration du cache en millisecondes pour les contrôles d’existence des canaux.

Par défaut:

300000 (5 minutes)

Propriétés de surveillance et de diagnostic

jmx

Active JMXMBeans pour les métriques du connecteur. Pour plus d’informations, voir Surveiller le Snowflake Connector for Kafka.

Par défaut:

true

enable.mdc.logging

Active MDC (Mapped Diagnostic Context) pour ajouter le contexte du connecteur aux messages de journalisation, ce qui est utile lors de l’exécution de plusieurs instances de connecteur.

Par défaut:

false

snowflake.streaming.metadata.connectorPushTime

Lorsque true, comprend l’horodatage SnowflakeConnectorPushTime dans RECORD_METADATA. Ce champ enregistre quand le connecteur a mis en mémoire tampon un enregistrement à des fins d’ingestion et est utile pour estimer la latence de bout en bout.

Par défaut:

true

Propriétés avancées

snowflake.streaming.client.provider.override.map

Remplacements des propriétés du client Snowpipe Streaming. Format : key1:value1,key2:value2. À utiliser uniquement après avoir consulté le Support Snowflake.

Par défaut:

empty

Autres propriétés

snowflake.private.key.passphrase

Si la valeur de ce paramètre n’est pas vide, le connecteur utilise cette expression pour tenter de déchiffrer la clé privée.

tasks.max

Nombre de tâches, généralement égal au nombre de cœurs CPU sur les nœuds de travail du cluster Kafka Connect. Pour obtenir les meilleures performances, Snowflake recommande de définir un nombre de tâches égal au nombre total de partitions Kafka, sans dépasser le nombre de cœurs de CPU. Un nombre élevé de tâches peut entraîner une consommation accrue de mémoire et des rééquilibrages fréquents.

snowflake.topic2table.map

Liste des correspondances entre sujet et table, séparées par des virgules, au format topic:table. Prend en charge les modèles regex pour les noms de sujets. Les expressions régulières ne peuvent pas être ambiguës ; tout sujet correspondant doit correspondre à une seule table cible.

Les noms de sujets et de tables peuvent être entre guillemets doubles pour prendre en charge les caractères spéciaux (deux-points, virgules, espaces). Les noms de tables qui ne sont pas entre guillemets sont en majuscules. Les noms de tables entre guillemets conservent la casse.

Pour des exemples détaillés incluant des modèles regex, des correspondances plusieurs-à-un et l’utilisation des guillemets, consultez Correspondance explicite entre sujet et table.

Exemple :

snowflake.topic2table.map=topic1:low_range,topic2:low_range,"my:topic":"My_Table"
value.converter.schema.registry.url

Si le format est Avro et que vous utilisez un service de registre de schéma, il doit s’agir de l’URL du service de registre de schéma. Sinon, ce champ devrait être vide.

value.converter.break.on.schema.registry.error

Si vous chargez des données Avro à partir du service de registre de schéma, cette propriété détermine si le connecteur Kafka doit cesser de consommer des enregistrements s’il rencontre une erreur lors de la récupération de l’ID de schéma. La valeur par défaut est false. Définissez la valeur sur true pour activer ce comportement.

jvm.proxy.host

Pour permettre au connecteur Snowflake Kafka d’accéder à Snowflake via un serveur proxy, définissez ce paramètre pour spécifier l’hôte de ce serveur proxy.

jvm.proxy.port

Pour permettre au connecteur Snowflake Kafka d’accéder à Snowflake via un serveur proxy, définissez ce paramètre pour spécifier le port de ce serveur proxy.

jvm.proxy.username

Nom d’utilisateur qui s’authentifie auprès du serveur proxy.

jvm.proxy.password

Mot de passe du nom d’utilisateur qui s’authentifie auprès du serveur proxy.

snowflake.jdbc.map

Exemple : "snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"

Les propriétés JDBC supplémentaires (voir Référence Paramètre de connexion pilote JDBC) ne sont pas validées. Ces propriétés supplémentaires ne sont pas validées et ne doivent pas remplacer ni être utilisées à la place des propriétés obligatoires telles que jvm.proxy.xxx, snowflake.user.name, snowflake.private.key, snowflake.schema.name et des propriétés similaires.

Spécification de l’une des combinaisons suivantes :
  • Propriété tracing avec variable d’environnement JDBC_TRACE

  • Propriété database avec snowflake.database.name

Cela entraînera un comportement ambigu et le comportement sera déterminé par le pilote JDBC.

value.converter.basic.auth.credentials.source

Si vous utilisez le format de données Avro et avez besoin d’un accès sécurisé au registre de schémas Kafka, définissez ce paramètre sur la chaîne « USER_INFO », puis définissez le paramètre value.converter.basic.auth.user.info décrit ci-dessous. Sinon, omettez ce paramètre.

value.converter.basic.auth.user.info

Si vous utilisez le format de données Avro et avez besoin d’un accès sécurisé au registre de schémas Kafka, définissez ce paramètre sur la chaîne « <ID_utilisateur>:<motdepasse> », puis définissez le paramètre value.converter.basic.auth.credentials.source décrit ci-dessus. Sinon, omettez ce paramètre.

snowflake.metadata.createtime

Si la valeur est définie sur FALSE, la valeur de la propriété CreateTime est omise des métadonnées dans la colonne RECORD_METADATA. La valeur par défaut est TRUE.

snowflake.metadata.topic

Si la valeur est définie sur FALSE, la valeur de la propriété topic est omise des métadonnées dans la colonne RECORD_METADATA. La valeur par défaut est TRUE.

snowflake.metadata.offset.and.partition

Si la valeur est définie sur FALSE, les valeurs de propriété Offset et Partition sont omises des métadonnées de la colonne RECORD_METADATA. La valeur par défaut est TRUE.

snowflake.metadata.all

Si la valeur est définie sur FALSE, les métadonnées de la colonne RECORD_METADATA sont complètement vides. La valeur par défaut est TRUE.

transforms

Spécifier pour ignorer les enregistrements tombstone rencontrés par le connecteur Kafka et ne pas les charger dans la table cible. Un enregistrement de type tombstone est défini comme un enregistrement dont le champ de valeur est entièrement nul.

Définir la valeur de la propriété sur "tombstoneHandlerExample".

Note

Utilisez cette propriété uniquement avec les convertisseurs de communauté Kafka (c’est-à-dire la valeur de la propriété value.converter) (par exemple org.apache.kafka.connect.json.JsonConverter ou org.apache.kafka.connect.json.AvroConverter). Pour gérer le traitement des enregistrements tombstone avec les convertisseurs Snowflake, utilisez plutôt la propriété behavior.on.null.values.

transforms.tombstoneHandlerExample.type

Requis lors de la définition de la propriété transforms.

Définissez la valeur de la propriété sur "io.confluent.connect.transforms.TombstoneHandler"

behavior.on.null.values

Spécifiez comment le connecteur Kafka doit traiter les enregistrements tombstone. Un enregistrement de type tombstone est défini comme un enregistrement dont le champ de valeur est entièrement nul. Pour Snowpipe, cette propriété est prise en charge par le connecteur Kafka version 1.5.5 et ultérieure. Pour Snowpipe Streaming, cette propriété est prise en charge par le connecteur Kafka à partir de la version 2.1.0.

Cette propriété prend en charge les valeurs suivantes :

DEFAULT

Lorsque le connecteur Kafka rencontre un enregistrement tombstone, il insère une chaîne JSON vide dans la colonne de contenu.

IGNORE

Le connecteur Kafka ignore les enregistrements tombstone et n’insère pas de lignes pour ces enregistrements.

La valeur par défaut est DEFAULT.

Note

L’ingestion des enregistrements tombstone varie selon les méthodes d’ingestion :

  • Pour Snowpipe, le connecteur Kafka utilise uniquement les convertisseurs Snowflake. Pour gérer le traitement des enregistrements tombstone avec les convertisseurs communautaires Kafka, utilisez plutôt les propriétés transform et transforms.tombstoneHandlerExample.type.

  • Pour Snowpipe Streaming, le connecteur Kafka utilise uniquement des convertisseurs communautaires.

Les enregistrements envoyés aux courtiers Kafka ne doivent pas être NULL, car ces enregistrements seront abandonnés par le connecteur Kafka, ce qui entraînera des décalages manquants. Les décalages manquants interrompront le connecteur Kafka dans des cas d’utilisation spécifiques. Il est recommandé d’utiliser les enregistrements tombstone plutôt que les enregistrements NULL.

Utilisation de l’authentification par paire de clés et rotation de clés

Le connecteur Kafka repose sur l’authentification par paire de clés au lieu de l’authentification par nom d’utilisateur et mot de passe. Cette méthode d’authentification nécessite une paire de clés de 2048 bits (minimum) RSA. Générez la paire de clés publiques-privées via OpenSSL. La clé publique est attribuée à l’utilisateur Snowflake défini dans le fichier de configuration.

Après avoir terminé les tâches d’authentification par paire de clés sur cette page et les tâches pour la rotation de paires de clés, évaluez la recommandation pour Externaliser les secrets, plus loin dans cette rubrique.

Pour configurer la paire de clés publiques/privées :

  1. Depuis la ligne de commande d’une fenêtre de terminal, générez une clé privée.

    Vous pouvez générer une version chiffrée ou non chiffrée de la clé privée.

    Note

    Le connecteur Kafka prend en charge des algorithmes de chiffrement validés pour répondre aux exigences de la norme Federal Information Processing Standard (140-2) (c’est-à-dire FIPS 140-2). Pour plus d’informations, voir FIPS 140-2.

    Pour générer une version non chiffrée, utilisez la commande suivante :

    $ openssl genrsa -out rsa_key.pem 2048
    

    Pour générer une version chiffrée, utilisez la commande suivante :

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
    

    <algorithme> est un algorithme de chiffrement conforme à FIPS 140-2.

    Par exemple, pour spécifier AES 256 comme algorithme de chiffrement :

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
    

    Si vous générez une version chiffrée de la clé privée, enregistrez la phrase secrète. Plus tard, vous spécifierez la phrase secrète dans la propriété snowflake.private.key.passphrase du fichier de configuration de Kafka.

    Exemple de clé privée PEM

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
  2. Depuis la ligne de commande, générez la clé publique en faisant référence à la clé privée :

    En supposant que la clé privée soit chiffrée et contenue dans le fichier nommé rsa_key.p8, utilisez la commande suivante :

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    

    Exemple de clé publique PEM

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
  3. Copiez les fichiers de clés publiques et privées dans un répertoire local en vue de leur stockage. Notez le chemin d’accès aux fichiers. La clé privée est stockée au format PKCS#8 (Public Key Cryptography Standards) et est chiffrée à l’aide de la phrase secrète que vous avez spécifiée à l’étape précédente ; toutefois, le fichier doit toujours être protégé contre tout accès non autorisé au moyen du mécanisme d’autorisation de fichier fourni par votre système d’exploitation. Il est de la responsabilité de l’utilisateur de sécuriser le fichier lorsqu’il n’est pas utilisé.

  4. Connectez-vous à Snowflake. Attribuez la clé publique à l’utilisateur Snowflake en utilisant ALTER USER.

    Par exemple :

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    

    Note

    • Seuls les administrateurs de sécurité (c’est-à-dire les utilisateurs disposant du rôle SECURITYADMIN) ou ayant un rôle supérieur peuvent modifier un utilisateur.

    • Excluez l’en-tête et le pied de page de la clé publique dans l’instruction SQL.

    Vérifiez l’empreinte de la clé publique de l’utilisateur en utilisant DESCRIBE USER :

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    

    Note

    La propriété RSA_PUBLIC_KEY_2_FP est décrite dans Configuration de la rotation de paires de clés.

  5. Copiez et collez la clé privée complète dans le champ snowflake.private.key du fichier de configuration. Sauvegardez le fichier.

Externaliser les secrets

Snowflake recommande fortement d’externaliser des secrets tels que la clé privée et de les stocker sous une forme chiffrée ou dans un service de gestion de clés tel que AWS Key Management Service (KMS), Microsoft Azure Key Vault, ou HashiCorp Vault. Pour ce faire, utilisez une implémentation ConfigProvider sur votre cluster Kafka Connect.

Pour plus d’informations, voir la description de ce service par Confluent.

Démarrage du connecteur

Démarrez Kafka en suivant les instructions fournies dans la documentation tierce Confluent ou Apache Kafka. Vous pouvez démarrer le connecteur Kafka en mode distribué ou en mode autonome. Les instructions pour chacun sont indiquées ci-dessous :

Mode distribué

À partir d’une fenêtre de terminal, exécutez la commande suivante :

curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors

Mode autonome

À partir d’une fenêtre de terminal, exécutez la commande suivante :

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties

Note

(Une installation par défaut d’Apache Kafka ou de Confluent Kafka devrait déjà comprendre le fichier connect-standalone.properties).

Prochaines étapes

tester le connecteur.