Utilisation de Connecteur Snowflake pour Kafka avec Snowpipe Streaming¶
Vous pouvez remplacer Snowpipe par Snowpipe Streaming dans votre chaîne de chargement de données depuis Kafka. Lorsque le seuil spécifié pour le vidage de la mémoire tampon (temps ou mémoire ou nombre de messages) est atteint, le connecteur appelle Snowpipe Streaming API (« API ») pour écrire des lignes de données dans les tables Snowflake, contrairement à Snowpipe, qui écrit les données à partir de fichiers temporaires en zone de préparation. Cette architecture permet de réduire les latences de chargement et, par conséquent, les coûts de chargement de volumes de données similaires.
La version 1.9.1 (ou supérieure) du connecteur Kafka est requise pour être utilisée avec Snowpipe Streaming. Le connecteur Kafka avec Snowpipe Streaming comprend le Snowflake Ingest SDK et prend en charge le streaming de lignes à partir de sujets Apache Kafka directement dans des tables cibles.
Note
Le connecteur Kafka avec Snowpipe Streaming ne prend actuellement pas en charge la détection des schémas ou leur évolution. Il utilise le même schéma de tables que celui utilisé avec Snowpipe.
Dans ce chapitre :
Version minimale requise¶
Le connecteur Kafka version 1.9.1 (ou ultérieure) prend en charge Snowpipe Streaming.
Propriétés de configuration de Kafka¶
Enregistrez vos paramètres de connexion dans le fichier de propriétés du connecteur Kafka. Pour plus d’informations, voir Configuration du connecteur Kafka.
Propriétés requises¶
Ajoutez ou modifiez vos paramètres de connexion dans le fichier de propriétés du connecteur Kafka. Pour plus d’informations, voir Configuration du connecteur Kafka.
snowflake.ingestion.method
Nécessaire uniquement si vous utilisez le connecteur Kafka comme client d’ingestion en continu. Indique s’il faut utiliser Snowpipe Streaming ou Snowpipe standard pour charger les données de votre sujet Kafka. Les valeurs prises en charge sont les suivantes :
SNOWPIPE_STREAMING
SNOWPIPE
(par défaut)
Aucun paramètre supplémentaire n’est requis pour choisir le service backend afin der mettre en file d’attente et charger les données des sujets. Configurez les propriétés supplémentaires dans le fichier de propriétés de votre connecteur Kafka comme d’habitude.
snowflake.role.name
Rôle de contrôle d’accès à utiliser lors de l’insertion des lignes dans la table.
Propriétés de la mémoire tampon et de requête¶
buffer.flush.time
Nombre de secondes entre les vidages de la mémoire tampon, chaque vidage entraînant des opérations d’insertion pour les enregistrements mis en mémoire tampon. Le connecteur Kafka appelle Snowpipe Streaming API (« API ») une fois après chaque vidage.
La valeur minimale prise en charge pour la propriété
buffer.flush.time
est1
(en secondes). Pour des débits de données moyens plus élevés, nous vous suggérons de diminuer la valeur par défaut afin d’améliorer la latence. Si le coût est plus important que la latence, vous pouvez augmenter le temps de vidage du tampon. Veillez à vider la mémoire tampon de Kafka avant qu’elle ne soit pleine afin d’éviter les exceptions liées à l’absence de mémoire.- Valeurs
1
- Pas de limite supérieure.- Par défaut
10
buffer.count.records
Nombre d’enregistrements mis en mémoire tampon par partition Kafka avant leur acquisition dans Snowflake.
- Valeurs
1
- Pas de limite supérieure.- Par défaut
10000
buffer.size.bytes
Taille cumulée en octets des enregistrements mis en mémoire tampon par partition Kafka avant d’être ingérés dans Snowflake en tant que fichiers de données.
Les enregistrements sont compressés lorsqu’ils sont écrits dans des fichiers de données. Par conséquent, la taille des enregistrements dans la mémoire tampon peut être supérieure à la taille des fichiers de données créés à partir des enregistrements.
- Valeurs
1
- Pas de limite supérieure.- Par défaut
20000000
(20 MB)
En plus des propriétés du connecteur Kafka, notez la propriété max.poll.records
du consommateur Kafka, qui contrôle le nombre maximum d’enregistrements renvoyés par Kafka à Kafka Connect en une seule requête. La valeur par défaut de 500
peut être augmentée, mais il faut tenir compte des contraintes de mémoire. Pour plus d’informations sur cette propriété, voir la documentation de votre package Kafka :
Gestion des erreurs et propriétés DLQ¶
errors.tolerance
Spécifie comment gérer les erreurs rencontrées par le connecteur Kafka :
Cette propriété prend en charge les valeurs suivantes :
NONE
Arrêtez le chargement des données à la première erreur rencontrée.
ALL
Ignorez toutes les erreurs et continuez à charger les données.
- Par défaut
NONE
errors.log.enable
Spécifie s’il faut ou non écrire les messages d’erreur dans le fichier journal de Kafka Connect.
Cette propriété prend en charge les valeurs suivantes :
TRUE
Écrire des messages d’erreur.
FALSE
Ne pas écrire de messages d’erreur.
- Par défaut
FALSE
errors.deadletterqueue.topic.name
Spécifie le nom du sujet DLQ (file d’attente de lettres mortes) dans Kafka pour la livraison de messages à Kafka qui n’ont pas pu être intégrés dans les tables Snowflake. Pour plus d’informations, voir Files d’attente de lettres mortes dans cette rubrique.
- Valeurs
Chaîne de texte personnalisée
- Par défaut
Aucune.
Sémantique unique et exacte¶
La sémantique unique et exacte garantit la livraison des messages Kafka sans duplication ni perte de données. Cette garantie de livraison est définie par défaut pour le connecteur Kafka avec Snowpipe Streaming.
Convertisseurs¶
Le connecteur Kafka avec Snowpipe Streaming ne prend pas en charge les valeurs key.converter
ou value.converter
suivantes :
com.snowflake.kafka.connector.records.SnowflakeJsonConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry
Les convertisseurs Snowflake personnalisés traitent les erreurs qui empêchent le chargement des données en déplaçant les fichiers vers la zone de préparation de la table. Ce flux de travail entre en conflit avec Snowpipe Streaming Files d’attente de lettres mortes.
Les files d’attente de lettres mortes¶
Le connecteur Kafka avec Snowpipe Streaming prend en charge les files d’attente de lettres mortes (DLQ) pour les enregistrements interrompus ou les enregistrements qui ne peuvent être traités avec succès en raison d’une défaillance.
Pour plus d’informations sur la surveillance, voir la documentation d’Apache Kafka.
Facturation et utilisation¶
Pour des informations sur la facturation de Snowpipe Streaming, voir Facturation de Snowpipe Streaming.