Snowpipe Streaming

Der Aufruf der Snowpipe Streaming-API („API“) führt zum Laden von Streaming-Datenzeilen mit niedriger Latenz unter Verwendung der Snowflake Ingest SDK und Ihrem selbstverwalteten Anwendungscode. Die API zur Streaming-Erfassung schreibt im Gegensatz zum Massenladen von Daten oder Snowpipe, bei denen Daten aus Stagingdateien geschrieben werden, Datenzeilen in Snowflake-Tabellen. Diese Architektur führt zu geringeren Latenzen und entsprechend niedrigeren Kosten für das Laden ähnlicher Datenmengen, was sie zu einem leistungsstarken Tool für die Verarbeitung von Echtzeit-Datenstreams macht.

Unter diesem Thema werden die Konzepte für kundenspezifische Clientanwendungen, die die API aufrufen, bereitgestellt. Eine Anleitung für den zugehörigen Snowflake-Konnektor für Kafka („Kafka-Konnektor“) finden Sie unter Verwenden des Snowflake-Konnektors für Kafka mit Snowpipe Streaming.

Unter diesem Thema:

Snowpipe Streaming-API vs. Snowpipe

Die API ist als Ergänzung zu Snowpipe gedacht, nicht als Ersatz. Verwenden Sie Snowpipe Streaming-API in Streaming-Szenarios, in denen Daten über Zeilen (z. B. Apache Kafka-Themen) gestreamt und nicht in Dateien geschrieben werden. Die API eignet sich für einen Erfassungs-Workflow, bei dem eine bestehende kundenspezifische Java-Anwendung eingesetzt wird, die Datensätze produziert oder empfängt. Mit der API ist es nicht mehr erforderlich, Dateien zum Laden von Daten in Snowflake-Tabellen zu erstellen, und sie ermöglicht das automatische, kontinuierliche Laden von Datenstreams in Snowflake, sobald die Daten verfügbar sind.

Snowpipe Streaming

In der folgenden Tabelle werden die Unterschiede zwischen Snowpipe Streaming und Snowpipe beschrieben:

Kategorie

Snowpipe Streaming

Snowpipe

Form der zu ladenden Daten

Zeilen

Dateien Wenn Ihre bestehende Datenpipeline Dateien im Blobspeicher generiert, empfehlen wir die Verwendung von Snowpipe anstelle der API.

Anforderungen an Software von Drittanbietern

Kundenspezifischer Java-Anwendungscode-Wrapper für die Snowflake Ingest SDK.

Keine.

Reihenfolge der Daten

Geordnete Einfügungen innerhalb jedes Kanals.

Nicht unterstützt. Snowpipe kann Daten aus Dateien in einer Reihenfolge laden, die von den Zeitstempeln der Dateierstellung im Cloudspeicher abweicht.

Ladeverlauf

Ladeverlauf wird in Ansicht SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY (Account Usage) erfasst.

Ladeverlauf wird in Ansicht LOAD_HISTORY (Account Usage) und Funktion COPY_HISTORY (Information Schema) erfasst.

Pipeobjekt

Kein Pipeobjekt erforderlich. Die API schreibt Datensätze direkt in die Zieltabellen.

Erfordert ein Pipeobjekt, das Stagingdatei-Daten in eine Warteschlange stellt und in Zieltabellen lädt.

Software-Anforderungen

Java SDK

Der Snowpipe Streaming-Dienst ist derzeit als eine Menge von APIs des Snowflake Ingest SDK implementiert. Das SDK steht im Maven Central Repository zum Download bereit: https://mvnrepository.com/artifact/net.snowflake/snowflake-ingest-sdk. Das SDK unterstützt die Java-Version 8 (oder höher) und erfordert Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files.

Wichtig

Das SDK führt REST-API-Anrufe in Snowflake aus. Möglicherweise müssen Sie die Regeln Ihrer Netzwerk-Firewall anpassen, um Konnektivität zu ermöglichen.

Kundenspezifische Clientanwendung

Die API erfordert eine kundenspezifische Java-Anwendungsschnittstelle, die in der Lage ist, Zeilen von Daten zu transferieren und aufgetretene Fehler zu behandeln. Es liegt in Ihrer Verantwortung, dass die Anwendung ohne Unterbrechung ausgeführt wird und nach einem Ausfall wiederhergestellt werden kann. Für eine gegebene Menge von Zeilen unterstützt die API das Äquivalent von ON_ERROR = CONTINUE | ABORT. ABORT bricht den gesamten Batch ab, sobald der erste Fehler gefunden wird. Das ist die Standardeinstellung. CONTINUE setzt das Laden der Daten fort, wenn Fehler gefunden werden.

Die Anwendung muss Fehler anhand der Antwort der Methoden insertRow (einzelne Zeile) oder insertRows (Menge von Zeilen) erfassen.

Kanäle

Die API erfasst Zeilen über einen oder mehrere Kanäle. Ein Kanal stellt eine benannte logische Streaming-Verbindung zu Snowflake dar, über die Daten in eine Tabelle geladen werden. Ein einzelner Kanal ist genau einer Tabelle in Snowflake zugeordnet. Es können jedoch mehrere Kanäle auf dieselbe Tabelle verweisen. Das Client-SDK kann mehrere Kanäle zu mehreren Tabellen öffnen, aber das SDK kann keine kontenübergreifenden Kanäle öffnen. Die Reihenfolge der Zeilen und die entsprechenden Offset-Token werden innerhalb eines Kanals aufbewahrt, aber nicht zwischen Kanälen, die auf dieselbe Tabelle verweisen.

Snowpipe streaming client channel table mapping

Bemerkung

Inaktive Kanäle werden zusammen mit ihren Offset-Tokens automatisch nach 14 Tagen gelöscht.

Offset-Token

Ein Offset-Token ist eine Zeichenfolge, die ein Client in Methodenanforderung insertRow (einzelne Zeile) oder insertRows (Satz von Zeilen) einfügen kann, um den Erfassungsfortschritt pro Kanal zu verfolgen. Clients können in regelmäßigen Abständen getLatestCommittedOffsetToken-Methodenanforderungen stellen, um das letzte bestätigte Offset-Token für einen bestimmten Kanal abzurufen und daraus Rückschlüsse auf den Erfassungsfortschritt zu ziehen. Beachten Sie, dass dieses Token nicht von Snowflake zum Deduplizieren verwendet wird. Clients können dieses Token jedoch zum Deduplizieren mit kundenspezifischem Code verwenden.

Wenn ein Client einen Kanal wieder öffnet, wird das letzte gespeicherte Offset-Token zurückgegeben. Der Client kann seine Position in der Datenquelle mithilfe des Tokens zurücksetzen, um zu vermeiden, dass dieselben Daten zweimal gesendet werden. Beachten Sie, dass bei einem Wiederöffnungsereignis eines Kanals alle in Snowflake gepufferten Daten verworfen werden, damit sie nicht übertragen werden.

Beispielsweise könnte der Kafka-Konnektor ein Offset-Token aus dem Topic lesen, wie <partition>:<offset> oder einfach <offset>, wenn die Partition im Kanalnamen kodiert ist. Betrachten Sie das folgende Szenario:

  1. Der Kafka-Konnektor geht online und öffnet einen Kanal, der Partition 1 im Kafka-Topic T mit dem Kanalnamen T:P1 entspricht.

  2. Der Konnektor beginnt mit dem Lesen von Datensätzen aus der Kafka-Partition. Der Konnektor ruft die API auf und führt eine insertRows-Methodenanforderung mit dem Offset aus, der dem Datensatz als Offset-Token zugeordnet ist. Das Offset-Token könnte zum Beispiel 10 lauten und damit auf den 10. Datensatz in der Kafka-Partition verweisen.

  3. Der Konnektor führt in regelmäßigen Abständen getLatestCommittedOffsetToken-Methodenanforderungen aus, um den Erfassungsfortschritt zu ermitteln.

Wenn der Kafka-Konnektor abstürzt, kann mit dem folgende Ablauf das Lesen von Datensätzen aus dem korrekten Offset für die Kafka-Partition wieder aufgenommen werden:

  1. Der Kafka-Konnektor ist wieder online und öffnet den Kanal erneut unter demselben Namen wie zuvor.

  2. Der Konnektor ruft die API auf und führt eine getLatestCommittedOffsetToken-Methodenanforderung aus, um den letzten bestätigten Offset für die Partition zu erhalten. Angenommen, das letzte persistent gespeicherte Offset-Token ist 20.

  3. Der Konnektor verwendet die Kafka-Lese-APIs, um einen Cursor zurückzusetzen, der dem Offset plus 1 zugeordnet ist – in diesem Beispiel 21.

  4. Der Konnektor setzt das Lesen der Datensätze fort. Nachdem der Lesecursor erfolgreich neu positioniert wurde, werden keine doppelten Daten abgerufen.

Ein weiteres Beispiel: Angenommen, Sie haben eine Anwendung, die Protokolle aus einem Verzeichnis liest und diese Protokolle mithilfe des Snowpipe Streaming Client SDK in Snowflake exportiert. Sie könnten eine Anwendung für den Protokollexport entwickeln, die Folgendes umsetzt:

  1. Auflisten der Dateien im Log-Verzeichnis. Angenommen, das Protokollierungs-Framework generiert Protokolldateien, die lexikografisch sortiert werden können, wobei neue Protokolldateien an das Ende dieser Sortierung positioniert werden.

  2. Liest eine Protokolldatei zeilenweise und ruft die API auf, die insertRows-Methodenanforderungen mit einem Offset-Token ausführt, das dem Namen der Protokolldatei und der Zeilenzahl oder Byteposition entspricht. Ein Offset-Token könnte zum Beispiel messages_1.log:20 sein, wobei messages_1.log der Name der Protokolldatei und 20 die Zeilennummer ist.

Wenn die Anwendung abstürzt oder neu gestartet werden muss, kann die API aufgerufen werden und eine getLatestCommittedOffsetToken-Methodenanforderung ausgeführt werden, um das Offset-Token abzurufen, das der letzten exportierten Protokolldatei und -zeile entspricht. Im Beispiel wäre dies messages_1.log:20. Die Anwendung würde dann messages_1.log öffnen und nach Zeile 21 suchen, um zu verhindern, dass dieselbe Protokollzeile zweimal erfasst wird.

Migration zu optimierten Dateien

Die API schreibt die Zeilen aus den Kanälen in Blobs des Cloudspeichers, die dann in die Zieltabelle übertragen werden. Zunächst werden die in eine Zieltabelle geschriebenen Streaming-Daten in einem temporären Zwischendateiformat gespeichert. In diesem Stadium wird die Tabelle als „gemischte Tabelle“ betrachtet, da die partitionierten Daten in einer Mischung aus nativen Dateien und Zwischendateien gespeichert sind. Ein automatischer Hintergrundprozess migriert bei Bedarf die Daten aus den aktiven Zwischendateien in native Dateien, die für Abfragen und DML-Operationen optimiert sind.

Nur-Einfüge-Operationen

Die API ist derzeit auf das Einfügen von Zeilen beschränkt. Um Daten ändern, löschen oder kombinieren zu können, müssen Sie die „rohen“ Datensätze in eine oder mehrere Stagingtabellen schreiben. Das Zusammenführen, Verknüpfen oder Transformieren der Daten kann mithilfe von kontinuierlichen Datenpipelines erfolgen, mit denen geänderte Daten in Zielberichtstabellen eingefügt werden können.

Klassen und Schnittstellen

Eine Dokumentation der Klassen und Schnittstellen finden Sie unter Snowflake Ingest SDK-API.

Unterstützte Java-Datentypen

Die folgende Tabelle fasst zusammen, welche Java-Datentypen für das Einfügen in Snowflake-Spalten unterstützt werden:

Snowflake-Spaltentyp

Zulässiger Java-Datentyp

  • CHAR

  • VARCHAR

  • String

  • primitive Datentypen (int, boolean, char …)

  • BigInteger, BigDecimal

  • BINARY

  • byte[]

  • String (hexadezimal-codiert)

  • NUMBER

  • numerische Typen (BigInteger, BigDecimal, byte, int, double …)

  • String

  • FLOAT

  • numerische Typen (BigInteger, BigDecimal, byte, int, double …)

  • String

  • BOOLEAN

  • boolean

  • numerische Typen (BigInteger, BigDecimal, byte, int, double …)

  • String

Siehe Details zur Boolean-Konvertierung.

  • TIME

  • java.time.LocalTime

  • java.time.OffsetTime

  • String

    • Als Integer gespeicherte Zeit

    • HH24:MI:SS.FFTZH:TZM (z. B. 20:57:01.123456789+07:00)

    • HH24:MI:SS.FF (z. B. 20:57:01.123456789)

    • HH24:MI:SS (z. B. 20:57:01)

    • HH24:MI (z. B. 20:57)

  • DATE

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • String

    • Als Integer gespeichertes Datum

    • YYYY-MM-DD (z. B. 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (z. B. 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (z. B. 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (z. B. 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (z. B. 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (z. B. 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (z. B. 2013-04-28T20:57-07:00)

  • TIMESTAMP_NTZ

  • TIMESTAMP_LTZ

  • TIMESTAMP_TZ

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • String

    • Als Integer gespeicherter Zeitstempel

    • YYYY-MM-DD (z. B. 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (z. B. 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (z. B. 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (z. B. 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (z. B. 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (z. B. 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (z. B. 2013-04-28T20:57-07:00)

  • VARIANT

  • ARRAY

  • String (muss gültiges JSON sein)

  • primitive Datentypen und deren Arrays

  • BigInteger, BigDecimal

  • java.time.LocalTime

  • java.time.OffsetTime

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.util.Map<String, T>, wobei T ein gültiger VARIANT-Typ ist

  • T[], wobei T ein gültiger VARIANT-Typ ist

  • Liste<T>, wobei T ein gültiger VARIANT-Typ ist

  • OBJECT

  • String (muss gültiges JSON-Objekt sein)

  • Map<String, T>, wobei T ein gültiger Variant-Typ ist

  • GEOGRAPHY

  • Nicht unterstützt

  • GEOMETRY

  • Nicht unterstützt

Einschränkungen

  • Tabellen mit einer der folgenden Spalteneinstellungen werden nicht unterstützt:

    • AUTOINCREMENT oder IDENTITY

    • Standard-Spaltenwert, der nicht NULL ist.

  • Die Datentypen GEOGRAPHY und GEOMETRY werden nicht unterstützt.

  • Spalten mit Sortierungen werden nicht unterstützt.

  • Snowpipe Streaming unterstützt nur die Verwendung von 256-Bit-AES-Schlüsseln für die Datenverschlüsselung.

  • TRANSIENT- oder TEMPORARY-Tabellen werden nicht unterstützt.