Utilisation de Connecteur Snowflake pour Kafka avec Snowpipe Streaming¶
Vous pouvez remplacer Snowpipe par Snowpipe Streaming dans votre chaîne de chargement de données depuis Kafka. Lorsque le seuil spécifié pour le vidage de la mémoire tampon (temps ou mémoire ou nombre de messages) est atteint, le connecteur appelle Snowpipe Streaming API (« API ») pour écrire des lignes de données dans les tables Snowflake, contrairement à Snowpipe, qui écrit les données à partir de fichiers temporaires en zone de préparation. Cette architecture permet de réduire les latences de chargement et, par conséquent, les coûts de chargement de volumes de données similaires.
La version 1.9.1 (ou supérieure) du connecteur Kafka est requise pour être utilisée avec Snowpipe Streaming. Le connecteur Kafka avec Snowpipe Streaming comprend le Snowflake Ingest SDK et prend en charge le streaming de lignes à partir de sujets Apache Kafka directement dans des tables cibles.
Note
Le connecteur Kafka avec Snowpipe Streaming ne prend actuellement pas en charge la détection des schémas ou leur évolution. Il utilise le même schéma de tables que celui utilisé avec Snowpipe.
Dans ce chapitre :
Version minimale requise¶
Le connecteur Kafka version 1.9.1 (ou ultérieure) prend en charge Snowpipe Streaming.
Propriétés de configuration de Kafka¶
Enregistrez vos paramètres de connexion dans le fichier de propriétés du connecteur Kafka. Pour plus d’informations, voir Configuration du connecteur Kafka.
Propriétés requises¶
Ajoutez ou modifiez vos paramètres de connexion dans le fichier de propriétés du connecteur Kafka. Pour plus d’informations, voir Configuration du connecteur Kafka.
snowflake.ingestion.method
Nécessaire uniquement si vous utilisez le connecteur Kafka comme client d’ingestion en continu. Indique s’il faut utiliser Snowpipe Streaming ou Snowpipe standard pour charger les données de votre sujet Kafka. Les valeurs prises en charge sont les suivantes :
SNOWPIPE_STREAMING
SNOWPIPE
(par défaut)
Aucun paramètre supplémentaire n’est requis pour choisir le service backend afin der mettre en file d’attente et charger les données des sujets. Configurez les propriétés supplémentaires dans le fichier de propriétés de votre connecteur Kafka comme d’habitude.
snowflake.role.name
Rôle de contrôle d’accès à utiliser lors de l’insertion des lignes dans la table.
Propriétés de la mémoire tampon et de requête¶
buffer.flush.time
Nombre de secondes entre les vidages de la mémoire tampon, chaque vidage entraînant des opérations d’insertion pour les enregistrements mis en mémoire tampon. Le connecteur Kafka appelle Snowpipe Streaming API (« API ») une fois après chaque vidage.
La valeur minimale prise en charge pour la propriété
buffer.flush.time
est1
(en secondes). Pour des débits de données moyens plus élevés, nous vous suggérons de diminuer la valeur par défaut afin d’améliorer la latence. Si le coût est plus important que la latence, vous pouvez augmenter le temps de vidage du tampon. Veillez à vider la mémoire tampon de Kafka avant qu’elle ne soit pleine afin d’éviter les exceptions liées à l’absence de mémoire.- Valeurs
1
- Pas de limite supérieure.- Par défaut
10
buffer.count.records
Nombre d’enregistrements mis en mémoire tampon par partition Kafka avant leur acquisition dans Snowflake.
- Valeurs
1
- Pas de limite supérieure.- Par défaut
10000
buffer.size.bytes
Taille cumulée en octets des enregistrements mis en mémoire tampon par partition Kafka avant d’être ingérés dans Snowflake en tant que fichiers de données.
Les enregistrements sont compressés lorsqu’ils sont écrits dans des fichiers de données. Par conséquent, la taille des enregistrements dans la mémoire tampon peut être supérieure à la taille des fichiers de données créés à partir des enregistrements.
- Valeurs
1
- Pas de limite supérieure.- Par défaut
20000000
(20 MB)
En plus des propriétés du connecteur Kafka, notez la propriété max.poll.records
du consommateur Kafka, qui contrôle le nombre maximum d’enregistrements renvoyés par Kafka à Kafka Connect en une seule requête. La valeur par défaut de 500
peut être augmentée, mais il faut tenir compte des contraintes de mémoire. Pour plus d’informations sur cette propriété, voir la documentation de votre package Kafka :
Gestion des erreurs et propriétés DLQ¶
errors.tolerance
Spécifie comment gérer les erreurs rencontrées par le connecteur Kafka :
Cette propriété prend en charge les valeurs suivantes :
NONE
Arrêtez le chargement des données à la première erreur rencontrée.
ALL
Ignorez toutes les erreurs et continuez à charger les données.
- Par défaut
NONE
errors.log.enable
Spécifie s’il faut ou non écrire les messages d’erreur dans le fichier journal de Kafka Connect.
Cette propriété prend en charge les valeurs suivantes :
TRUE
Écrire des messages d’erreur.
FALSE
Ne pas écrire de messages d’erreur.
- Par défaut
FALSE
errors.deadletterqueue.topic.name
Spécifie le nom du sujet DLQ (file d’attente de lettres mortes) dans Kafka pour la livraison de messages à Kafka qui n’ont pas pu être intégrés dans les tables Snowflake. Pour plus d’informations, voir Files d’attente de lettres mortes dans cette rubrique.
- Valeurs
Chaîne de texte personnalisée
- Par défaut
Aucune.
Sémantique unique et exacte¶
La sémantique unique et exacte garantit la livraison des messages Kafka sans duplication ni perte de données. Cette garantie de livraison est définie par défaut pour le connecteur Kafka avec Snowpipe Streaming.
Le connecteur Kafka adopte un mappage univoque entre la partition et le canal et utilise deux décalages distincts :
Décalage du consommateur : il s’agit du décalage le plus récent consommé par le consommateur et géré par Kafka.
Jeton de compensation : ce jeton suit le décalage le plus récent dans Snowflake et est géré par Snowflake.
Le connecteur Kafka assure une livraison « exactly-once » en mettant en œuvre les meilleures pratiques suivantes :
Ouverture/réouverture d’un canal :
Lors de l’ouverture ou de la réouverture d’un canal pour une partition donnée, le connecteur Kafka utilise le dernier jeton de décalage engagé récupéré de Snowflake via l’API
getLatestCommittedOffsetToken
comme source de vérité et réinitialise le décalage du consommateur dans Kafka en conséquence.Si le décalage du consommateur n’est plus dans la période de conservation des données, une exception est lancée et vous pouvez décider de l’action appropriée à prendre.
Le seul scénario dans lequel le connecteur Kafka ne réinitialise pas le décalage du consommateur dans Kafka et l’utilise comme source de vérité est lorsque le jeton de décalage de Snowflake est NULL. Dans ce cas, le connecteur accepte le décalage envoyé par Kafka et le jeton de décalage est ensuite mis à jour.
Traitement des enregistrements :
Pour garantir une couche supplémentaire de sécurité contre les décalages non continus qui pourraient résulter de bogues potentiels dans Kafka, Snowflake maintient une variable en mémoire qui suit le dernier décalage traité. Snowflake n’accepte les lignes que si le décalage de la ligne actuelle est égal au dernier décalage traité plus un, ajoutant ainsi une couche de protection supplémentaire pour garantir que le processus d’ingestion est continu et précis.
Gestion des exceptions, des défaillances, des accidents et des reprises après sinistre :
Dans le cadre du processus de récupération, Snowflake respecte systématiquement la logique d’ouverture/réouverture du canal décrite précédemment en rouvrant le canal et en réinitialisant le décalage du consommateur avec le dernier jeton de décalage engagé. Ce faisant, Kafka reçoit le signal d’envoyer les données à partir de la valeur de décalage qui est supérieure d’une unité au dernier jeton de décalage engagé, ce qui nous permet de reprendre l’ingestion à partir du point de défaillance avec une perte de données minime, voire nulle.
Mise en œuvre d’un mécanisme de relance :
Pour tenir compte des problèmes transitoires potentiels, Snowflake incorpore un mécanisme de relance dans les appels API. Snowflake tente plusieurs fois ces appels API afin d’augmenter les chances de succès et d’atténuer le risque d’échecs intermittents affectant le processus d’ingestion.
Faire progresser le décalage des consommateurs :
À intervalles réguliers, Snowflake fait progresser le décalage du consommateur en utilisant le dernier jeton de décalage engagé pour s’assurer que le processus d’ingestion est continuellement aligné sur le dernier état des données dans Snowflake.
Convertisseurs¶
Le connecteur Kafka avec Snowpipe Streaming ne prend pas en charge les valeurs key.converter
ou value.converter
suivantes :
com.snowflake.kafka.connector.records.SnowflakeJsonConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry
Les convertisseurs Snowflake personnalisés traitent les erreurs qui empêchent le chargement des données en déplaçant les fichiers vers la zone de préparation de la table. Ce flux de travail entre en conflit avec Snowpipe Streaming Files d’attente de lettres mortes.
Les files d’attente de lettres mortes¶
Le connecteur Kafka avec Snowpipe Streaming prend en charge les files d’attente de lettres mortes (DLQ) pour les enregistrements interrompus ou les enregistrements qui ne peuvent être traités avec succès en raison d’une défaillance.
Pour plus d’informations sur la surveillance, voir la documentation d’Apache Kafka.
Facturation et utilisation¶
Pour des informations sur la facturation de Snowpipe Streaming, voir Coûts de streaming Snowpipe.