Snowpipe Streaming

Der Aufruf der Snowpipe Streaming-API („API“) führt zum Laden von Streaming-Datenzeilen mit niedriger Latenz unter Verwendung der Snowflake Ingest SDK und Ihrem selbstverwalteten Anwendungscode. Die API zur Streaming-Erfassung schreibt im Gegensatz zum Massenladen von Daten oder Snowpipe, bei denen Daten aus Stagingdateien geschrieben werden, Datenzeilen in Snowflake-Tabellen. Diese Architektur führt zu geringeren Latenzen und entsprechend niedrigeren Kosten für das Laden ähnlicher Datenmengen, was sie zu einem leistungsstarken Tool für die Verarbeitung von Echtzeit-Datenstreams macht.

Unter diesem Thema werden die Konzepte für kundenspezifische Clientanwendungen, die die API aufrufen, bereitgestellt. Eine Anleitung für den zugehörigen Snowflake-Konnektor für Kafka („Kafka-Konnektor“) finden Sie unter Verwenden des Snowflake-Konnektors für Kafka mit Snowpipe Streaming.

Unter diesem Thema:

Snowpipe Streaming-API vs. Snowpipe

Die API ist als Ergänzung zu Snowpipe gedacht, nicht als Ersatz. Verwenden Sie Snowpipe Streaming-API in Streaming-Szenarios, in denen Daten über Zeilen (z. B. Apache Kafka-Themen) gestreamt und nicht in Dateien geschrieben werden. Die API eignet sich für einen Erfassungs-Workflow, bei dem eine bestehende kundenspezifische Java-Anwendung eingesetzt wird, die Datensätze produziert oder empfängt. Mit der API ist es nicht mehr erforderlich, Dateien zum Laden von Daten in Snowflake-Tabellen zu erstellen, und sie ermöglicht das automatische, kontinuierliche Laden von Datenstreams in Snowflake, sobald die Daten verfügbar sind.

Snowpipe Streaming

In der folgenden Tabelle werden die Unterschiede zwischen Snowpipe Streaming und Snowpipe beschrieben:

Kategorie

Snowpipe Streaming

Snowpipe

Form der zu ladenden Daten

Zeilen

Dateien. Wenn Ihre bestehende Datenpipeline Dateien im Blobspeicher generiert, empfehlen wir die Verwendung von Snowpipe anstelle der API.

Anforderungen an Software von Drittanbietern

Kundenspezifischer Java-Anwendungscode-Wrapper für das Snowflake Ingest SDK

Keine

Reihenfolge der Daten

Geordnete Einfügungen innerhalb jedes Kanals

Nicht unterstützt. Snowpipe kann Daten aus Dateien in einer Reihenfolge laden, die von den Zeitstempeln der Dateierstellung im Cloudspeicher abweicht.

Ladeverlauf

Ladeverlauf wird in Ansicht SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY (Account Usage) erfasst

Ladeverlauf wird in Ansicht LOAD_HISTORY (Account Usage) und Funktion COPY_HISTORY (Information Schema) erfasst

Pipeobjekt

Kein Pipe-Objekt erforderlich: Die API schreibt Datensätze direkt in die Zieltabellen.

Erfordert ein Pipeobjekt, das Stagingdatei-Daten in eine Warteschlange stellt und in Zieltabellen lädt.

Software-Anforderungen

Java SDK

Der Snowpipe Streaming-Dienst ist derzeit als eine Menge von APIs des Snowflake Ingest SDK implementiert. Das SDK steht im Maven Central Repository zum Download bereit. Snowflake empfiehlt die Verwendung des Snowflake Ingest SDK, Version 2.0.2 oder höher.

Das SDK unterstützt Java-Version 8 (oder höher) und erfordert Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files.

Wichtig

Das SDK führt REST-API-Anrufe in Snowflake aus. Möglicherweise müssen Sie die Regeln Ihrer Netzwerk-Firewall anpassen, um Konnektivität zu ermöglichen.

Kundenspezifische Clientanwendung

Die API erfordert eine kundenspezifische Java-Anwendungsschnittstelle, die in der Lage ist, Zeilen von Daten zu transferieren und aufgetretene Fehler zu behandeln. Es liegt in Ihrer Verantwortung, dass die Anwendung ohne Unterbrechung ausgeführt wird und nach einem Ausfall wiederhergestellt werden kann. Für einen bestimmten Batch von Zeilen unterstützt API das Äquivalent von ON_ERROR = CONTINUE | SKIP_BATCH | ABORT.

  • CONTINUE: Fahren Sie fort, die zulässigen Datenzeilen zu laden, und geben Sie alle Fehler zurück.

  • SKIP_BATCH: Überspringen Sie das Laden, und geben Sie alle Fehler zurück, wenn ein Fehler im gesamten Batch von Zeilen auftritt.

  • ABORT (Standardeinstellung): Der gesamte Batch von Zeilen wird abgebrochen, und beim ersten Fehler wird eine Ausnahme ausgelöst.

Die Anwendung muss Fehler anhand der Antwort der Methoden insertRow (einzelne Zeile) oder insertRows (Menge von Zeilen) erfassen.

Kanäle

Die API erfasst Zeilen über einen oder mehrere Kanäle. Ein Kanal stellt eine benannte logische Streaming-Verbindung zu Snowflake dar, über die Daten in eine Tabelle geladen werden. Ein einzelner Kanal ist genau einer Tabelle in Snowflake zugeordnet. Es können jedoch mehrere Kanäle auf dieselbe Tabelle verweisen. Das Client-SDK kann mehrere Kanäle zu mehreren Tabellen öffnen, aber das SDK kann keine kontenübergreifenden Kanäle öffnen. Die Reihenfolge der Zeilen und die entsprechenden Offset-Token werden innerhalb eines Kanals aufbewahrt, aber nicht zwischen Kanälen, die auf dieselbe Tabelle verweisen.

Kanäle sind für eine lange Verwendungsdauer gedacht, wenn ein Client aktiv Daten einfügt, und sollten wiederverwendet werden, da die Informationen zu Offset-Token erhalten bleiben. Die Daten innerhalb des Kanals werden standardmäßig jede Sekunde automatisch gelöscht, sodass der Kanal nicht geschlossen werden muss. Weitere Informationen dazu finden Sie unter Latenz.

Snowpipe streaming client channel table mapping

Sie können den Befehl SHOW CHANNELS ausführen, um die Kanäle aufzulisten, für die Sie Zugriffsrechte haben. Weitere Informationen dazu finden Sie unter SHOW CHANNELS.

Bemerkung

Inaktive Kanäle werden zusammen mit ihren Offset-Token automatisch nach 30 Tagen gelöscht.

Offset-Token

Ein Offset-Token ist eine Zeichenfolge, die ein Client in Methodenanforderung insertRow (einzelne Zeile) oder insertRows (Satz von Zeilen) einfügen kann, um den Erfassungsfortschritt pro Kanal zu verfolgen. Das Token wird bei der Erstellung des Kanals mit NULL initialisiert, und es wird aktualisiert, wenn die Zeilen mit einem bereitgestellten Offset-Token durch einen asynchronen Prozess an Snowflake übergeben werden. Clients können in regelmäßigen Abständen getLatestCommittedOffsetToken-Methodenanforderungen stellen, um das letzte bestätigte Offset-Token für einen bestimmten Kanal abzurufen und daraus Rückschlüsse auf den Erfassungsfortschritt zu ziehen. Beachten Sie, dass dieses Token nicht von Snowflake zum Deduplizieren verwendet wird. Clients können dieses Token jedoch zum Deduplizieren mit kundenspezifischem Code verwenden.

Wenn ein Client einen Kanal wieder öffnet, wird das letzte gespeicherte Offset-Token zurückgegeben. Der Client kann seine Position in der Datenquelle mithilfe des Tokens zurücksetzen, um zu vermeiden, dass dieselben Daten zweimal gesendet werden. Beachten Sie, dass bei einem Wiederöffnungsereignis eines Kanals alle in Snowflake gepufferten Daten verworfen werden, damit sie nicht übertragen werden.

Sie können das letzte bestätigte Offset-Token verwenden, um die folgenden allgemeinen Anwendungsfälle durchzuführen:

  • Verfolgen des Erfassungsfortschritts

  • Überprüfen, ob ein bestimmter Offset übertragen wurde, indem dieser mit dem letzten übertragenen Offset-Token verglichen wird

  • Aktualisieren des Quellen-Offsets und Bereinigen der bereits bestätigten Daten

  • Aktivieren der Deduplizierung und Sicherstellen der genau einmaligen Bereitstellung von Daten

Beispielsweise könnte der Kafka-Konnektor ein Offset-Token aus einem Topic lesen, wie <partition>:<offset> oder einfach <offset>, wenn die Partition im Kanalnamen kodiert ist. Betrachten Sie das folgende Szenario:

  1. Der Kafka-Konnektor geht online und öffnet einen Kanal, der Partition 1 im Kafka-Topic T mit dem Kanalnamen T:P1 entspricht.

  2. Der Konnektor beginnt mit dem Lesen von Datensätzen aus der Kafka-Partition.

  3. Der Konnektor ruft die API auf und führt eine insertRows-Methodenanforderung mit dem Offset aus, der dem Datensatz als Offset-Token zugeordnet ist.

    Das Offset-Token könnte zum Beispiel 10 lauten und damit auf den zehnten Datensatz in der Kafka-Partition verweisen.

  4. Der Konnektor führt in regelmäßigen Abständen getLatestCommittedOffsetToken-Methodenanforderungen aus, um den Erfassungsfortschritt zu ermitteln.

Wenn der Kafka-Konnektor abstürzt, kann mit dem folgende Ablauf das Lesen von Datensätzen aus dem korrekten Offset für die Kafka-Partition wieder aufgenommen werden:

  1. Der Kafka-Konnektor ist wieder online und öffnet den Kanal erneut unter demselben Namen wie zuvor.

  2. Der Konnektor ruft die API auf und führt eine getLatestCommittedOffsetToken-Methodenanforderung aus, um den letzten bestätigten Offset für die Partition zu erhalten.

    Angenommen, das letzte persistent gespeicherte Offset-Token ist 20.

  3. Der Konnektor verwendet die Kafka-Lese-APIs, um einen Cursor zurückzusetzen, der dem Offset plus 1 zugeordnet ist (in diesem Beispiel 21).

  4. Der Konnektor setzt das Lesen der Datensätze fort. Nachdem der Lesecursor erfolgreich neu positioniert wurde, werden keine doppelten Daten abgerufen.

In einem weiteren Beispiel liest eine Anwendung Protokolle aus einem Verzeichnis und verwendet das Snowpipe Streaming Client SDK, um diese Protokolle nach Snowflake zu exportieren. Sie könnten eine Anwendung für den Protokollexport entwickeln, die Folgendes umsetzt:

  1. Auflisten der Dateien im Log-Verzeichnis.

    Angenommen, das Protokollierungs-Framework generiert Protokolldateien, die lexikografisch sortiert werden können, wobei neue Protokolldateien an das Ende dieser Sortierung positioniert werden.

  2. Liest eine Protokolldatei zeilenweise und ruft die API auf, die insertRows-Methodenanforderungen mit einem Offset-Token ausführt, das dem Namen der Protokolldatei und der Zeilenzahl oder Byteposition entspricht.

    Ein Offset-Token könnte zum Beispiel messages_1.log:20 sein, wobei messages_1.log der Name der Protokolldatei und 20 die Zeilennummer ist.

Wenn die Anwendung abstürzt oder neu gestartet werden muss, kann die API aufgerufen und eine getLatestCommittedOffsetToken-Methodenanforderung ausgeführt werden, um das Offset-Token abzurufen, das der letzten exportierten Protokolldatei und -zeile entspricht. Im Beispiel wäre dies messages_1.log:20. Die Anwendung würde dann messages_1.log öffnen und nach Zeile 21 suchen, um zu verhindern, dass dieselbe Protokollzeile zweimal erfasst wird.

Bemerkung

Die Informationen zum Offset-Token können verloren gehen. Das Offset-Token ist mit einem Kanalobjekt verknüpft, und ein Kanal wird automatisch gelöscht, wenn für einen Zeitraum von 30 Tagen keine neue Erfassung unter Verwendung des Kanals ausgeführt wird. Um den Verlust des Offset-Tokens zu verhindern, sollten Sie einen separaten Offset pflegen und das Offset-Token des Kanals bei Bedarf zurückzusetzen.

Best Practices für genau einmalige Bereitstellung

Die genau einmalige Bereitstellung kann eine Herausforderung sein. Entscheidend ist die Einhaltung der folgenden Grundsätze in Ihrem kundenspezifischen Code:

  • Um eine angemessene Wiederherstellung nach Ausnahmen, Fehlern oder Abstürzen sicherzustellen, müssen Sie den Kanal wieder neu öffnen und die Erfassung mit dem letzten bestätigten Offset-Token neu starten.

  • Auch wenn Ihre Anwendung ihren eigenen Offset pflegt, ist es wichtig, den letzten von Snowflake bereitgestellten Offset-Token als Quelle der Wahrheit zu verwenden und den eigenen Offset entsprechend zurückzusetzen.

  • Der einzige Fall, in dem Ihr eigener Offset als die Quelle der Wahrheit behandelt werden sollte, ist, wenn das Offset-Token von Snowflake auf NULL gesetzt oder zurückgesetzt wird. Ein NULL-Offset-Token hat in der Regel einen der folgenden Gründe:

    • Es handelt sich um einen neuen Kanal, weshalb noch kein Offset-Token festgelegt wurde.

    • Die Zieltabelle wurde gelöscht und neu erstellt, sodass der Kanal als neu angesehen wird.

    • In den letzten 30 Tagen gab es keine Datenerfassungsaktivitäten über den Kanal, sodass der Kanal automatisch bereinigt wurde und die Offset-Token-Informationen verloren gingen.

  • Falls erforderlich, können Sie die Quelldaten, die bereits auf der Grundlage des letzten übertragenen Offset-Tokens übertragen wurden, regelmäßig bereinigen und Ihren eigenen Offset aktualisieren.

Weitere Informationen darüber, wie der Kafka-Konnektor mit Snowpipe Streaming eine genau einmalige Bereitstellung erreicht, finden Sie unter „Exactly-once“-Semantik.

Latenz

Snowpipe Streaming löscht die Daten innerhalb von Kanälen automatisch jede Sekunde. Sie müssen den Kanal also nicht schließen, damit die Daten gelöscht werden.

Mit Snowflake Ingest SDK ab Version 2.0.4 können Sie die Latenz mit der Option max_client_lag konfigurieren. Die Standardoption ist 1 Sekunde. Die maximale Latenz kann auf bis zu 10 Minuten eingestellt werden. Weitere Informationen dazu finden Sie unter MAX_CLIENT_LAG.

Beachten Sie, dass der Kafka-Konnektor für Snowpipe Streaming über einen eigenen Puffer verfügt. Nach Erreichen der Kafka-Pufferleerungszeit werden die Daten mit einer Sekunde Latenz über Snowpipe Streaming an Snowflake gesendet. Weitere Informationen dazu finden Sie unter buffer.flush.time.

Migration zu optimierten Dateien

Die API schreibt die Zeilen aus den Kanälen in Blobs des Cloudspeichers, die dann in die Zieltabelle übertragen werden. Zunächst werden die in eine Zieltabelle geschriebenen Streaming-Daten in einem temporären Zwischendateiformat gespeichert. In dieser Phase wird die Tabelle als „gemischte Tabelle“ betrachtet, da die partitionierten Daten in einer Mischung aus nativen Dateien und Zwischendateien gespeichert sind. Ein automatischer Hintergrundprozess migriert bei Bedarf die Daten aus den aktiven Zwischendateien in native Dateien, die für Abfragen und DML-Operationen optimiert sind.

Replikation

Snowpipe Streaming unterstützt Replikation und Failover von Snowflake-Tabellen, die von Snowpipe Streaming und den zugehörigen Kanal-Offsets befüllt werden, von einem Quellkonto zu einem Zielkonto über verschiedene Regionen und verschiedene Cloudplattformen hinweg.

Weitere Informationen dazu finden Sie unter Replikation und Snowpipe Streaming.

Nur-Einfüge-Operationen

Die API ist derzeit auf das Einfügen von Zeilen beschränkt. Um Daten ändern, löschen oder kombinieren zu können, müssen Sie die „rohen“ Datensätze in eine oder mehrere Stagingtabellen schreiben. Das Zusammenführen, Verknüpfen oder Transformieren der Daten kann mithilfe von kontinuierlichen Datenpipelines erfolgen, mit denen geänderte Daten in Zielberichtstabellen eingefügt werden können.

Klassen und Schnittstellen

Eine Dokumentation der Klassen und Schnittstellen finden Sie unter Snowflake Ingest SDK-API.

Unterstützte Java-Datentypen

Die folgende Tabelle fasst zusammen, welche Java-Datentypen für das Einfügen in Snowflake-Spalten unterstützt werden:

Snowflake-Spaltentyp

Zulässiger Java-Datentyp

  • CHAR

  • VARCHAR

  • String

  • primitive Datentypen (int, boolean, char …)

  • BigInteger, BigDecimal

  • BINARY

  • byte[]

  • String (hexadezimal-codiert)

  • NUMBER

  • numerische Typen (BigInteger, BigDecimal, byte, int, double …)

  • String

  • FLOAT

  • numerische Typen (BigInteger, BigDecimal, byte, int, double …)

  • String

  • BOOLEAN

  • boolean

  • numerische Typen (BigInteger, BigDecimal, byte, int, double …)

  • String

Siehe Details zur Boolean-Konvertierung.

  • TIME

  • java.time.LocalTime

  • java.time.OffsetTime

  • String

    • Als Integer gespeicherte Zeit

    • HH24:MI:SS.FFTZH:TZM (z. B. 20:57:01.123456789+07:00)

    • HH24:MI:SS.FF (z. B. 20:57:01.123456789)

    • HH24:MI:SS (z. B. 20:57:01)

    • HH24:MI (z. B. 20:57)

  • DATE

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • String

    • Als Integer gespeichertes Datum

    • YYYY-MM-DD (z. B. 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (z. B. 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (z. B. 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (z. B. 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (z. B. 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (z. B. 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (z. B. 2013-04-28T20:57-07:00)

  • TIMESTAMP_NTZ

  • TIMESTAMP_LTZ

  • TIMESTAMP_TZ

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • String

    • Als Integer gespeicherter Zeitstempel

    • YYYY-MM-DD (z. B. 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (z. B. 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (z. B. 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (z. B. 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (z. B. 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (z. B. 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (z. B. 2013-04-28T20:57-07:00)

  • VARIANT

  • ARRAY

  • String (muss gültiges JSON sein)

  • primitive Datentypen und deren Arrays

  • BigInteger, BigDecimal

  • java.time.LocalTime

  • java.time.OffsetTime

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.util.Map<String, T>, wobei T ein gültiger VARIANT-Typ ist

  • T[], wobei T ein gültiger VARIANT-Typ ist

  • Liste<T>, wobei T ein gültiger VARIANT-Typ ist

  • OBJECT

  • String (muss gültiges JSON-Objekt sein)

  • Map<String, T>, wobei T ein gültiger Variant-Typ ist

  • GEOGRAPHY

  • Nicht unterstützt

  • GEOMETRY

  • Nicht unterstützt

Erforderliche Zugriffsrechte

Zum Aufrufen der Snowpipe Streaming-API ist eine Rolle mit den folgenden Berechtigungen erforderlich:

Objekt

Berechtigung

Tabelle

OWNERSHIP oder mindestens INSERT und EVOLVE SCHEMA (nur erforderlich bei Verwendung der Schemaentwicklung für den Kafka-Konnektor mit Snowpipe Streaming)

Datenbank

USAGE

Schema

USAGE

Einschränkungen

Snowpipe Streaming unterstützt nur die Verwendung von 256-Bit-AES-Schlüsseln für die Datenverschlüsselung.

Die folgenden Objekte oder Typen werden nicht unterstützt:

  • Datentypen GEOGRAPHY und GEOMETRY

  • Spalten mit Sortierung

  • TRANSIENT- oder TEMPORARY-Tabellen

  • Tabellen mit einer der folgenden Spalteneinstellungen:

    • AUTOINCREMENT oder IDENTITY

    • Standard-Spaltenwert, der nicht NULL ist.