Snowpipe Streaming

L’appel de Snowpipe Streaming API (« API ») provoque le chargement à faible latence de lignes de données en continu à l’aide de Snowflake Ingest SDK et de votre propre code d’application géré. L’ingestion en streaming API écrit des lignes de données dans les tables Snowflake, contrairement aux chargements de données en masse ou à Snowpipe, qui écrivent des données à partir de fichiers en zone de préparation. Cette architecture se traduit par des latences de chargement plus faibles, avec des coûts réduits correspondants pour le chargement de volumes de données similaires, ce qui en fait un outil puissant pour le traitement des flux de données en temps réel.

Cette rubrique décrit les concepts pour les applications client personnalisées qui appellent l’API. Pour les instructions relatives au connecteur Snowflake pour Kafka ( » connecteur Kafka « ), voir Utilisation de Connecteur Snowflake pour Kafka avec Snowpipe Streaming.

Dans ce chapitre :

Snowpipe Streaming API versus Snowpipe

Le API est destiné à compléter Snowpipe, pas à le remplacer. Utilisez Snowpipe Streaming API dans les scénarios de diffusion en streaming où les données sont diffusées via des lignes (par exemple, les sujets Apache Kafka) plutôt qu’écrites dans des fichiers. Le API s’intègre dans un flux d’ingestion qui comprend une application Java personnalisée existante qui produit ou reçoit des enregistrements. Le API supprime la nécessité de créer des fichiers pour charger les données dans les tables Snowflake, et permet le chargement automatique et continu des flux de données dans Snowflake au fur et à mesure que les données sont disponibles.

Snowpipe Streaming

Le tableau suivant décrit les différences entre Snowpipe Streaming et Snowpipe :

Catégorie

Snowpipe Streaming

Snowpipe

Forme des données à charger

Lignes

Fichiers. Si votre pipeline de données existant génère des fichiers dans un stockage Blob, nous vous recommandons d’utiliser Snowpipe au lieu du API.

Exigences relatives aux logiciels tiers

Code d’application Java personnalisé pour le Snowflake Ingest SDK.

Aucun

Commande de données

Insertions ordonnées dans chaque canal.

Non pris en charge. Snowpipe peut charger des données à partir de fichiers dans un ordre différent des horodatages de création des fichiers dans le stockage cloud.

Historique de chargement

Historique de chargement enregistré dans la vue SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY (Account Usage).

Historique de chargement enregistré dans la vue LOAD_HISTORY (Account Usage) et la fonction COPY_HISTORY (Information Schema).

Objet du canal

Ne nécessite pas d’objet de type « Canal ». Le API écrit les enregistrements directement dans les tables cibles.

Nécessite un objet Canal qui met en file d’attente et charge les données des fichiers en zone de préparation dans les tables cibles.

Exigences du logiciel

SDK Java

Le service Snowpipe Streaming est actuellement mis en œuvre sous la forme d’un ensemble de APIs pour le Snowflake Ingest SDK. Le pilote SDK peut être téléchargé à partir du répertoire central de Maven. Snowflake recommande d’utiliser les versions 2.0.2 et ultérieures du SDK Snowflake Ingest.

Le SDK prend en charge Java version 8 ou supérieure et nécessite Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files.

Important

Le SDK fait REST API des appels à Snowflake. Vous devrez peut-être ajuster les règles du pare-feu de votre réseau pour permettre la connectivité.

Application client personnalisée

Le API nécessite une interface d’application Java personnalisée capable de récupérer des lignes de données et de gérer les erreurs rencontrées. Vous devez veiller à ce que l’application fonctionne en continu et puisse redémarrer en cas de défaillance. Pour un lot de lignes donné, les API prennent en charge l’équivalent de ON_ERROR = CONTINUE | SKIP_BATCH | ABORT.

  • CONTINUE : continuer à charger les lignes de données acceptables et renvoyer toutes les erreurs.

  • SKIP_BATCH : ignorer le chargement et renvoyer toutes les erreurs si une erreur est rencontrée dans l’ensemble du lot de lignes.

  • ABORT (paramètre par défaut) : abandonner l’ensemble du lot de lignes et lever une exception à la première erreur rencontrée.

L’application doit capturer les erreurs en utilisant la réponse des méthodes insertRow (ligne unique) ou insertRows (ensemble de lignes).

Canaux

Le API ingère des lignes à travers un ou plusieurs canaux. Un canal représente une connexion de streaming logique et nommée avec Snowflake pour le chargement de données dans une table. Un seul canal correspond à une seule table dans Snowflake ; toutefois, plusieurs canaux peuvent pointer vers la même table. Le client SDK a la possibilité d’ouvrir plusieurs canaux vers plusieurs tables ; mais le client SDK ne peut pas ouvrir de canaux sur plusieurs comptes. L’ordre des lignes et les jetons de décalage correspondants sont préservés au sein d’un canal, mais pas entre les canaux qui pointent vers la même table.

Les canaux sont conçus pour durer longtemps lorsqu’un client insère activement des données, et ils doivent être réutilisés, car les informations de jeton de décalage sont conservées. Les données à l’intérieur du canal sont automatiquement effacées toutes les secondes, par défaut, et n’ont pas besoin d’être fermées. Pour plus d’informations, voir Latence.

Snowpipe streaming client channel table mapping

Vous pouvez lancer la commande SHOW CHANNELS pour obtenir la liste des chaînes pour lesquelles vous disposez de privilèges d’accès. Pour plus d’informations, consultez SHOW CHANNELS.

Note

Les canaux inactifs ainsi que leurs jetons de décalage sont automatiquement supprimés après 30 jours.

Jetons de décalage

Un jeton de décalage est une chaîne de caractères qu’un client peut inclure dans les demandes de méthode insertRow (une seule ligne) ou insertRows (ensemble de lignes) pour suivre la progression de l’ingestion sur la base par canal. Le jeton est initialisé à la valeur NULL lors de la création du canal et est mis à jour lorsque les lignes avec un jeton de décalage fourni sont validées dans Snowflake par le biais d’un processus asynchrone. Les clients peuvent périodiquement faire des demandes de méthode getLatestCommittedOffsetToken pour obtenir le dernier jeton de décalage engagé pour un canal particulier et l’utiliser pour raisonner sur la progression de l’ingestion. Notez que ce jeton n’est pas utilisé par Snowflake pour effectuer la déduplication ; cependant, les clients peuvent utiliser ce jeton pour effectuer la déduplication en utilisant votre code personnalisé.

Lorsqu’un client rouvre un canal, le dernier jeton de décalage persistant est renvoyé. Le client peut réinitialiser sa position dans la source de données en utilisant le jeton pour éviter d’envoyer deux fois les mêmes données. Notez que lorsqu’un événement de réouverture d’un canal se produit, toutes les données mises en mémoire tampon dans Snowflake sont écartées pour éviter de les engager.

Vous pouvez utiliser le dernier jeton de décalage engagé pour effectuer les opérations suivantes :

  • Suivre la progression de l’ingestion.

  • Vérifier si un décalage spécifique a été validé en le comparant au dernier jeton de décalage validé.

  • Avancer le décalage de la source et purger les données qui ont déjà été engagées.

  • Permettre la déduplication et assurer la livraison des données « exactly-once ».

Par exemple, le connecteur Kafka pourrait lire un jeton de décalage à partir d’un sujet tel que <partition>:<décalage>, ou simplement <décalage> si la partition est encodée dans le nom du canal. Prenons la requête suivante :

  1. Le connecteur Kafka est en ligne et ouvre un canal correspondant à Partition 1 dans le sujet Kafka T avec le nom de canal T:P1.

  2. Le connecteur commence à lire les enregistrements de la partition Kafka.

  3. Le connecteur appelle le API, en faisant une demande de méthode insertRows avec le décalage associé à l’enregistrement comme jeton de décalage.

    Par exemple, le jeton de décalage pourrait être 10, faisant référence au dixième enregistrement de la partition Kafka.

  4. Le connecteur effectue périodiquement des getLatestCommittedOffsetToken demandes de méthode pour déterminer la progression de l’ingestion.

Si le connecteur Kafka tombe en panne, la procédure suivante peut être complétée pour reprendre la lecture des enregistrements à partir du décalage correct pour la partition Kafka :

  1. Le connecteur Kafka revient en ligne et rouvre le canal en utilisant le même nom que précédemment.

  2. Le connecteur appelle le API, en faisant une demande de méthode getLatestCommittedOffsetToken pour obtenir le dernier décalage engagé pour la partition.

    Par exemple, supposons que le dernier jeton de décalage persistant est 20.

  3. Le connecteur utilise la lecture Kafka APIs pour réinitialiser un curseur correspondant au décalage plus 1 (21 dans cet exemple).

  4. Le connecteur reprend la lecture des enregistrements. Aucune donnée en double n’est récupérée après le repositionnement réussi du curseur de lecture.

Dans un autre exemple, une application lit les journaux d’un répertoire et utilise le client Snowpipe Streaming SDK pour exporter ces journaux vers Snowflake. Vous pouvez construire une application d’exportation de journaux qui fait ce qui suit :

  1. Lister les fichiers dans le répertoire du journal.

    Supposons que le framework de journalisation génère des fichiers journaux qui peuvent être ordonnés lexicographiquement et que les nouveaux fichiers journaux sont placés à la fin de cet ordre.

  2. Lit un fichier journal ligne par ligne et appelle les requêtes de la méthode API, effectuant insertRows avec un jeton de décalage correspondant au nom du fichier journal et au nombre de lignes ou à la position des octets.

    Par exemple, un jeton de décalage pourrait être messages_1.log:20, où messages_1.log est le nom du fichier journal et 20 est le numéro de ligne.

Si l’application tombe en panne ou doit être redémarrée, elle appelle alors le API, en faisant une demande de méthode getLatestCommittedOffsetToken pour récupérer un jeton de décalage qui correspond à au dernier fichier journal et à la dernière ligne exportés. En continuant avec l’exemple, cela pourrait être messages_1.log:20. L’application ouvrirait alors messages_1.log et chercherait la ligne 21 pour éviter que la même ligne de journal soit ingérée deux fois.

Note

Les informations relatives au jeton de décalage peuvent être perdues. Le jeton de décalage est lié à un objet de canal, et un canal est automatiquement effacé si aucune nouvelle ingestion n’est effectuée en utilisant le canal pendant une période de 30 jours. Pour éviter la perte du jeton de décalage, envisagez de maintenir un décalage séparé et de réinitialiser le jeton de décalage du canal si nécessaire.

Meilleures pratiques en matière de livraison « exactly-once »

Il peut être difficile d’assurer une livraison en une seule fois, et il est essentiel de respecter les principes suivants dans votre code personnalisé :

  • Pour garantir une récupération appropriée en cas d’exceptions, de défaillances ou de pannes, vous devez toujours rouvrir le canal et redémarrer l’ingestion en utilisant le dernier jeton de décalage validé.

  • Bien que votre application puisse maintenir son propre décalage, il est crucial d’utiliser le dernier jeton de décalage engagé fourni par Snowflake comme source de vérité et de réinitialiser votre propre décalage en conséquence.

  • Le seul cas où votre propre décalage doit être considéré comme la source de vérité est lorsque le jeton de décalage de Snowflake est défini ou réinitialisé sur une valeur NULL. Un jeton de décalage NULL signifie généralement l’une des choses suivantes :

    • Il s’agit d’un nouveau canal, aucun jeton de décalage n’a donc été défini.

    • La table cible a été supprimée et recréée, de sorte que le canal est considéré comme nouveau.

    • Aucune activité d’ingestion n’ayant eu lieu pendant 30 jours, le canal a été automatiquement nettoyé et les informations relatives au jeton de décalage ont été perdues.

  • Si nécessaire, vous pouvez périodiquement purger les données source qui ont déjà été validées sur la base du dernier jeton de décalage validé et avancer votre propre décalage.

Pour plus d’informations sur la façon dont le connecteur Kafka avec Snowpipe Streaming réalise la livraison « exactly-once », consultez Sémantique unique et exacte.

Latence

Snowpipe Streaming efface automatiquement les données des canaux toutes les secondes. Vous n’avez pas besoin de fermer le canal pour que les données soient vidées.

Avec les versions de SDK 2.0.4 et ultérieures de Snowflake Ingest, vous pouvez configurer la latence via l’option max_client_lag. L’option par défaut est d’1 seconde. La latence maximale peut être fixée à 10 minutes. Pour plus d’informations, voir MAX_CLIENT_LAG.

Notez que le connecteur Kafka pour Snowpipe Streaming a son propre tampon. Une fois que le temps de vidage de tampon Kafka est atteint, les données seront envoyées avec une seconde de latence à Snowflake par le biais de Snowpipe Streaming. Pour plus d’informations, voir temps.vidage.tampon.

Migration vers des fichiers optimisés

Le API écrit les lignes des canaux dans les blobs du stockage cloud, qui sont ensuite transférés dans la table cible. Initialement, les données streamées écrites dans une table cible sont stockées dans un format de fichier intermédiaire temporaire. À ce stade, la table est considérée comme une « table mixte », car les données partitionnées sont stockées dans un mélange de fichiers natifs et intermédiaires. Un processus automatisé en arrière-plan fait migrer les données des fichiers intermédiaires actifs vers des fichiers natifs optimisés pour les requêtes et les opérations DML, selon les besoins.

Réplication

Le Streaming Snowpipe prend en charge la réplication et le basculement des tables Snowflake remplies par Snowpipe Streaming et ses décalages de canaux associés d’un compte source vers un compte cible dans différentes régions et sur les plateformes Cloud.

Pour plus d’informations, voir Réplication et Streaming Snowpipe.

Opérations à insertion uniquement

L” API est actuellement limité à l’insertion de lignes. Pour modifier, supprimer ou combiner des données, écrivez les enregistrements  » bruts  » dans une ou plusieurs tables mises en zone de préparation. Fusionnez, joignez ou transformez les données en utilisant des pipelines de données continus pour insérer les données modifiées dans les tableaux de reporting de destination.

Classes et interfaces

Pour la documentation sur les classes et les interfaces, voir API SDK Snowflake Ingest.

Types de données Java pris en charge

Le tableau suivant résume les types de données Java pris en charge pour l’ingestion dans les colonnes Snowflake :

Type de colonne Snowflake

Type de données Java autorisées

  • CHAR

  • VARCHAR

  • Chaîne

  • les types de données primitifs (int, boolean, char, …)

  • BigInteger, BigDecimal

  • BINARY

  • octet[]

  • Chaîne (encodée en hexadécimal)

  • NUMBER

  • Types numériques (BigInteger, BigDecimal, byte, int, double, …)

  • Chaîne

  • FLOAT

  • Types numériques (BigInteger, BigDecimal, byte, int, double, …)

  • Chaîne

  • BOOLEAN

  • booléen

  • Types numériques (BigInteger, BigDecimal, byte, int, double, …)

  • Chaîne

Voir détails de la conversion booléenne.

  • TIME

  • java.time.LocalTime

  • java.time.OffsetTime

  • Chaîne

    • Temps stocké en entier

    • HH24:MI:SS.FFTZH:TZM (par ex. 20:57:01.123456789+07:00)

    • HH24:MI:SS.FF (par ex. 20:57:01.123456789)

    • HH24:MI:SS (par ex. 20:57:01)

    • HH24:MI (par ex. 20:57)

  • DATE

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • Chaîne

    • Date stockée en entier

    • YYYY-MM-DD (par ex. 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (par ex. 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (par ex. 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (par ex. 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (par ex. 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (par ex. 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (par ex. 2013-04-28T20:57-07:00)

  • TIMESTAMP_NTZ

  • TIMESTAMP_LTZ

  • TIMESTAMP_TZ

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • Chaîne

    • Horodatage stocké en entier

    • YYYY-MM-DD (par ex. 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (par ex. 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (par ex. 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (par ex. 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (par ex. 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (par ex. 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (par ex. 2013-04-28T20:57-07:00)

  • VARIANT

  • ARRAY

  • Chaîne (doit être un JSON valide)

  • les types de données primitifs et leurs tableaux

  • BigInteger, BigDecimal

  • java.time.LocalTime

  • java.time.OffsetTime

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.util.Map<String, T> où T est un type VARIANT valide

  • T[] où T est un type VARIANT valide

  • Liste<T> où T est un type VARIANT valide

  • OBJECT

  • Chaîne (doit être un objet JSON valide)

  • Map<String, T> où T est un type de variante valide

  • GEOGRAPHY

  • Non pris en charge

  • GEOMETRY

  • Non pris en charge

Privilèges d’accès requis

L’appel de l’API de Snowpipe Streaming nécessite un rôle avec les privilèges suivants :

Objet

Privilège

Table

OWNERSHIP ou au minimum INSERT et EVOLVE SCHEMA (uniquement requis lors de l’utilisation de l’évolution des schémas pour le connecteur Kafka avec Snowpipe Streaming)

Base de données

USAGE

Schéma

USAGE

Limites

Snowpipe Streaming prend uniquement en charge l’utilisation de clés AES 256 bits pour le chiffrement des données.

Les objets ou types suivants ne sont pas pris en charge :

  • Les types de données GEOGRAPHY et GEOMETRY

  • Colonnes avec classements

  • Tables TRANSIENT ou TEMPORARY

  • Tables avec l’une des options de colonne suivantes :

    • AUTOINCREMENT ou IDENTITY

    • Valeur par défaut de la colonne qui n’est pas NULL.