Snowflake Connector for Kafka : installer et configurer¶
Cette rubrique décrit les étapes pour installer et configurer le Snowflake Connector for Kafka.
Installation du connecteur Kafka¶
Le connecteur Kafka est fourni sous forme de fichier JAR (exécutable Java).
Snowflake fournit deux versions du connecteur :
Une version pour l’installation Confluent Kafka.
Une version pour l’écosystème open source software (OSS) Apache Kafka https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/.
Les instructions de ce chapitre spécifient les étapes qui ne s’appliquent qu’à l’une ou l’autre version du connecteur.
Conditions préalables à l’installation¶
Le connecteur Kafka prend en charge les versions de paquet suivantes :
Paquet
Version du connecteur Snowflake Kafka
Prise en charge des paquets (testé par Snowflake)
Apache Kafka
2.0.0 (ou versions ultérieures)
Apache Kafka 2.8.2, 3.7.2, 4.1.1
Confluent
2.0.0 (ou versions ultérieures)
Confluent 6.2.15, 7.8.2, 8.2.0
Le connecteur Kafka est conçu pour être utilisé avec l’API Kafka Connect 3.9.0. Les versions ultérieures de l’API Kafka Connect ne sont pas testés. Les versions antérieures à la version 3.9.0 sont compatibles avec le connecteur. Pour plus d’informations, voir Compatibilité Kafka.
Lorsque vous avez à la fois le connecteur Kafka et les fichiers jar du pilote JDBC dans votre environnement, assurez-vous que votre version de JDBC correspond à la version de
snowflake-jdbcspécifiée dans le fichierpom.xmlde la version du connecteur Kafka attendue. Vous pouvez accéder à la version du connecteur Kafka de votre choix, par exemple v4.0.0. Parcourez ensuite le fichierpom.xmlpour connaître la version desnowflake-jdbc.Si vous utilisez le format Avro pour l’ingestion de données :
Utilisez l’analyseur Avro, version 1.8.2 (ou supérieure), disponible à l’adresse https://mvnrepository.com/artifact/org.apache.avro.
Si vous utilisez la fonction de registre de schéma avec Avro, utilisez la version 5.0.0 (ou supérieure) du convertisseur Kafka Connect Avro disponible à l’adresse https://mvnrepository.com/artifact/io.confluent.
Notez que la fonction de registre de schéma n’est pas disponible dans le paquet OSS Apache Kafka.
Configurez Kafka avec la durée de conservation des données ou la limite de stockage de votre choix.
Installez et configurez le cluster Kafka Connect.
Chaque nœud de cluster Kafka Connect doit inclure suffisamment de RAM pour le connecteur Kafka. La quantité minimale recommandée est de 5 MB par partition Kafka. Cela s’ajoute aux RAM requises pour toute autre tâche effectuée par Kafka Connect.
Important
Le connecteur v4 utilise un SDK Snowpipe Streaming basé sur Rust qui alloue de la mémoire hors tas (système) pour la mise en mémoire tampon. Limitez la taille du tas de la JVM à environ 50 % de la mémoire disponible pour laisser de la place pour le SDK. Par exemple, pour un processus avec 8 GB de RAM, définissez
-Xmx4g.Snowflake recommande d’utiliser les mêmes versions sur Kafka Broker et Kafka Connect Runtime.
Snowflake vous recommande vivement d’exécuter votre instance Kafka Connect dans la même région de fournisseur de cloud que votre compte Snowflake. Cela n’est pas strictement nécessaire, mais améliore généralement le débit.
Pour obtenir la liste des systèmes d’exploitation pris en charge par les clients Snowflake, voir Prise en charge par les systèmes d’exploitation.
Installation du connecteur¶
Cette section fournit des instructions pour l’installation et la configuration du connecteur Kafka pour Confluent. Le tableau suivant décrit les versions de connecteurs prises en charge.
Série de versions |
Statut |
Remarques |
|---|---|---|
4.x.x |
Disponible de manière générale |
Dernière version. La migration depuis 3.x et 2.x doit être effectuée manuellement. |
3.x.x |
Officiellement pris en charge |
Mise à niveau vers v4 recommandée. |
2.x.x |
Officiellement pris en charge |
Mise à niveau recommandée. |
1.x.x |
Non pris en charge |
Installation du connecteur pour Confluent¶
Téléchargement des fichiers du connecteur Kafka¶
Téléchargez le fichier JAR du connecteur Kafka à l’un des emplacements suivants :
- Hub Confluent:
-
Le paquet inclut toutes les dépendances requises pour utiliser une clé privée chiffrée ou non chiffrée pour l’authentification par paire de clés. Pour plus d’informations, voir Utilisation de l’authentification par paire de clés et rotation de clés plus loin dans cette rubrique.
- Maven Central Repository:
https://mvnrepository.com/artifact/com.snowflake
Lorsque vous utilisez cette version, vous devez télécharger les bibliothèques de cryptographie Bouncy Castle (fichiers JAR) :
Téléchargez ces fichiers dans le même dossier local que le fichier JAR du connecteur Kafka.
Le code source du connecteur est disponible sur https://github.com/snowflakedb/snowflake-kafka-connector.
Installation du connecteur Kafka¶
Installez le connecteur Kafka en suivant les instructions fournies pour installer d’autres connecteurs :
Installation du connecteur pour Apache Kafka open source¶
Cette section fournit des instructions pour l’installation et la configuration du connecteur Kafka pour Apache Kafka Open Source.
Installer Apache Kafka¶
Téléchargez le paquet Kafka à partir du site officiel de Kafka.
Dans une fenêtre de terminal, accédez au répertoire dans lequel vous avez téléchargé le fichier de paquet.
Exécutez la commande suivante pour décompresser le fichier
kafka_<scala_version>-<kafka_version>.tgz:
Installer le JDK¶
Installez et configurez le kit de développement Java (JDK) version 11 ou supérieure. Snowflake teste avec l’édition standard (SE) de JDK. L’édition entreprise (EE) devrait être compatible mais n’a pas été testée.
Si vous avez déjà installé le JDK, vous pouvez ignorer cette section.
Téléchargez le JDK du site web Oracle JDK.
Installez ou décompressez le JDK.
En suivant les instructions pour votre système d’exploitation, définissez la variable d’environnement JAVA_HOME pour qu’elle pointe vers le répertoire contenant le JDK.
Téléchargement des fichiers JAR du connecteur Kafka¶
Téléchargez le fichier JAR du connecteur Kafka à partir du référentiel central Maven :
Téléchargez les fichiers jar de la bibliothèque de cryptographie Bouncy Castle :
Si vos données Kafka sont diffusées au format Apache Avro, téléchargez le fichier JAR (1.11.4) :
Le code source du connecteur est disponible sur https://github.com/snowflakedb/snowflake-kafka-connector.
Installation du connecteur Kafka¶
Copiez les fichiers JAR que vous avez téléchargés dans Installation du connecteur pour Apache Kafka open source dans le dossier <kafka_dir>/libs.
Configuration du connecteur Kafka¶
Lorsqu’il est déployé en mode autonome, le connecteur est configuré en créant un fichier spécifiant des paramètres tels que les identifiants de connexion Snowflake, les noms des sujets, les noms des tables Snowflake, etc. Lorsqu’il est déployé en mode distribué, le connecteur est configuré en appelant le point de terminaison de l’API REST du cluster Kafka Connect.
Important
L’infrastructure Kafka Connect diffuse les paramètres de configuration du connecteur Kafka du nœud primaire aux nœuds de travail. Les paramètres de configuration incluent des informations sensibles (en particulier le nom d’utilisateur et la clé privée Snowflake). Assurez-vous de sécuriser le canal de communication entre les nœuds Kafka Connect. Pour plus d’informations, consultez la documentation de votre logiciel Apache Kafka.
Chaque configuration spécifie les sujets et les tables correspondantes pour une base de données et un schéma dans cette base de données. Remarque : un connecteur peut intégrer des messages de n’importe quel nombre de sujets, mais les tables correspondantes doivent toutes figurer dans une base de données et un schéma uniques.
Cette section fournit des instructions pour les modes distribué et autonome.
Pour une description des champs de configuration, voir Propriétés de configuration du connecteur.
Important
Comme le fichier de configuration contient généralement des informations relatives à la sécurité, telles que la clé privée, définissez les privilèges de lecture / écriture de manière appropriée sur le fichier pour limiter l’accès.
En outre, envisagez de stocker le fichier de configuration dans un emplacement externe sécurisé ou dans un service de gestion de clés. Pour plus d’informations, voir Externaliser les secrets (dans ce chapitre).
Mode distribué¶
Créez le fichier de configuration Kafka, par exemple <path>/<config_file>.json. Remplissez le fichier avec toutes les informations de configuration du connecteur. Le fichier doit être au format JSON.
Exemple de fichier de configuration
Mode autonome¶
Créez un fichier de configuration, par exemple <kafka_dir>/config/SF_connect.properties. Remplissez le fichier avec toutes les informations de configuration du connecteur.
Exemple de fichier de configuration
Considérations relatives au cache pour les tests et le prototypage¶
Le connecteur met en cache les contrôles de recherche des tables et des canaux pour améliorer les performances lors des rééquilibrages des partitions. Cependant, lors des tests et du prototypage, ce comportement de mise en cache peut empêcher le connecteur de détecter immédiatement les tables ou les canaux créés manuellement.
Problème : lorsque vous créez manuellement une table ou un canal pendant que le connecteur est en cours d’exécution, le connecteur peut continuer à utiliser les résultats de la vérification de l’existence en cache (qui peuvent indiquer que l’objet n’existe pas) pendant 5 minutes par défaut. Cela peut conduire à des erreurs ou à des comportements inattendus pendant le test.
Recommandation pour les tests : pour éviter les problèmes liés au cache lors des tests et du prototypage, désactivez la mise en cache :
Cette configuration garantit que le connecteur effectue de nouveaux contrôles d’existence à chaque rééquilibrage de partition, ce qui vous permet de voir immédiatement les effets des tables et des canaux créés manuellement.
Important
Ces paramètres de cache minimaux sont recommandés uniquement pour les tests et le prototypage. Dans les environnements de production, utilisez les valeurs d’expiration du cache par défaut (5 minutes ou plus) pour réduire les requêtes de métadonnées vers Snowflake et améliorer les performances de rééquilibrage, surtout si vous gérez de nombreuses partitions.
Propriétés de configuration du connecteur¶
Configuration minimale pour les nouvelles installations¶
La configuration suivante est une configuration minimale pour que le connecteur fonctionne avec les valeurs v4 par défaut. Cet exemple utilise le format JSON pour le mode distribué :
Cette configuration utilise toutes les valeurs v4 par défaut : validation côté serveur, colonnes schématisées et identificateurs sensibles à la casse. Le connecteur crée automatiquement des tables et des canaux selon les besoins.
Note
Définissez snowflake.streaming.validate.compatibility.with.classic sur false pour les nouvelles installations. Ce paramètre n’est nécessaire que lors de la migration depuis v3.
Propriétés requises¶
nameNom de l’application. Cela doit être unique pour tous les connecteurs Kafka utilisés par le client. Ce nom doit être un identificateur non délimité Snowflake et valide. Pour plus d’informations sur les identificateurs valides, voir Exigences relatives à l’identificateur.
connector.classcom.snowflake.kafka.connector.SnowflakeStreamingSinkConnectortopicsListe de sujets séparés par des virgules. Par défaut, Snowflake suppose que le nom de la table est identique à celui du sujet. Si le nom de la table est différent du nom du sujet, utilisez le paramètre facultatif
topic2table.map(ci-dessous) pour spécifier le mappage entre le nom du sujet et le nom de la table. Ce nom de table doit être un identificateur non spécifié Snowflake et valide. Pour plus d’informations sur les noms de table valides, voir Exigences relatives à l’identificateur.Note
:emph:``
topicsoutopics.regexest requis ; pas les deux.topics.regexIl s’agit d’une expression régulière (« regex ») qui spécifie les sujets contenant les messages à charger dans les tables Snowflake. Le connecteur charge les données de tout nom de sujet correspondant à l’expression régulière. L’expression régulière doit respecter les règles applicables aux expressions régulières Java (c’est-à-dire être compatible avec java.util.regex.Pattern). Le fichier de configuration doit contenir :emph:` ou `, pas les deux.
snowflake.url.nameL’URL pour accéder à votre compte Snowflake. Cette URL doit inclure votre identificateur de compte. Notez que le protocole (
https://) et le numéro de port sont facultatifs.snowflake.user.nameNom de connexion de l’utilisateur pour le compte Snowflake.
snowflake.role.nameNom du rôle que le connecteur utilisera pour insérer des données dans la table.
snowflake.private.keyClé privée pour authentifier l’utilisateur. Incluez uniquement la clé, pas l’en-tête ni le pied de page. Si la clé est divisée sur plusieurs lignes, supprimez les sauts de ligne. Vous pouvez fournir une clé non chiffrée ou une clé chiffrée et fournir le paramètre
snowflake.private.key.passphrasepour permettre à Snowflake de déchiffrer la clé. Utilisez ce paramètre si et seulement si la valeur du paramètresnowflake.private.keyest chiffrée. Celui-ci déchiffre les clés privées qui ont été chiffrées conformément aux instructions fournies dans la section Authentification par paire de clés et rotation de paires de clés.Note
Consultez également
snowflake.private.key.passphrasedans les Propriétés facultatives.snowflake.database.nameNom de la base de données contenant la table dans laquelle insérer des lignes.
snowflake.schema.nameNom du schéma contenant la table dans laquelle insérer des lignes.
header.converterObligatoire uniquement si les enregistrements sont formatés en Avro et incluent un en-tête. La valeur par défaut est
"org.apache.kafka.connect.storage.StringConverter".key.converterLe convertisseur de clé de l’enregistrement Kafka (par exemple,
"org.apache.kafka.connect.storage.StringConverter"). Ce connecteur n’est pas utilisé par le connecteur Kafka, mais il est requis par la plateforme Kafka Connect.value.converterLe connecteur prend en charge les convertisseurs communautaires Kafka standard. Choisissez le convertisseur approprié en fonction de votre format de données :
Pour les enregistrements JSON :
"org.apache.kafka.connect.json.JsonConverter"Pour les enregistrements Avro avec le registre de schéma :
"io.confluent.connect.avro.AvroConverter"
Note
Lorsque
snowflake.enable.schematization=true(par défaut),``StringConverter`` etByteArrayConverterne sont pas pris en charge en tant que convertisseurs de valeurs. Pour plus d’informations, voir Dépanner le Snowflake Connector for Kafka.
Propriétés facultatives¶
Propriétés de schématisation et de validation¶
Ces propriétés contrôlent la manière dont le connecteur traite et valide les données. Pour les nouvelles installations, les valeurs par défaut fonctionnent bien. Si vous migrez depuis v3, examinez Migrer du connecteur Kafka v3 vers v4 pour obtenir des conseils sur les valeurs à utiliser.
snowflake.enable.schematizationContrôle si les enregistrements entrants sont schématisés en colonnes de tables individuelles ou inclus dans des colonnes VARIANT héritées.
Lorsque
true(par défaut), les champs d’enregistrement sont mappés aux colonnes de tables individuelles par leur nom. Lorsquefalse, le connecteur stocke les enregistrements dans deux colonnes VARIANT (RECORD_CONTENTetRECORD_METADATA), correspondant au comportement v3.- Par défaut:
true
snowflake.validationContrôles où la validation des données et l’évolution des schémas sont effectuées.
server_side(par défaut) : La validation est effectuée par le backend Snowflake, aligné avec le comportement COPY et Snowpipe. Les enregistrements invalides sont capturés dans les tables d’erreurs. Prend en charge à la fois les modes de canal par défaut et les modes de canal définis par l’utilisateur.client_side: Le connecteur valide les types de données et la compatibilité des schémas avant d’envoyer les lignes à Snowflake. Prend en charge la file d’attente de lettres mortes (DLQ) pour les enregistrements invalides. Ne fonctionne qu’avec le mode de canal par défaut.Pour plus de détails, voir Validation et traitement des erreurs.
- Par défaut:
server_side
Propriétés de migration et de compatibilité¶
Ces propriétés sont pertinentes lors de la migration depuis v3. Pour les nouvelles installations, vous pouvez les ignorer et définir snowflake.streaming.validate.compatibility.with.classic=false.
snowflake.compatibility.enable.autogenerated.table.name.sanitizationContrôle la manière dont les noms de tables automatiquement générés sont dérivés des noms de rubriques.
Lorsque
false(par défaut), les noms de sujets sont utilisés tels quels pour les noms de tables, en préservant la casse et les caractères spéciaux. Les noms de tables sont créés sous forme d’identificateurs cités.Lorsque
true, les caractères d’identificateurs Snowflake invalides sont remplacés par des traits de soulignement, les noms sont en majuscules, et un code de hachage est ajouté pour l’unicité. Cela correspond au comportement v3.- Par défaut:
false
snowflake.compatibility.enable.column.identifier.normalizationContrôle la manière dont les identificateurs de colonnes sont traités.
Lorsque
false(par défaut), les identificateurs de colonnes conservent les majuscules et les caractères spéciaux tels quels.Lorsque
true, les identificateurs de colonnes sont normalisés en majuscules, correspondant au comportement v3.- Par défaut:
false
snowflake.streaming.validate.compatibility.with.classicActive la validation du démarrage qui vérifie si toutes les configurations liées à la migration sont explicitement définies. Lorsque
true, le connecteur échoue au démarrage avec une erreur descriptive si l’une des configurations suivantes est manquante ou incompatible avec le comportement v3 :snowflake.validationdoit êtreclient_sidesnowflake.compatibility.enable.column.identifier.normalizationdoit êtretruesnowflake.compatibility.enable.autogenerated.table.name.sanitizationdoit êtretruesnowflake.enable.schematizationdoit être explicitement définie surtrueoufalse(la valeur par défaut a changé entre v3 et v4)snowflake.streaming.classic.offset.migrationdoit être explicitement définiesnowflake.streaming.classic.offset.migration.include.connector.namedoit être explicitement définie (lorsque la migration de décalage eststrictoubest_effort)
Cela empêche l’exécution accidentelle de v4 avec une configuration v3 copiée sans examiner les modifications de rupture. Définissez cette valeur sur
falsepour ignorer cette vérification après avoir confirmé votre configuration.Note
Pour les nouvelles installations (qui ne migrent pas depuis v3), définissez cette valeur sur
false. Le validateur de compatibilité n’est nécessaire que lors de la mise à niveau à partir d’un déploiement v3 existant.Pour plus d’informations, voir Migrer du connecteur Kafka v3 vers v4.
- Par défaut:
true
Propriétés de migration de décalage¶
Ces propriétés contrôlent la manière dont v4 migre les décalages validés des canaux Snowpipe Streaming (SSv1) v3. Elles ne sont pertinentes que lors de la migration à partir d’un connecteur v3 qui utilisait snowflake.ingestion.method=SNOWPIPE_STREAMING. Si vous migrez à partir du mode Snowpipe v3 (ingestion basée sur des fichiers), définissez snowflake.streaming.classic.offset.migration sur skip.
snowflake.streaming.classic.offset.migrationContrôle la manière dont v4 migre les décalages des canaux Snowpipe Streaming (SSv1) v3.
strict: v4 recherche le décalage validé à partir du canal SSv1 v3 et reprend à partir de là. Si le canal SSv1 n’est pas trouvé, le connecteur échoue avec une erreur.best_effort: v4 tente de rechercher le décalage validé à partir du canal SSv1 v3. Si le canal n’est pas trouvé, v4 revient au décalage du groupe de consommateurs Kafka.skip(par défaut) : Aucune migration de décalage SSv1 n’est effectuée. v4 utilise le décalage du groupe de consommateurs Kafka. Utilisez cette option lorsque vous migrez à partir du mode Snowpipe v3 (et non de Snowpipe Streaming).- Par défaut:
skip
snowflake.streaming.classic.offset.migration.include.connector.nameContrôle si la recherche du nom du canal SSv1 inclut le nom du connecteur. Cela doit correspondre à la manière dont votre connecteur v3 a été configuré. Dans la v3, la propriété
snowflake.streaming.channel.name.include.connector.namecontrôlait si le nom du connecteur était inclus dans le nom du canal.Définissez-la sur
truesi votre connecteur v3 disposait desnowflake.streaming.channel.name.include.connector.name=trueou si vous exécutiez la version 2.1.0 ou 2.1.1 du connecteur Kafka (ces versions incluaient le nom du connecteur par défaut). Définissez-la surfalsesinon.Nécessaire uniquement lorsque
snowflake.streaming.classic.offset.migrationeststrictoubest_effort.- Par défaut:
none (doit être explicitement défini lorsque la migration de décalage est active)
Propriétés de traitement des erreurs¶
errors.toleranceContrôle la manière dont le connecteur répond aux erreurs lors de l’ingestion.
none(par défaut) : La tâche du connecteur échoue à la première erreur. Avec la validation côté serveur, la détection des erreurs est asynchrone, de sorte que quelques enregistrements après celui qui est cassé peuvent encore être ingérés avant l’échec de la tâche.all: Le connecteur poursuit l’ingestion des données. Avec la validation côté client, les enregistrements invalides sont acheminés vers la DLQ (si configurée) ou supprimés de manière silencieuse.Avertissement
Définir
errors.tolerance=allsans configurer de sujet DLQ entraîne la suppression silencieuse des enregistrements invalides lors de l’utilisation de la validation côté client. Cela peut entraîner une perte de données.- Par défaut:
none
errors.deadletterqueue.topic.nameNom du sujet Kafka pour la file d’attente de lettres mortes. Effectif uniquement lorsque
snowflake.validation=client_sideeterrors.tolerance=all.- Par défaut:
vide (DLQ désactivée)
errors.log.enableLorsque
true, les erreurs sont journalisées avec les détails de l’opération qui a échoué et les propriétés de l’enregistrement.- Par défaut:
false
enable.task.fail.on.authorization.errorsLorsque
true, la tâche du connecteur échoue immédiatement en cas d’erreurs d’autorisation de Snowflake. Lorsquefalse, le connecteur effectue une nouvelle tentative.- Par défaut:
false
Mise en cache des propriétés¶
snowflake.cache.table.existsActive la mise en cache pour les contrôles d’existence des tables, réduisant le nombre de requêtes de métadonnées vers Snowflake.
- Par défaut:
true
snowflake.cache.table.exists.expire.msTemps d’expiration du cache en millisecondes pour les contrôles d’existence des tables.
- Par défaut:
300000(5 minutes)
snowflake.cache.pipe.existsActive la mise en cache pour les contrôles d’existence des canaux.
- Par défaut:
true
snowflake.cache.pipe.exists.expire.msTemps d’expiration du cache en millisecondes pour les contrôles d’existence des canaux.
- Par défaut:
300000(5 minutes)
Propriétés de surveillance et de diagnostic¶
jmxActive JMXMBeans pour les métriques du connecteur. Pour plus d’informations, voir Surveiller le Snowflake Connector for Kafka.
- Par défaut:
true
enable.mdc.loggingActive MDC (Mapped Diagnostic Context) pour ajouter le contexte du connecteur aux messages de journalisation, ce qui est utile lors de l’exécution de plusieurs instances de connecteur.
- Par défaut:
false
snowflake.streaming.metadata.connectorPushTimeLorsque
true, comprend l’horodatageSnowflakeConnectorPushTimedansRECORD_METADATA. Ce champ enregistre quand le connecteur a mis en mémoire tampon un enregistrement à des fins d’ingestion et est utile pour estimer la latence de bout en bout.- Par défaut:
true
Propriétés avancées¶
snowflake.streaming.client.provider.override.mapRemplacements des propriétés du client Snowpipe Streaming. Format :
key1:value1,key2:value2. À utiliser uniquement après avoir consulté le Support Snowflake.- Par défaut:
empty
Autres propriétés¶
snowflake.private.key.passphraseSi la valeur de ce paramètre n’est pas vide, le connecteur utilise cette expression pour tenter de déchiffrer la clé privée.
tasks.maxNombre de tâches, généralement égal au nombre de cœurs CPU sur les nœuds de travail du cluster Kafka Connect. Pour obtenir les meilleures performances, Snowflake recommande de définir un nombre de tâches égal au nombre total de partitions Kafka, sans dépasser le nombre de cœurs de CPU. Un nombre élevé de tâches peut entraîner une consommation accrue de mémoire et des rééquilibrages fréquents.
snowflake.topic2table.mapListe des correspondances entre sujet et table, séparées par des virgules, au format
topic:table. Prend en charge les modèles regex pour les noms de sujets. Les expressions régulières ne peuvent pas être ambiguës ; tout sujet correspondant doit correspondre à une seule table cible.Les noms de sujets et de tables peuvent être entre guillemets doubles pour prendre en charge les caractères spéciaux (deux-points, virgules, espaces). Les noms de tables qui ne sont pas entre guillemets sont en majuscules. Les noms de tables entre guillemets conservent la casse.
Pour des exemples détaillés incluant des modèles regex, des correspondances plusieurs-à-un et l’utilisation des guillemets, consultez Correspondance explicite entre sujet et table.
Exemple :
value.converter.schema.registry.urlSi le format est Avro et que vous utilisez un service de registre de schéma, il doit s’agir de l’URL du service de registre de schéma. Sinon, ce champ devrait être vide.
value.converter.break.on.schema.registry.errorSi vous chargez des données Avro à partir du service de registre de schéma, cette propriété détermine si le connecteur Kafka doit cesser de consommer des enregistrements s’il rencontre une erreur lors de la récupération de l’ID de schéma. La valeur par défaut est
false. Définissez la valeur surtruepour activer ce comportement.jvm.proxy.hostPour permettre au connecteur Snowflake Kafka d’accéder à Snowflake via un serveur proxy, définissez ce paramètre pour spécifier l’hôte de ce serveur proxy.
jvm.proxy.portPour permettre au connecteur Snowflake Kafka d’accéder à Snowflake via un serveur proxy, définissez ce paramètre pour spécifier le port de ce serveur proxy.
jvm.proxy.usernameNom d’utilisateur qui s’authentifie auprès du serveur proxy.
jvm.proxy.passwordMot de passe du nom d’utilisateur qui s’authentifie auprès du serveur proxy.
snowflake.jdbc.mapExemple :
"snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"Les propriétés JDBC supplémentaires (voir Référence Paramètre de connexion pilote JDBC) ne sont pas validées. Ces propriétés supplémentaires ne sont pas validées et ne doivent pas remplacer ni être utilisées à la place des propriétés obligatoires telles que
jvm.proxy.xxx,snowflake.user.name,snowflake.private.key,snowflake.schema.nameet des propriétés similaires.- Spécification de l’une des combinaisons suivantes :
Propriété
tracingavec variable d’environnementJDBC_TRACEPropriété
databaseavecsnowflake.database.name
Cela entraînera un comportement ambigu et le comportement sera déterminé par le pilote JDBC.
value.converter.basic.auth.credentials.sourceSi vous utilisez le format de données Avro et avez besoin d’un accès sécurisé au registre de schémas Kafka, définissez ce paramètre sur la chaîne « USER_INFO », puis définissez le paramètre
value.converter.basic.auth.user.infodécrit ci-dessous. Sinon, omettez ce paramètre.value.converter.basic.auth.user.infoSi vous utilisez le format de données Avro et avez besoin d’un accès sécurisé au registre de schémas Kafka, définissez ce paramètre sur la chaîne « <ID_utilisateur>:<motdepasse> », puis définissez le paramètre value.converter.basic.auth.credentials.source décrit ci-dessus. Sinon, omettez ce paramètre.
snowflake.metadata.createtimeSi la valeur est définie sur FALSE, la valeur de la propriété
CreateTimeest omise des métadonnées dans la colonne RECORD_METADATA. La valeur par défaut est TRUE.snowflake.metadata.topicSi la valeur est définie sur FALSE, la valeur de la propriété
topicest omise des métadonnées dans la colonne RECORD_METADATA. La valeur par défaut est TRUE.snowflake.metadata.offset.and.partitionSi la valeur est définie sur FALSE, les valeurs de propriété
OffsetetPartitionsont omises des métadonnées de la colonne RECORD_METADATA. La valeur par défaut est TRUE.snowflake.metadata.allSi la valeur est définie sur FALSE, les métadonnées de la colonne RECORD_METADATA sont complètement vides. La valeur par défaut est TRUE.
transformsSpécifier pour ignorer les enregistrements tombstone rencontrés par le connecteur Kafka et ne pas les charger dans la table cible. Un enregistrement de type tombstone est défini comme un enregistrement dont le champ de valeur est entièrement nul.
Définir la valeur de la propriété sur
"tombstoneHandlerExample".Note
Utilisez cette propriété uniquement avec les convertisseurs de communauté Kafka (c’est-à-dire la valeur de la propriété
value.converter) (par exempleorg.apache.kafka.connect.json.JsonConverterouorg.apache.kafka.connect.json.AvroConverter). Pour gérer le traitement des enregistrements tombstone avec les convertisseurs Snowflake, utilisez plutôt la propriétébehavior.on.null.values.transforms.tombstoneHandlerExample.typeRequis lors de la définition de la propriété
transforms.Définissez la valeur de la propriété sur
"io.confluent.connect.transforms.TombstoneHandler"behavior.on.null.valuesSpécifiez comment le connecteur Kafka doit traiter les enregistrements tombstone. Un enregistrement de type tombstone est défini comme un enregistrement dont le champ de valeur est entièrement nul. Pour Snowpipe, cette propriété est prise en charge par le connecteur Kafka version 1.5.5 et ultérieure. Pour Snowpipe Streaming, cette propriété est prise en charge par le connecteur Kafka à partir de la version 2.1.0.
Cette propriété prend en charge les valeurs suivantes :
DEFAULTLorsque le connecteur Kafka rencontre un enregistrement tombstone, il insère une chaîne JSON vide dans la colonne de contenu.
IGNORELe connecteur Kafka ignore les enregistrements tombstone et n’insère pas de lignes pour ces enregistrements.
La valeur par défaut est
DEFAULT.Note
L’ingestion des enregistrements tombstone varie selon les méthodes d’ingestion :
Pour Snowpipe, le connecteur Kafka utilise uniquement les convertisseurs Snowflake. Pour gérer le traitement des enregistrements tombstone avec les convertisseurs communautaires Kafka, utilisez plutôt les propriétés
transformettransforms.tombstoneHandlerExample.type.Pour Snowpipe Streaming, le connecteur Kafka utilise uniquement des convertisseurs communautaires.
Les enregistrements envoyés aux courtiers Kafka ne doivent pas être NULL, car ces enregistrements seront abandonnés par le connecteur Kafka, ce qui entraînera des décalages manquants. Les décalages manquants interrompront le connecteur Kafka dans des cas d’utilisation spécifiques. Il est recommandé d’utiliser les enregistrements tombstone plutôt que les enregistrements NULL.
Utilisation de l’authentification par paire de clés et rotation de clés¶
Le connecteur Kafka repose sur l’authentification par paire de clés au lieu de l’authentification par nom d’utilisateur et mot de passe. Cette méthode d’authentification nécessite une paire de clés de 2048 bits (minimum) RSA. Générez la paire de clés publiques-privées via OpenSSL. La clé publique est attribuée à l’utilisateur Snowflake défini dans le fichier de configuration.
Après avoir terminé les tâches d’authentification par paire de clés sur cette page et les tâches pour la rotation de paires de clés, évaluez la recommandation pour Externaliser les secrets, plus loin dans cette rubrique.
Pour configurer la paire de clés publiques/privées :
Depuis la ligne de commande d’une fenêtre de terminal, générez une clé privée.
Vous pouvez générer une version chiffrée ou non chiffrée de la clé privée.
Note
Le connecteur Kafka prend en charge des algorithmes de chiffrement validés pour répondre aux exigences de la norme Federal Information Processing Standard (140-2) (c’est-à-dire FIPS 140-2). Pour plus d’informations, voir FIPS 140-2.
Pour générer une version non chiffrée, utilisez la commande suivante :
Pour générer une version chiffrée, utilisez la commande suivante :
Où
<algorithme>est un algorithme de chiffrement conforme à FIPS 140-2.Par exemple, pour spécifier AES 256 comme algorithme de chiffrement :
Si vous générez une version chiffrée de la clé privée, enregistrez la phrase secrète. Plus tard, vous spécifierez la phrase secrète dans la propriété
snowflake.private.key.passphrasedu fichier de configuration de Kafka.Exemple de clé privée PEM
Depuis la ligne de commande, générez la clé publique en faisant référence à la clé privée :
En supposant que la clé privée soit chiffrée et contenue dans le fichier nommé
rsa_key.p8, utilisez la commande suivante :Exemple de clé publique PEM
Copiez les fichiers de clés publiques et privées dans un répertoire local en vue de leur stockage. Notez le chemin d’accès aux fichiers. La clé privée est stockée au format PKCS#8 (Public Key Cryptography Standards) et est chiffrée à l’aide de la phrase secrète que vous avez spécifiée à l’étape précédente ; toutefois, le fichier doit toujours être protégé contre tout accès non autorisé au moyen du mécanisme d’autorisation de fichier fourni par votre système d’exploitation. Il est de la responsabilité de l’utilisateur de sécuriser le fichier lorsqu’il n’est pas utilisé.
Connectez-vous à Snowflake. Attribuez la clé publique à l’utilisateur Snowflake en utilisant ALTER USER.
Par exemple :
Note
Seuls les administrateurs de sécurité (c’est-à-dire les utilisateurs disposant du rôle SECURITYADMIN) ou ayant un rôle supérieur peuvent modifier un utilisateur.
Excluez l’en-tête et le pied de page de la clé publique dans l’instruction SQL.
Vérifiez l’empreinte de la clé publique de l’utilisateur en utilisant DESCRIBE USER :
Note
La propriété
RSA_PUBLIC_KEY_2_FPest décrite dans Configuration de la rotation de paires de clés.Copiez et collez la clé privée complète dans le champ
snowflake.private.keydu fichier de configuration. Sauvegardez le fichier.
Externaliser les secrets¶
Snowflake recommande fortement d’externaliser des secrets tels que la clé privée et de les stocker sous une forme chiffrée ou dans un service de gestion de clés tel que AWS Key Management Service (KMS), Microsoft Azure Key Vault, ou HashiCorp Vault. Pour ce faire, utilisez une implémentation ConfigProvider sur votre cluster Kafka Connect.
Pour plus d’informations, voir la description de ce service par Confluent.
Démarrage du connecteur¶
Démarrez Kafka en suivant les instructions fournies dans la documentation tierce Confluent ou Apache Kafka. Vous pouvez démarrer le connecteur Kafka en mode distribué ou en mode autonome. Les instructions pour chacun sont indiquées ci-dessous :
Mode distribué¶
À partir d’une fenêtre de terminal, exécutez la commande suivante :
Mode autonome¶
À partir d’une fenêtre de terminal, exécutez la commande suivante :
Note
(Une installation par défaut d’Apache Kafka ou de Confluent Kafka devrait déjà comprendre le fichier connect-standalone.properties).