Guide de migration de Snowpipe Streaming

Ce guide décrit comment migrer depuis le SDK Java Snowpipe classique vers le SDK Snowpipe Streaming hautes performances. Les modifications architecturales et les mises à jour d’API décrites ici s’appliquent également aux migrations vers le SDK Python, car l’architecture hautes performances est disponible dans les deux langages. Bien que les exemples de code de ce document soient en Java, les principes de migration fondamentaux restent cohérents à travers les langages.

Modifications architecturales clés

Le tableau suivant résume les modifications architecturales les plus importantes dans le SDK Snowpipe Streaming hautes performances. Pour obtenir une comparaison détaillée des SDKs, consultez Comparaison entre le SDK classique et le SDK hautes performances.

Zone

Classique (snowflake-ingest-java)

Hautes performances (SDK snowpipe-streaming)

Point d’entrée

Les données sont ingérées directement dans les tables.

Les données sont ingérées via des objets PIPE, qui prennent en charge les transformations et l’application des schémas.

SDK/Noyau

SDK Java uniquement.

SDK en plusieurs langages (Java et Python) avec un noyau Rust partagé.

Noms d’API

insertRow/insertRows, openChannel(request)

appendRow/appendRows, openChannel(channelName, offsetToken)

Traitement des erreurs

Une validation côté client est effectuée.

Une validation côté serveur avec un retour d’erreur plus détaillé est fournie.

Gestion de la contre-pression

Met le thread en veille, ce qui entraîne un état bloqué ou qui ne répond pas.

Renvoie une erreur, ce qui permet à l’appelant de mettre en œuvre une stratégie d’interruption ou de relance.

Mappage client-table

Un seul objet client pouvait ouvrir des canaux vers n’importe quelle table.

Un seul objet client est désormais exclusivement lié à un seul objet de canal.

Facturation

En fonction des calculs et du nombre de clients.

Plate, par GB ingéré.

Schéma/Transformations

Gérés côté client.

Gérés côté serveur par l’intermédiaire de la définition PIPE.

Processus de migration

Pour migrer votre application vers le SDK hautes performances, suivez les étapes de haut niveau suivantes :

  1. Pour chaque table cible, créez un PIPE.

    CREATE PIPE my_pipe
    AS COPY INTO my_table
      FROM TABLE (DATA_SOURCE(TYPE => 'STREAMING'))
      MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
      [CLUSTER_AT_INGEST_TIME = TRUE];
    
    Copy
  2. Arrêtez l’ingestion de tous les clients classiques.

  3. Pour chaque canal dans le client classique, confirmez les derniers décalages validés. Pour récupérer ces décalages, utilisez la méthode getLatestCommittedOffsetTokens() depuis le SDK classique. Vérifiez que ces décalages s’alignent sur vos enregistrements côté client.

  4. Mettez à jour le code de votre application.

    • Faites basculer les dépendances de votre projet vers le SDK hautes performances (Java ou Python).

    • Mettez à jour vos appels d’API comme détaillé dans la section Modifications de l’API et de la configuration suivante.

    • Initialisez un seul client par table/PIPE en utilisant le dernier décalage validé de Snowflake.

  5. Une fois que votre nouveau client est configuré et stable, reprenez l’ingestion.

Modifications de l’API et de la configuration

Les modifications suivantes doivent être apportées à vos appels d’API et à vos paramètres de configuration pendant la migration :

Initialisation du client

  • Classique : builder(name)

  • Hautes performances : builder(name, db, schema, pipeName)

Canaux

  • Classique : openChannel(OpenChannelRequest)

  • Hautes performances : openChannel(channelName, offsetToken) renvoie à la fois le canal et l’état.

Méthodes d’ingestion

  • Classique : insertRow/insertRows(...)

  • Hautes performances : appendRow/appendRows(...)

Suivi du décalage

  • La méthode getLatestCommittedOffsetTokens(channels) du SDK classique offre une visibilité limitée et ne dispose pas d’un contexte d’erreur.

  • Le SDK hautes performances prend toujours en charge getLatestCommittedOffsetTokens(...), mais pour une surveillance efficace, nous vous recommandons d’utiliser getChannelStatuses(...). Cette méthode effectue les tâches suivantes :

    • Confirme que les décalages progressent comme attendu.

    • Renvoie le nombre d’erreurs et des informations détaillées sur les erreurs par canal.

    • Permet une surveillance et un dépannage proactifs de vos pipelines de données.