Leitfaden zur Snowpipe Streaming Migration

In diesem Leitfaden wird beschrieben, wie Sie vom klassischen Snowpipe-Java-SDK zum leistungsstarken Snowpipe Streaming-SDK migrieren können. Die hier beschriebenen architektonischen Änderungen und API-Aktualisierungen gelten auch für Migrationen zum Python-SDK, da die leistungsstarke Architektur in beiden Sprachen verfügbar ist. Obwohl die Codebeispiele in diesem Dokument in Java vorliegen, bleiben die zentralen Migrationsprinzipien für alle Sprachen einheitlich.

Wichtige architektonische Änderungen

In der folgenden Tabelle sind die wichtigsten architektonischen Änderungen des leistungsstarken Snowpipe Streaming-SDK zusammengefasst. Einen detaillierten Vergleich der SDKs finden Sie unter Vergleich zwischen klassischem SDK und leistungsstarkem SDK.

Bereich

Klassisch (snowflake-ingest-java)

Leistungsstark (snowpipe-streaming SDK)

Einstiegspunkt

Die Daten werden direkt in die Tabellen aufgenommen.

Die Daten werden über PIPE-Objekte aufgenommen, die Transformationen und Schemadurchsetzungen unterstützen.

SDK/Kern

Nur Java-SDK.

SDK in mehrere Sprachen (Java und Python) mit einem gemeinsamen Rust-Kern.

API-Namen

insertRow/insertRows, openChannel(request)

appendRow/appendRows, openChannel(channelName, offsetToken)

Fehlerbehandlung

Es erfolgt eine clientseitige Validierung.

Es wird eine serverseitige Validierung mit umfangreicherem Fehlerfeedback bereitgestellt.

Handhabung von Gegendruck

Versetzt den Thread in den Ruhezustand, was zu einem blockierten bzw. nicht mehr reaktionsfähigen Zustand führt.

Gibt einen Fehler zurück und ermöglicht es dem Aufrufer, eine Backoff-/Wiederholungsstrategie zu implementieren.

Zuordnung von Client zu Tabelle

Ein einzelnes Clientobjekt könnte Kanäle zu einer beliebigen Tabelle öffnen.

Ein einzelnes Clientobjekt ist jetzt ausschließlich an ein Pipe-Objekt gebunden.

Rechnungsstellung

Basierend auf der Anzahl der Computeressourcen und Clients.

Pauschal, pro aufgenommenem GB.

Schema/Transformationen

Wird auf Clientseite verwaltet.

Wird auf Serverseite über die PIPE-Definition verwaltet.

Migrationsprozess

Um Ihre Anwendung zum leistungsstarken SDK zu migrieren, führen Sie die folgenden allgemeinen Schritte aus:

  1. Erstellen Sie eine PIPE für jede Zieltabelle.

    CREATE PIPE my_pipe
    AS COPY INTO my_table
      FROM TABLE (DATA_SOURCE(TYPE => 'STREAMING'))
      MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
      [CLUSTER_AT_INGEST_TIME = TRUE];
    
    Copy
  2. Beenden Sie die Datenaufnahme von allen klassischen Clients.

  3. Bestätigen Sie für jeden Kanal im klassischen Client die letzten übertragenen Offsets. Verwenden Sie zum Abrufen dieser Offsets die Methode``getLatestCommittedOffsetTokens()`` aus dem klassischen SDK. Überprüfen Sie, ob diese Offsets mit Ihren clientseitigen Datensätzen übereinstimmen.

  4. Aktualisieren Sie Ihren Anwendungscode.

    • Stellen Sie Ihre Projektabhängigkeiten auf das leistungsstarke SDK (Java oder Python) um.

    • Aktualisieren Sie Ihre API-Aufrufe, wie im folgenden Abschnitt API- und Konfigurationsänderungen beschrieben.

    • Initialisieren Sie einen Client pro Tabelle/PIPE durch Verwendung des letzten übertragenen Offsets von Snowflake.

  5. Nachdem Ihr neuer Client konfiguriert und stabil ist, setzen Sie die Datenaufnahme fort.

API- und Konfigurationsänderungen

Folgende Änderungen müssen an Ihren API-Aufrufen und Konfigurationseinstellungen während der Migration vorgenommen werden:

Clientinitialisierung

  • Klassisch: builder(name)

  • Leistungsstark: builder(name, db, schema, pipeName)

Kanäle

  • Klassisch: openChannel(OpenChannelRequest)

  • Leistungsstark: openChannel(channelName, offsetToken) gibt sowohl den Kanal als auch den Status zurück

Datenaufnahmemethoden

  • Klassisch: insertRow/insertRows(...)

  • Leistungsstark: appendRow/appendRows(...)

Offsetverfolgung

  • Die Methode getLatestCommittedOffsetTokens(channels) des klassischen SDK bietet begrenzte Sichtbarkeit und keinen Fehlerkontext.

  • Das leistungsstarke SDK unterstützt weiterhin getLatestCommittedOffsetTokens(...), allerdings empfehlen wir für eine zuverlässige Überwachung die Verwendung von getChannelStatuses(...). Diese Methode führt die folgenden Aufgaben aus:

    • Bestätigt, dass die Offsets wie erwartet voranschreiten.

    • Gibt die Anzahl der Fehler sowie detaillierte Fehlerinformationen pro Kanal zurück.

    • Ermöglicht die proaktive Überwachung und Fehlerbehebung Ihrer Datenpipelines.