Utilisation de Connecteur Snowflake pour Kafka avec Snowpipe Streaming

Vous pouvez remplacer Snowpipe par Snowpipe Streaming dans votre chaîne de chargement de données depuis Kafka. Lorsque le seuil spécifié pour le vidage de la mémoire tampon (temps ou mémoire ou nombre de messages) est atteint, le connecteur appelle Snowpipe Streaming API (« API ») pour écrire des lignes de données dans les tables Snowflake, contrairement à Snowpipe, qui écrit les données à partir de fichiers temporaires en zone de préparation. Cette architecture permet de réduire les latences de chargement et, par conséquent, les coûts de chargement de volumes de données similaires.

La version 1.9.1 (ou supérieure) du connecteur Kafka est requise pour être utilisée avec Snowpipe Streaming. Le connecteur Kafka avec Snowpipe Streaming comprend le Snowflake Ingest SDK et prend en charge le streaming de lignes à partir de sujets Apache Kafka directement dans des tables cibles.

Snowpipe Streaming with Kafka connector

Note

Le connecteur Kafka avec Snowpipe Streaming ne prend actuellement pas en charge la détection des schémas ou leur évolution. Il utilise le même schéma de tables que celui utilisé avec Snowpipe.

Dans ce chapitre :

Version minimale requise

Le connecteur Kafka version 1.9.1 (ou ultérieure) prend en charge Snowpipe Streaming.

Propriétés de configuration de Kafka

Enregistrez vos paramètres de connexion dans le fichier de propriétés du connecteur Kafka. Pour plus d’informations, voir Configuration du connecteur Kafka.

Propriétés requises

Ajoutez ou modifiez vos paramètres de connexion dans le fichier de propriétés du connecteur Kafka. Pour plus d’informations, voir Configuration du connecteur Kafka.

snowflake.ingestion.method

Nécessaire uniquement si vous utilisez le connecteur Kafka comme client d’ingestion en continu. Indique s’il faut utiliser Snowpipe Streaming ou Snowpipe standard pour charger les données de votre sujet Kafka. Les valeurs prises en charge sont les suivantes :

  • SNOWPIPE_STREAMING

  • SNOWPIPE (par défaut)

Aucun paramètre supplémentaire n’est requis pour choisir le service backend afin der mettre en file d’attente et charger les données des sujets. Configurez les propriétés supplémentaires dans le fichier de propriétés de votre connecteur Kafka comme d’habitude.

snowflake.role.name

Rôle de contrôle d’accès à utiliser lors de l’insertion des lignes dans la table.

Propriétés de la mémoire tampon et de requête

buffer.flush.time

Nombre de secondes entre les vidages de la mémoire tampon, chaque vidage entraînant des opérations d’insertion pour les enregistrements mis en mémoire tampon. Le connecteur Kafka appelle Snowpipe Streaming API (« API ») une fois après chaque vidage.

La valeur minimale prise en charge pour la propriété buffer.flush.time est 1 (en secondes). Pour des débits de données moyens plus élevés, nous vous suggérons de diminuer la valeur par défaut afin d’améliorer la latence. Si le coût est plus important que la latence, vous pouvez augmenter le temps de vidage du tampon. Veillez à vider la mémoire tampon de Kafka avant qu’elle ne soit pleine afin d’éviter les exceptions liées à l’absence de mémoire.

Valeurs

1 - Pas de limite supérieure.

Par défaut

10

buffer.count.records

Nombre d’enregistrements mis en mémoire tampon par partition Kafka avant leur acquisition dans Snowflake.

Valeurs

1 - Pas de limite supérieure.

Par défaut

10000

buffer.size.bytes

Taille cumulée en octets des enregistrements mis en mémoire tampon par partition Kafka avant d’être ingérés dans Snowflake en tant que fichiers de données.

Les enregistrements sont compressés lorsqu’ils sont écrits dans des fichiers de données. Par conséquent, la taille des enregistrements dans la mémoire tampon peut être supérieure à la taille des fichiers de données créés à partir des enregistrements.

Valeurs

1 - Pas de limite supérieure.

Par défaut

20000000 (20 MB)

En plus des propriétés du connecteur Kafka, notez la propriété max.poll.records du consommateur Kafka, qui contrôle le nombre maximum d’enregistrements renvoyés par Kafka à Kafka Connect en une seule requête. La valeur par défaut de 500 peut être augmentée, mais il faut tenir compte des contraintes de mémoire. Pour plus d’informations sur cette propriété, voir la documentation de votre package Kafka :

Gestion des erreurs et propriétés DLQ

errors.tolerance

Spécifie comment gérer les erreurs rencontrées par le connecteur Kafka :

Cette propriété prend en charge les valeurs suivantes :

NONE

Arrêtez le chargement des données à la première erreur rencontrée.

ALL

Ignorez toutes les erreurs et continuez à charger les données.

Par défaut

NONE

errors.log.enable

Spécifie s’il faut ou non écrire les messages d’erreur dans le fichier journal de Kafka Connect.

Cette propriété prend en charge les valeurs suivantes :

TRUE

Écrire des messages d’erreur.

FALSE

Ne pas écrire de messages d’erreur.

Par défaut

FALSE

errors.deadletterqueue.topic.name

Spécifie le nom du sujet DLQ (file d’attente de lettres mortes) dans Kafka pour la livraison de messages à Kafka qui n’ont pas pu être intégrés dans les tables Snowflake. Pour plus d’informations, voir Files d’attente de lettres mortes dans cette rubrique.

Valeurs

Chaîne de texte personnalisée

Par défaut

Aucune.

Sémantique unique et exacte

La sémantique unique et exacte garantit la livraison des messages Kafka sans duplication ni perte de données. Cette garantie de livraison est définie par défaut pour le connecteur Kafka avec Snowpipe Streaming.

Le connecteur Kafka adopte un mappage univoque entre la partition et le canal et utilise deux décalages distincts :

  • Décalage du consommateur : il s’agit du décalage le plus récent consommé par le consommateur et géré par Kafka.

  • Jeton de compensation : ce jeton suit le décalage le plus récent dans Snowflake et est géré par Snowflake.

Le connecteur Kafka assure une livraison « exactly-once » en mettant en œuvre les meilleures pratiques suivantes :

Ouverture/réouverture d’un canal :

  • Lors de l’ouverture ou de la réouverture d’un canal pour une partition donnée, le connecteur Kafka utilise le dernier jeton de décalage engagé récupéré de Snowflake via l’API getLatestCommittedOffsetToken comme source de vérité et réinitialise le décalage du consommateur dans Kafka en conséquence.

  • Si le décalage du consommateur n’est plus dans la période de conservation des données, une exception est lancée et vous pouvez décider de l’action appropriée à prendre.

  • Le seul scénario dans lequel le connecteur Kafka ne réinitialise pas le décalage du consommateur dans Kafka et l’utilise comme source de vérité est lorsque le jeton de décalage de Snowflake est NULL. Dans ce cas, le connecteur accepte le décalage envoyé par Kafka et le jeton de décalage est ensuite mis à jour.

Traitement des enregistrements :

  • Pour garantir une couche supplémentaire de sécurité contre les décalages non continus qui pourraient résulter de bogues potentiels dans Kafka, Snowflake maintient une variable en mémoire qui suit le dernier décalage traité. Snowflake n’accepte les lignes que si le décalage de la ligne actuelle est égal au dernier décalage traité plus un, ajoutant ainsi une couche de protection supplémentaire pour garantir que le processus d’ingestion est continu et précis.

Gestion des exceptions, des défaillances, des accidents et des reprises après sinistre :

  • Dans le cadre du processus de récupération, Snowflake respecte systématiquement la logique d’ouverture/réouverture du canal décrite précédemment en rouvrant le canal et en réinitialisant le décalage du consommateur avec le dernier jeton de décalage engagé. Ce faisant, Kafka reçoit le signal d’envoyer les données à partir de la valeur de décalage qui est supérieure d’une unité au dernier jeton de décalage engagé, ce qui nous permet de reprendre l’ingestion à partir du point de défaillance avec une perte de données minime, voire nulle.

Mise en œuvre d’un mécanisme de relance :

  • Pour tenir compte des problèmes transitoires potentiels, Snowflake incorpore un mécanisme de relance dans les appels API. Snowflake tente plusieurs fois ces appels API afin d’augmenter les chances de succès et d’atténuer le risque d’échecs intermittents affectant le processus d’ingestion.

Faire progresser le décalage des consommateurs :

  • À intervalles réguliers, Snowflake fait progresser le décalage du consommateur en utilisant le dernier jeton de décalage engagé pour s’assurer que le processus d’ingestion est continuellement aligné sur le dernier état des données dans Snowflake.

Convertisseurs

Le connecteur Kafka avec Snowpipe Streaming ne prend pas en charge les valeurs key.converter ou value.converter suivantes :

  • com.snowflake.kafka.connector.records.SnowflakeJsonConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry

Les convertisseurs Snowflake personnalisés traitent les erreurs qui empêchent le chargement des données en déplaçant les fichiers vers la zone de préparation de la table. Ce flux de travail entre en conflit avec Snowpipe Streaming Files d’attente de lettres mortes.

Les files d’attente de lettres mortes

Le connecteur Kafka avec Snowpipe Streaming prend en charge les files d’attente de lettres mortes (DLQ) pour les enregistrements interrompus ou les enregistrements qui ne peuvent être traités avec succès en raison d’une défaillance.

Pour plus d’informations sur la surveillance, voir la documentation d’Apache Kafka.

Facturation et utilisation

Pour des informations sur la facturation de Snowpipe Streaming, voir Coûts de streaming Snowpipe.