Snowpipe Streaming

L’appel de Snowpipe Streaming API (« API ») provoque des chargements à faible latence de lignes de données en continu à l’aide de Snowflake Ingest SDK et de votre propre code d’application géré. L’ingestion en streaming API écrit des lignes de données dans les tables Snowflake, contrairement aux chargements de données en masse ou à Snowpipe, qui écrivent des données à partir de fichiers en zone de préparation. Cette architecture se traduit par des latences de chargement plus faibles, avec des coûts réduits correspondants pour le chargement de volumes de données similaires, ce qui en fait un outil puissant pour le traitement des flux de données en temps réel.

Cette rubrique décrit les concepts pour les applications client personnalisées qui appellent l’API. Pour les instructions relatives au connecteur Snowflake pour Kafka ( » connecteur Kafka « ), voir Utilisation de Connecteur Snowflake pour Kafka avec Snowpipe Streaming.

Dans ce chapitre :

Snowpipe Streaming API vs Snowpipe

Le API est destiné à compléter Snowpipe, pas à le remplacer. Utilisez Snowpipe Streaming API dans les scénarios de diffusion en streaming où les données sont diffusées via des lignes (par exemple, les sujets Apache Kafka) plutôt qu’écrites dans des fichiers. Le API s’intègre dans un flux d’ingestion qui comprend une application Java personnalisée existante qui produit ou reçoit des enregistrements. Le API supprime la nécessité de créer des fichiers pour charger les données dans les tables Snowflake, et permet le chargement automatique et continu des flux de données dans Snowflake au fur et à mesure que les données sont disponibles.

Snowpipe Streaming

Le tableau suivant décrit les différences entre Snowpipe Streaming et Snowpipe :

Catégorie

Snowpipe Streaming

Snowpipe

Forme des données à charger

Lignes.

Fichiers. Si votre pipeline de données existant génère des fichiers dans un stockage blob, nous vous recommandons d’utiliser Snowpipe plutôt que le API.

Exigences relatives aux logiciels tiers

Un code d’application Java personnalisé pour le Snowflake Ingest SDK.

Aucun.

Commande de données

Insertions ordonnées dans chaque canal.

Non pris en charge. Snowpipe peut charger des données à partir de fichiers dans un ordre différent des horodatages de création des fichiers dans le stockage cloud.

Historique de chargement

Historique de chargement enregistré dans la vue SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY (Account Usage).

Historique de chargement enregistré dans la vue LOAD_HISTORY (Account Usage) et la fonction COPY_HISTORY (Information Schema).

Objet du canal

Ne nécessite pas d’objet de type  » Canal « . Le API écrit les enregistrements directement dans les tables cibles.

Nécessite un objet Canal qui met en file d’attente et charge les données des fichiers en zone de préparation dans les tables cibles.

Exigences du logiciel

SDK Java

Le service Snowpipe Streaming est actuellement mis en œuvre sous la forme d’un ensemble de APIs pour le Snowflake Ingest SDK. Le pilote SDK peut être téléchargé à partir du répertoire central de Maven : https://mvnrepository.com/artifact/net.snowflake/snowflake-ingest-sdk. Le SDK prend en charge Java version 8 (ou supérieure) et nécessite Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files.

Important

Le SDK fait REST API des appels à Snowflake. Vous devrez peut-être ajuster les règles du pare-feu de votre réseau pour permettre la connectivité.

Application client personnalisée

Le API nécessite une interface d’application Java personnalisée capable de récupérer des lignes de données et de gérer les erreurs rencontrées. Vous devez veiller à ce que l’application fonctionne en continu et puisse redémarrer en cas de défaillance. Pour un ensemble donné de lignes, le API prend en charge l’équivalent de ON_ERROR = CONTINUE | ABORT. ABORT abandonne l’ensemble du lot après la première erreur trouvée et constitue le paramètre par défaut, et CONTINUE continue à charger les données si des erreurs sont trouvées.

L’application doit capturer les erreurs en utilisant la réponse des méthodes insertRow (ligne unique) ou insertRows (ensemble de lignes).

Canaux

Le API ingère des lignes via un ou plusieurs canaux. Un canal représente une connexion de streaming logique et nommée avec Snowflake pour le chargement de données dans une table. Un seul canal correspond à une seule table dans Snowflake ; toutefois, plusieurs canaux peuvent pointer vers la même table. Le client SDK a la possibilité d’ouvrir plusieurs canaux vers plusieurs tables, mais le client SDK ne peut pas ouvrir de canaux sur plusieurs comptes. L’ordre des lignes et les jetons de décalage correspondants sont préservés au sein d’un canal, mais pas entre les canaux qui pointent vers la même table.

Snowpipe streaming client channel table mapping

Note

Inactive channels along with their offset tokens are deleted automatically after 14 days.

Jetons de décalage

Un jeton de décalage est une chaîne de caractères qu’un client peut inclure dans les demandes de méthode insertRow (une seule ligne) ou insertRows (ensemble de lignes) pour suivre la progression de l’ingestion sur la base par canal. Les clients peuvent périodiquement faire des demandes de méthode getLatestCommittedOffsetToken pour obtenir le dernier jeton de décalage engagé pour un canal particulier et l’utiliser pour raisonner sur la progression de l’ingestion. Notez que ce jeton n’est pas utilisé par Snowflake pour effectuer la déduplication ; cependant, les clients peuvent utiliser ce jeton pour effectuer la déduplication en utilisant votre code personnalisé.

Lorsqu’un client rouvre un canal, le dernier jeton de décalage persistant est renvoyé. Le client peut réinitialiser sa position dans la source de données en utilisant le jeton pour éviter d’envoyer deux fois les mêmes données. Notez que lorsqu’un événement de réouverture d’un canal se produit, toutes les données mises en mémoire tampon dans Snowflake sont écartées pour éviter de les engager.

Par exemple, le connecteur Kafka pourrait lire un jeton de décalage à partir du sujet tel que <partition>:<décalage>, ou simplement <décalage> si la partition est encodée dans le nom du canal. Prenons la requête suivante :

  1. Le connecteur Kafka est en ligne et ouvre un canal correspondant à Partition 1 dans le sujet Kafka T avec le nom de canal T:P1.

  2. Le connecteur commence à lire les enregistrements de la partition Kafka. Le connecteur appelle le API, en faisant une demande de méthode insertRows avec le décalage associé à l’enregistrement comme jeton de décalage. Par exemple, le jeton de décalage pourrait être 10, faisant référence au 10ème enregistrement de la partition Kafka.

  3. Le connecteur effectue périodiquement des getLatestCommittedOffsetToken demandes de méthode pour déterminer la progression de l’ingestion.

Si le connecteur Kafka tombe en panne, le flux suivant peut être complété pour reprendre la lecture des enregistrements à partir du décalage correct pour la partition Kafka :

  1. Le connecteur Kafka revient en ligne et rouvre le canal en utilisant le même nom que précédemment.

  2. Le connecteur appelle le API, en faisant une demande de méthode getLatestCommittedOffsetToken pour obtenir le dernier décalage engagé pour la partition. Par exemple, supposons que le dernier jeton de décalage persistant est 20.

  3. Le connecteur utilise la lecture Kafka APIs pour réinitialiser un curseur correspondant au décalage plus 1 : 21 dans cet exemple.

  4. Le connecteur reprend la lecture des enregistrements. Aucune donnée en double n’est récupérée après le repositionnement réussi du curseur de lecture.

Autre exemple, supposons que vous ayez une application qui lit les journaux d’un répertoire et exporte ces journaux vers Snowflake à l’aide du client Snowpipe Streaming SDK. Vous pouvez construire une application d’exportation de journaux qui fait ce qui suit :

  1. Lister les fichiers dans le répertoire du journal. Supposons que le framework de journalisation génère des fichiers journaux qui peuvent être ordonnés lexicographiquement et que les nouveaux fichiers journaux sont placés à la fin de cet ordre.

  2. Lit un fichier journal ligne par ligne et appelle les requêtes de la méthode API, effectuant insertRows avec un jeton de décalage correspondant au nom du fichier journal et au nombre de lignes ou à la position des octets. Par exemple, un jeton de décalage pourrait être messages_1.log:20, où messages_1.log est le nom du fichier journal et 20 est le numéro de ligne.

Si l’application tombe en panne ou doit être redémarrée, elle appelle alors le API, en faisant une demande de méthode getLatestCommittedOffsetToken pour récupérer un jeton de décalage correspondant au dernier fichier journal et à la dernière ligne exportés. En continuant avec l’exemple, cela pourrait être messages_1.log:20. L’application ouvrirait alors messages_1.log et chercherait la ligne 21 pour éviter que la même ligne de journal soit ingérée deux fois.

Migration vers des fichiers optimisés

Le API écrit les lignes des canaux dans les blobs du stockage cloud, qui sont ensuite transférés dans la table cible. Initialement, les données streamées écrites dans une table cible sont stockées dans un format de fichier intermédiaire temporaire. À ce stade, la table est considérée comme une  » table mixte « , car les données partitionnées sont stockées dans un mélange de fichiers natifs et intermédiaires. Un processus automatisé en arrière-plan fait migrer les données des fichiers intermédiaires actifs vers des fichiers natifs optimisés pour les requêtes et les opérations DML, selon les besoins.

Opérations à insertion uniquement

L” API est actuellement limité à l’insertion de lignes. Pour modifier, supprimer ou combiner des données, écrivez les enregistrements  » bruts  » dans une ou plusieurs tables mises en zone de préparation. Fusionnez, joignez ou transformez les données en utilisant des pipelines de données continus pour insérer les données modifiées dans les tableaux de reporting de destination.

Classes et interfaces

Pour la documentation sur les classes et les interfaces, voir API SDK Snowflake Ingest.

Types de données Java pris en charge

Le tableau suivant résume les types de données Java pris en charge pour l’ingestion dans les colonnes Snowflake :

Type de colonne Snowflake

Type de données Java autorisées

  • CHAR

  • VARCHAR

  • Chaîne

  • les types de données primitifs (int, boolean, char, …)

  • BigInteger, BigDecimal

  • BINARY

  • octet[]

  • Chaîne (encodée en hexadécimal)

  • NUMBER

  • Types numériques (BigInteger, BigDecimal, byte, int, double, …)

  • Chaîne

  • FLOAT

  • Types numériques (BigInteger, BigDecimal, byte, int, double, …)

  • Chaîne

  • BOOLEAN

  • booléen

  • Types numériques (BigInteger, BigDecimal, byte, int, double, …)

  • Chaîne

Voir détails de la conversion booléenne.

  • TIME

  • java.time.LocalTime

  • java.time.OffsetTime

  • Chaîne

    • Temps stocké en entier

    • HH24:MI:SS.FFTZH:TZM (par ex. 20:57:01.123456789+07:00)

    • HH24:MI:SS.FF (par ex. 20:57:01.123456789)

    • HH24:MI:SS (par ex. 20:57:01)

    • HH24:MI (par ex. 20:57)

  • DATE

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • Chaîne

    • Date stockée en entier

    • YYYY-MM-DD (par ex. 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (par ex. 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (par ex. 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (par ex. 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (par ex. 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (par ex. 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (par ex. 2013-04-28T20:57-07:00)

  • TIMESTAMP_NTZ

  • TIMESTAMP_LTZ

  • TIMESTAMP_TZ

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • Chaîne

    • Horodatage stocké en entier

    • YYYY-MM-DD (par ex. 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (par ex. 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (par ex. 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (par ex. 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (par ex. 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (par ex. 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (par ex. 2013-04-28T20:57-07:00)

  • VARIANT

  • ARRAY

  • Chaîne (doit être un JSON valide)

  • les types de données primitifs et leurs tableaux

  • BigInteger, BigDecimal

  • java.time.LocalTime

  • java.time.OffsetTime

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.util.Map<String, T> où T est un type VARIANT valide

  • T[] où T est un type VARIANT valide

  • Liste<T> où T est un type VARIANT valide

  • OBJECT

  • Chaîne (doit être un objet JSON valide)

  • Map<String, T> où T est un type de variante valide

  • GEOGRAPHY

  • Non pris en charge

  • GEOMETRY

  • Non pris en charge

Limites

  • Les tables avec un des paramètres de colonne suivants ne sont pas prises en charge :

    • AUTOINCREMENT ou IDENTITY

    • Valeur par défaut de la colonne qui n’est pas NULL.

  • Les types de données GEOGRAPHY et GEOMETRY ne sont pas pris en charge.

  • Les colonnes avec classements ne sont pas prises en charge.

  • Snowpipe Streaming prend uniquement en charge l’utilisation de clés AES 256 bits pour le chiffrement des données.

  • Les tables TRANSIENT ou TEMPORARY ne sont pas prises en charge.