Guide de migration de Snowpipe Streaming¶
Ce guide décrit comment migrer depuis le SDK Java Snowpipe classique vers le SDK Snowpipe Streaming hautes performances. Les modifications architecturales et les mises à jour d’API décrites ici s’appliquent également aux migrations vers le SDK Python, car l’architecture hautes performances est disponible dans les deux langages. Bien que les exemples de code de ce document soient en Java, les principes de migration fondamentaux restent cohérents à travers les langages.
Modifications architecturales clés¶
Le tableau suivant résume les modifications architecturales les plus importantes dans le SDK Snowpipe Streaming hautes performances. Pour obtenir une comparaison détaillée des SDKs, consultez Comparaison entre Snowpipe Streaming hautes performances et les SDKs classiques.
Zone |
Classique (snowflake-ingest-java) |
Hautes performances (SDK snowpipe-streaming) |
|---|---|---|
Point d’entrée |
Les données sont ingérées directement dans les tables. |
Les données sont ingérées via des objets PIPE, qui prennent en charge les transformations et l’application des schémas. |
SDK/Noyau |
SDK Java uniquement. |
SDK en plusieurs langages (Java et Python) avec un noyau Rust partagé. |
Noms d’API |
|
|
Traitement des erreurs |
Une validation côté client est effectuée. |
Une validation côté serveur avec un retour d’erreur plus détaillé est fournie. |
Gestion de la contre-pression |
Met le thread en veille, ce qui entraîne un état bloqué ou qui ne répond pas. |
Renvoie une erreur, ce qui permet à l’appelant de mettre en œuvre une stratégie d’interruption ou de relance. |
Mappage client-table |
Un seul objet client pouvait ouvrir des canaux vers n’importe quelle table. |
Un seul objet client est désormais exclusivement lié à un seul objet de canal. |
Facturation |
En fonction des calculs et du nombre de clients. |
Plate, par GB ingéré. |
Schéma/Transformations |
Gérés côté client. |
Gérés côté serveur par l’intermédiaire de la définition PIPE. |
Processus de migration¶
Pour migrer votre application vers le SDK hautes performances, suivez les étapes de haut niveau suivantes :
Pour chaque table cible, créez un PIPE.
CREATE PIPE my_pipe AS COPY INTO my_table FROM TABLE (DATA_SOURCE(TYPE => 'STREAMING')) MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE [CLUSTER_AT_INGEST_TIME = TRUE];
Arrêtez l’ingestion de tous les clients classiques.
Pour chaque canal dans le client classique, confirmez les derniers décalages validés. Pour récupérer ces décalages, utilisez la méthode
getLatestCommittedOffsetTokens()depuis le SDK classique. Vérifiez que ces décalages s’alignent sur vos enregistrements côté client.Mettez à jour le code de votre application.
Faites basculer les dépendances de votre projet vers le SDK hautes performances (Java ou Python).
Mettez à jour vos appels d’API comme détaillé dans la section Modifications de l’API et de la configuration suivante.
Initialisez un seul client par table/PIPE en utilisant le dernier décalage validé de Snowflake.
Une fois que votre nouveau client est configuré et stable, reprenez l’ingestion.
Modifications de l’API et de la configuration¶
Les modifications suivantes doivent être apportées à vos appels d’API et à vos paramètres de configuration pendant la migration :
Initialisation du client¶
Classique :
builder(name)Hautes performances :
builder(name, db, schema, pipeName)
Canaux¶
Classique :
openChannel(OpenChannelRequest)Hautes performances :
openChannel(channelName, offsetToken)renvoie à la fois le canal et l’état.
Méthodes d’ingestion¶
Classique :
insertRow/insertRows(...)Hautes performances :
appendRow/appendRows(...)
Suivi du décalage¶
La méthode
getLatestCommittedOffsetTokens(channels)du SDK classique offre une visibilité limitée et ne dispose pas d’un contexte d’erreur.Le SDK hautes performances prend toujours en charge
getLatestCommittedOffsetTokens(...), mais pour une surveillance efficace, nous vous recommandons d’utilisergetChannelStatuses(...). Cette méthode effectue les tâches suivantes :Confirme que les décalages progressent comme attendu.
Renvoie le nombre d’erreurs et des informations détaillées sur les erreurs par canal.
Permet une surveillance et un dépannage proactifs de vos pipelines de données.
Gestion des données semi-structurées¶
Lors de la migration vers le SDK ultra-performant, vérifiez la manière dont votre application fournit des données pour les colonnes ARRAY et VARIANT afin d’éviter que les données ne soient stockées sous forme de chaînes littérales.
Modification de comportement¶
Le passage d’une chaîne littérale sérialisée — par exemple, « [1, 2, 3] » — à une colonne ARRAY dans v2 donne un tableau à élément unique contenant cette chaîne littérale. Pour conserver le comportement classique de l’architecture, sélectionnez l’une des options suivantes :
Option 1 : Transmettre des objets natifs (recommandé)¶
Mettez à jour votre application cliente pour la désérialisation de chaînes JSON en objets natifs avant d’appeler appendRow.
Java : Utilisez
java.util.Listpour les tableaux etjava.util.Mappour les objets.Python : Utiliser les types
listetdictnatifs.
Avantage : Compatible avec le canal par défaut et l’évolution automatique des schémas.
Option 2 : Transformation côté canal¶
Définissez explicitement l’objet canal avec la logique de transformation à l’aide de la fonction PARSE_JSON.
Exemple SQL
CREATE PIPE my_pipe AS
COPY INTO my_table (my_array_col)
FROM (SELECT PARSE_JSON($1:my_array_col) FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
Note
Cette méthode est incompatible avec le canal par défaut et les fonctionnalités d’évolution automatique des schémas.