Snowflake High Performance connector for Kafka : Configurer Kafka¶
Cette rubrique décrit les étapes pour installer et configurer Kafka pour Snowflake High Performance connector for Kafka.
Installation du connecteur Kafka¶
Le connecteur Kafka est fourni sous forme de fichier JAR (exécutable Java).
Snowflake fournit deux versions du connecteur :
Une version pour l’implémentation Confluent de Kafka Connect.
Une version pour le open source software (OSS) Apache Kafka package https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/.
Les deux versions du connecteur sont disponibles dans la prévisualisation privée de Snowflake et doivent être obtenues auprès de Snowflake. Contactez l’équipe de votre compte Snowflake pour obtenir le fichier JAR du connecteur.
Si vous n’êtes pas sûr de la version à utiliser, consultez Sélection d’une version de connecteur. Configuration du connecteur Kafka ==============================================================================
La configuration du connecteur est spécifique au fournisseur. Certaines implémentations, comme Amazon MSK Connect, disposent d’une UI pour la configuration du connecteur et acceptent la configuration au format JSON ainsi que dans un format de fichier de propriétés.
Cette section est une référence générale pour les noms et les valeurs des paramètres du connecteur. N’oubliez pas que différents fournisseurs Cloud peuvent avoir des exigences de configuration légèrement différentes.
Important
L’infrastructure Kafka Connect diffuse les paramètres de configuration du connecteur Kafka du nœud maître aux nœuds de travail. Les paramètres de configuration incluent des informations sensibles (en particulier le nom d’utilisateur et la clé privée Snowflake). Assurez-vous de sécuriser le canal de communication entre les nœuds Kafka Connect. Pour obtenir des instructions, consultez la documentation de votre logiciel Apache Kafka.
Chaque fichier de configuration spécifie les sujets et les tables correspondantes pour une base de données et un schéma dans cette base de données. Notez qu’un connecteur peut intégrer des messages provenant d’un nombre illimité de rubriques, mais les tables correspondantes doivent toutes se trouver dans une seule base de données et un seul schéma.
Pour une description des champs de configuration, voir Propriétés de configuration du connecteur.
Important
Comme le fichier de configuration contient généralement des informations relatives à la sécurité, telles que la clé privée, définissez les privilèges de lecture / écriture de manière appropriée sur le fichier pour limiter l’accès.
En outre, envisagez de stocker le fichier de configuration dans un emplacement externe sécurisé ou dans un service de gestion de clés.
Exemple de fichier json de configuration
{
"name":"XYZCompanySensorData",
"config":{
"connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
"tasks.max": "1",
"snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
"snowflake.url.name": "myorganization-myaccount.snowflakecomputing.com:443",
"snowflake.warehouse.name": "WH",
"snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
"snowflake.schema.name": "MY_SCHEMA",
"snowflake.database.name": "MY_DATABASE",
"snowflake.role.name": "MY_ROLE",
"snowflake.user.name": "MY_USER",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"errors.log.enable": "true",
"topics": "topic1,topic2",
"value.converter.schemas.enable": "false",
"errors.tolerance": "all"
}
}
Exemple de fichier de propriétés de configuration
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
tasks.max=1
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.warehouse.name=WH
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=all
Propriétés de configuration du connecteur¶
Propriétés requises¶
nameNom de l’application. Cela doit être unique pour tous les connecteurs Kafka utilisés par le client. Ce nom doit être un identificateur non délimité Snowflake et valide. Pour plus d’informations sur les identificateurs valides, voir Exigences relatives à l’identificateur.
connector.classcom.snowflake.kafka.connector.SnowflakeStreamingSinkConnectortopicsListe de sujets séparés par des virgules. Par défaut, Snowflake suppose que le nom de la table est identique à celui du sujet. Si le nom de la table est différent du nom du sujet, utilisez le paramètre facultatif
topic2table.map(ci-dessous) pour spécifier le mappage entre le nom du sujet et le nom de la table. Ce nom de table doit être un identificateur non spécifié Snowflake et valide. Pour plus d’informations sur les noms de table valides, voir Exigences relatives à l’identificateur.Note
:emph:``
topicsoutopics.regexest requis ; pas les deux.topics.regexIl s’agit d’une expression régulière (« regex ») qui spécifie les sujets contenant les messages à charger dans les tables Snowflake. Le connecteur charge les données de tout nom de sujet correspondant à l’expression régulière. L’expression régulière doit respecter les règles applicables aux expressions régulières Java (c.-à-d. être compatible avec java.util.regex.Pattern). Le fichier de configuration doit contenir
topicsoutopics.regex, pas les deux.snowflake.url.nameL’URL pour accéder à votre compte Snowflake. Cette URL doit inclure votre identificateur de compte. Notez que le protocole (
https://) et le numéro de port sont facultatifs.snowflake.user.nameNom de connexion de l’utilisateur pour le compte Snowflake.
snowflake.role.nameNom du rôle que le connecteur utilisera pour insérer des données dans la table.
snowflake.private.keyClé privée pour authentifier l’utilisateur. Incluez uniquement la clé, pas l’en-tête ni le pied de page. Si la clé est divisée sur plusieurs lignes, supprimez les sauts de ligne. Vous pouvez fournir une clé non chiffrée ou une clé chiffrée et fournir le paramètre
snowflake.private.key.passphrasepour permettre à Snowflake de déchiffrer la clé. Utilisez ce paramètre si et seulement si la valeur du paramètresnowflake.private.keyest chiffrée. Celui-ci déchiffre les clés privées qui ont été chiffrées conformément aux instructions fournies dans la section Authentification par paire de clés et rotation de paires de clés.Note
Consultez également
snowflake.private.key.passphrasedans les Propriétés facultatives.snowflake.database.nameNom de la base de données contenant la table dans laquelle insérer des lignes.
snowflake.schema.nameNom du schéma contenant la table dans laquelle insérer des lignes.
header.converterObligatoire uniquement si les enregistrements sont formatés en Avro et incluent un en-tête. La valeur par défaut est
"org.apache.kafka.connect.storage.StringConverter".key.converterIl s’agit du convertisseur de clé de l’enregistrement Kafka, (par exemple
"org.apache.kafka.connect.storage.StringConverter"). Ce connecteur n’est pas utilisé par le connecteur Kafka, mais il est requis par la plate-forme Kafka Connect.Voir Limitations du connecteur Kafka pour les limitations actuelles.
value.converterLe connecteur prend en charge les convertisseurs communautaires Kafka standard. Choisissez le convertisseur approprié en fonction de votre format de données :
Pour les enregistrements JSON :
"org.apache.kafka.connect.json.JsonConverter"Pour les enregistrements Avro avec le registre de schéma :
"io.confluent.connect.avro.AvroConverter"
Voir Limitations du connecteur Kafka pour les limitations actuelles.
Propriétés facultatives¶
snowflake.private.key.passphraseSi la valeur de ce paramètre n’est pas vide, le connecteur utilise cette expression pour tenter de déchiffrer la clé privée.
tasks.maxNombre de tâches, généralement égal au nombre de cœurs CPU sur les nœuds de travail du cluster Kafka Connect. Pour obtenir les meilleures performances, Snowflake recommande de définir un nombre de tâches égal au nombre total de partitions Kafka, sans dépasser le nombre de cœurs de CPU. Un nombre élevé de tâches peut entraîner une consommation accrue de mémoire et des rééquilibrages fréquents.
snowflake.topic2table.mapCe paramètre facultatif permet à un utilisateur de spécifier quelles rubriques doivent être mappées à quelles tables. Chaque sujet et son nom de table doivent être séparés par le signe deux-points (voir exemple ci-dessous). Ce nom de table doit être un identificateur non spécifié Snowflake et valide. Pour plus d’informations sur les noms de table valides, voir Exigences relatives à l’identificateur. La configuration de la rubrique sujet permet l’utilisation d’expressions régulières pour définir des rubriques, tout comme l’utilisation de
topics.regex. Les expressions régulières ne peuvent pas être ambiguës. Toute rubrique correspondante doit correspondre à une seule table cible.Exemple :
topics="topic1,topic2,topic5,topic6" snowflake.topic2table.map="topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range"
peut s’écrire ainsi :
topics.regex="topic[0-9]" snowflake.topic2table.map="topic[0-4]:low_range,topic[5-9]:high_range"
value.converter.schema.registry.urlSi le format est Avro et que vous utilisez un service de registre de schéma, il doit s’agir de l’URL du service de registre de schéma. Sinon, ce champ devrait être vide.
value.converter.break.on.schema.registry.errorSi vous chargez des données Avro à partir du service de registre de schéma, cette propriété détermine si le connecteur Kafka doit cesser de consommer des enregistrements s’il rencontre une erreur lors de la récupération de l’ID de schéma. La valeur par défaut est
false. Définissez la valeur surtruepour activer ce comportement.jvm.proxy.hostPour permettre au connecteur Snowflake Kafka d’accéder à Snowflake via un serveur proxy, définissez ce paramètre pour spécifier l’hôte de ce serveur proxy.
jvm.proxy.portPour permettre au connecteur Snowflake Kafka d’accéder à Snowflake via un serveur proxy, définissez ce paramètre pour spécifier le port de ce serveur proxy.
snowflake.streaming.max.client.lagSpécifie la fréquence à laquelle Snowflake Ingest Java nettoie les données vers Snowflake, en secondes.
- Valeurs:
Minimum :
1secondeMaximum :
600secondes
- Par défaut:
:code:`1`seconde
jvm.proxy.usernameNom d’utilisateur qui s’authentifie auprès du serveur proxy.
jvm.proxy.passwordMot de passe du nom d’utilisateur qui s’authentifie auprès du serveur proxy.
snowflake.jdbc.mapExemple :
"snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"Les propriétés JDBC supplémentaires (voir Référence Paramètre de connexion pilote JDBC) ne sont pas validées. Ces propriétés supplémentaires ne sont pas validées et ne doivent pas remplacer ni être utilisées à la place des propriétés obligatoires telles que :
jvm.proxy.xxx,snowflake.user.name,snowflake.private.key,snowflake.schema.nameetc.- Spécification de l’une des combinaisons suivantes :
Propriété
tracingavec variable d’environnementJDBC_TRACEPropriété
databaseavecsnowflake.database.name
Cela entraînera un comportement ambigu et le comportement sera déterminé par le pilote JDBC.
value.converter.basic.auth.credentials.sourceSi vous utilisez le format de données Avro et avez besoin d’un accès sécurisé au registre de schémas Kafka, définissez ce paramètre sur la chaîne « USER_INFO », puis définissez le paramètre
value.converter.basic.auth.user.infodécrit ci-dessous. Sinon, omettez ce paramètre.value.converter.basic.auth.user.infoSi vous utilisez le format de données Avro et avez besoin d’un accès sécurisé au registre de schémas Kafka, définissez ce paramètre sur la chaîne « <ID_utilisateur>:<motdepasse> », puis définissez le paramètre value.converter.basic.auth.credentials.source décrit ci-dessus. Sinon, omettez ce paramètre.
snowflake.metadata.createtimeSi la valeur est définie sur FALSE, la valeur de la propriété
CreateTimeest omise des métadonnées dans la colonne RECORD_METADATA. La valeur par défaut est TRUE.snowflake.metadata.topicSi la valeur est définie sur FALSE, la valeur de la propriété
topicest omise des métadonnées dans la colonne RECORD_METADATA. La valeur par défaut est TRUE.snowflake.metadata.offset.and.partitionSi la valeur est définie sur FALSE, les valeurs de propriété
OffsetetPartitionsont omises des métadonnées de la colonne RECORD_METADATA. La valeur par défaut est TRUE.snowflake.metadata.allSi la valeur est définie sur FALSE, les métadonnées de la colonne RECORD_METADATA sont complètement vides. La valeur par défaut est TRUE.
transformsSpécifier pour ignorer les enregistrements tombstone rencontrés par le connecteur Kafka et ne pas les charger dans la table cible. Un enregistrement de type tombstone est défini comme un enregistrement dont le champ de valeur est entièrement nul.
Définir la valeur de la propriété sur
"tombstoneHandlerExample".Note
Utilisez cette propriété uniquement avec les convertisseurs de communauté Kafka (c’est-à-dire la valeur de la propriété
value.converter) (par exempleorg.apache.kafka.connect.json.JsonConverterouorg.apache.kafka.connect.json.AvroConverter). Pour gérer le traitement des enregistrements tombstone avec les convertisseurs Snowflake, utilisez plutôt la propriétébehavior.on.null.values.transforms.tombstoneHandlerExample.typeRequis lors de la définition de la propriété
transforms.Définissez la valeur de la propriété sur
"io.confluent.connect.transforms.TombstoneHandler"behavior.on.null.valuesSpécifiez comment le connecteur Kafka doit traiter les enregistrements tombstone. Un enregistrement de type tombstone est défini comme un enregistrement dont le champ de valeur est entièrement nul. Pour Snowpipe, cette propriété est prise en charge par le connecteur Kafka version 1.5.5 et ultérieure. Pour Snowpipe Streaming, cette propriété est prise en charge par le connecteur Kafka à partir de la version 2.1.0.
Cette propriété prend en charge les valeurs suivantes :
DEFAULTLorsque le connecteur Kafka rencontre un enregistrement tombstone, il insère une chaîne JSON vide dans la colonne de contenu.
IGNORELe connecteur Kafka ignore les enregistrements tombstone et n’insère pas de lignes pour ces enregistrements.
La valeur par défaut est
DEFAULT.Note
L’ingestion des enregistrements tombstone varie selon les méthodes d’ingestion :
Pour Snowpipe, le connecteur Kafka utilise uniquement les convertisseurs Snowflake. Pour gérer le traitement des enregistrements tombstone avec les convertisseurs communautaires Kafka, utilisez plutôt les propriétés
transformettransforms.tombstoneHandlerExample.type.Pour Snowpipe Streaming, le connecteur Kafka utilise uniquement des convertisseurs communautaires.
Les enregistrements envoyés aux courtiers Kafka ne doivent pas être NULL, car ces enregistrements seront abandonnés par le connecteur Kafka, ce qui entraînera des décalages manquants. Les décalages manquants interrompront le connecteur Kafka dans des cas d’utilisation spécifiques. Il est recommandé d’utiliser les enregistrements tombstone plutôt que les enregistrements NULL.