Utilisation de Connecteur Snowflake pour Kafka avec Snowpipe Streaming

Vous pouvez remplacer Snowpipe par Snowpipe Streaming dans votre chaîne de chargement de données depuis Kafka. Lorsque le seuil spécifié pour le vidage de la mémoire tampon (temps ou mémoire ou nombre de messages) est atteint, le connecteur appelle Snowpipe Streaming API (« API ») pour écrire des lignes de données dans les tables Snowflake, contrairement à Snowpipe, qui écrit les données à partir de fichiers temporaires en zone de préparation. Cette architecture permet de réduire les latences de chargement et, par conséquent, les coûts de chargement de volumes de données similaires.

La version 1.9.1 (ou supérieure) du connecteur Kafka est requise pour être utilisée avec Snowpipe Streaming. Le connecteur Kafka avec Snowpipe Streaming comprend le Snowflake Ingest SDK et prend en charge le streaming de lignes à partir de sujets Apache Kafka directement dans des tables cibles.

Snowpipe Streaming with Kafka connector

Note

Le connecteur Kafka avec Snowpipe Streaming ne prend actuellement pas en charge la détection des schémas ou leur évolution. Il utilise le même schéma de tables que celui utilisé avec Snowpipe.

Dans ce chapitre :

Version minimale requise

Le connecteur Kafka version 1.9.1 (ou ultérieure) prend en charge Snowpipe Streaming.

Propriétés de configuration de Kafka

Enregistrez vos paramètres de connexion dans le fichier de propriétés du connecteur Kafka. Pour plus d’informations, voir Configuration du connecteur Kafka.

Propriétés requises

Ajoutez ou modifiez vos paramètres de connexion dans le fichier de propriétés du connecteur Kafka. Pour plus d’informations, voir Configuration du connecteur Kafka.

snowflake.ingestion.method

Nécessaire uniquement si vous utilisez le connecteur Kafka comme client d’ingestion en continu. Indique s’il faut utiliser Snowpipe Streaming ou Snowpipe standard pour charger les données de votre sujet Kafka. Les valeurs prises en charge sont les suivantes :

  • SNOWPIPE_STREAMING

  • SNOWPIPE (par défaut)

Aucun paramètre supplémentaire n’est requis pour choisir le service backend afin der mettre en file d’attente et charger les données des sujets. Configurez les propriétés supplémentaires dans le fichier de propriétés de votre connecteur Kafka comme d’habitude.

snowflake.role.name

Rôle de contrôle d’accès à utiliser lors de l’insertion des lignes dans la table.

Propriétés de la mémoire tampon et de requête

buffer.flush.time

Nombre de secondes entre les vidages de la mémoire tampon, chaque vidage entraînant des opérations d’insertion pour les enregistrements mis en mémoire tampon. Le connecteur Kafka appelle Snowpipe Streaming API (« API ») une fois après chaque vidage.

La valeur minimale prise en charge pour la propriété buffer.flush.time est 1 (en secondes). Pour des débits de données moyens plus élevés, nous vous suggérons de diminuer la valeur par défaut afin d’améliorer la latence. Si le coût est plus important que la latence, vous pouvez augmenter le temps de vidage du tampon. Veillez à vider la mémoire tampon de Kafka avant qu’elle ne soit pleine afin d’éviter les exceptions liées à l’absence de mémoire.

Valeurs

1 - Pas de limite supérieure.

Par défaut

10

buffer.count.records

Nombre d’enregistrements mis en mémoire tampon par partition Kafka avant leur acquisition dans Snowflake.

Valeurs

1 - Pas de limite supérieure.

Par défaut

10000

buffer.size.bytes

Taille cumulée en octets des enregistrements mis en mémoire tampon par partition Kafka avant d’être ingérés dans Snowflake en tant que fichiers de données.

Les enregistrements sont compressés lorsqu’ils sont écrits dans des fichiers de données. Par conséquent, la taille des enregistrements dans la mémoire tampon peut être supérieure à la taille des fichiers de données créés à partir des enregistrements.

Valeurs

1 - Pas de limite supérieure.

Par défaut

20000000 (20 MB)

En plus des propriétés du connecteur Kafka, notez la propriété max.poll.records du consommateur Kafka, qui contrôle le nombre maximum d’enregistrements renvoyés par Kafka à Kafka Connect en une seule requête. La valeur par défaut de 500 peut être augmentée, mais il faut tenir compte des contraintes de mémoire. Pour plus d’informations sur cette propriété, voir la documentation de votre package Kafka :

Gestion des erreurs et propriétés DLQ

errors.tolerance

Spécifie comment gérer les erreurs rencontrées par le connecteur Kafka :

Cette propriété prend en charge les valeurs suivantes :

NONE

Arrêtez le chargement des données à la première erreur rencontrée.

ALL

Ignorez toutes les erreurs et continuez à charger les données.

Par défaut

NONE

errors.log.enable

Spécifie s’il faut ou non écrire les messages d’erreur dans le fichier journal de Kafka Connect.

Cette propriété prend en charge les valeurs suivantes :

TRUE

Écrire des messages d’erreur.

FALSE

Ne pas écrire de messages d’erreur.

Par défaut

FALSE

errors.deadletterqueue.topic.name

Spécifie le nom du sujet DLQ (file d’attente de lettres mortes) dans Kafka pour la livraison de messages à Kafka qui n’ont pas pu être intégrés dans les tables Snowflake. Pour plus d’informations, voir Files d’attente de lettres mortes dans cette rubrique.

Valeurs

Chaîne de texte personnalisée

Par défaut

Aucune.

Sémantique unique et exacte

La sémantique unique et exacte garantit la livraison des messages Kafka sans duplication ni perte de données. Cette garantie de livraison est définie par défaut pour le connecteur Kafka avec Snowpipe Streaming.

Convertisseurs

Le connecteur Kafka avec Snowpipe Streaming ne prend pas en charge les valeurs key.converter ou value.converter suivantes :

  • com.snowflake.kafka.connector.records.SnowflakeJsonConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry

Les convertisseurs Snowflake personnalisés traitent les erreurs qui empêchent le chargement des données en déplaçant les fichiers vers la zone de préparation de la table. Ce flux de travail entre en conflit avec Snowpipe Streaming Files d’attente de lettres mortes.

Les files d’attente de lettres mortes

Le connecteur Kafka avec Snowpipe Streaming prend en charge les files d’attente de lettres mortes (DLQ) pour les enregistrements interrompus ou les enregistrements qui ne peuvent être traités avec succès en raison d’une défaillance.

Pour plus d’informations sur la surveillance, voir la documentation d’Apache Kafka.

Facturation et utilisation

Pour des informations sur la facturation de Snowpipe Streaming, voir Facturation de Snowpipe Streaming.