Snowpipe Streaming

Snowpipe Streaming ist der Dienst von Snowflake zum kontinuierlichen Laden von Streaming-Daten mit niedriger Latenz direkt in Snowflake. Er ermöglicht die Erfassung und Analyse von Daten nahezu in Echtzeit, was für zeitnahe Einblicke und unmittelbare operative Reaktionen entscheidend ist. Große Datenmengen aus verschiedenen Streaming-Quellen werden innerhalb von Sekunden für Abfragen und Analysen zur Verfügung gestellt.

Wert von Snowpipe Streaming

  • Datenverfügbarkeit in Echtzeit: Im Gegensatz zu herkömmlichen Batch-Lademethoden werden die Daten bei ihrem Eintreffen aufgenommen. Dies unterstützt Anwendungsfälle wie Live-Dashboards, Echtzeitanalysen und Betrugserkennung.

  • Effiziente Streaming-Workloads: Verwendet Snowpipe Streaming-SDKs, um Zeilen direkt in Tabellen zu schreiben. Dadurch entfällt die Notwendigkeit, Daten in einem Cloud-Zwischenspeicher abzulegen. Dieser direkte Ansatz reduziert die Latenz und vereinfacht die Datenaufnahmearchitektur.

  • Vereinfachte Datenpipelines: Bietet einen optimierten Ansatz für kontinuierliche Datenpipelines aus Quellen wie Anwendungsereignissen, IoTSensoren, Change Data Capture (CDC) Streams und Message Queues (z. B. Apache Kafka).

  • Serverlos und skalierbar: Als serverlos Angebot skaliert es automatisch die Computeressourcen auf der Grundlage der Datenaufnahmelast.

  • Kosteneffizient für Streaming: Die Abrechnung ist für die Streaming-Datenaufnahme optimiert und bietet potenziell kostengünstigere Lösungen für umfangreiche Datenfeeds mit geringer Latenz.

Mit Snowpipe Streaming können Sie Echtzeitdatenanwendungen in der Snowflake Data Cloud erstellen, sodass Sie Entscheidungen auf der Grundlage der aktuellsten verfügbaren Daten treffen können.

Snowpipe Streaming-Implementierungen

Snowpipe Streaming bietet zwei verschiedene Implementierungen, um unterschiedlichen Anforderungen an die Datenaufnahme und Leistungserwartungen gerecht zu werden: Snowpipe Streaming mit leistungsstarker Architektur und Snowpipe Streaming mit klassischer Architektur:

  • Snowpipe Streaming mit leistungsstarker Architektur

    Snowflake hat diese Implementierung der nächsten Generation entwickelt, um den Durchsatz erheblich zu steigern, die Streaming-Leistung zu optimieren und ein vorhersehbares Kostenmodell zu bieten, das den Stagingbereich für fortschrittliche Daten-Streaming-Funktionen schafft.

    Wichtige Merkmale:

    • SDK: Verwendet das neue Snowpipe Streaming-SDK.

    • Preisgestaltung: Transparente, durchsatzbasierte Preisgestaltung (Credits pro unkomprimiertem GB)

    • Datenflussmanagement: Nutzt das PIPE-Objekt zur Verwaltung des Datenflusses und zur Ermöglichung einfacher Transformationen zum Aufnahmezeitpunkt. Für dieses PIPE-Objekt werden Kanäle geöffnet.

    • Datenaufnahme: Bietet eine REST-API für die direkte, einfache Datenaufnahme über die PIPE.

    • Schemavalidierung: Wird auf der Serverseite während der Datenaufnahme anhand des in der PIPE definierten Schemas durchgeführt.

    • Leistung: Entwickelt für einen deutlich höheren Durchsatz und eine verbesserte Abfrageeffizienz bei aufgenommenen Daten.

    Wir empfehlen Ihnen, diese fortschrittliche Architektur zu erkunden – insbesondere für neue Streaming-Projekte.

  • Snowpipe Streaming mit klassischer Architektur

    Dies ist die ursprüngliche, allgemein verfügbare Implementierung, die eine zuverlässige Lösung für etablierte Datenpipelines darstellt.

    Wichtige Merkmale:

    • SDK: Verwendet das snowflake-ingest-sdk.

    • Verwalten des Datenflusses: Verwendet nicht das PIPE-Objektkonzept für die Streaming-Datenaufnahme. Kanäle werden direkt für Zieltabellen konfiguriert und geöffnet.

    • Preisgestaltung: Basierend auf einer Kombination von serverlosen Computeressourcen, die für die Datenaufnahme genutzt werden, und der Anzahl aktiver Clientverbindungen.

Auswählen Ihrer Implementierung

Berücksichtigen Sie bei der Auswahl einer Implementierung Ihren unmittelbaren Bedarf und Ihre langfristige Datenstrategie:

  • Neue Streaming-Projekte: Wir empfehlen, Snowpipe Streaming mit leistungsstarker Architektur in Betracht zu ziehen – aufgrund ihres zukunftsorientierten Designs sowie ihrer besseren Leistung, Skalierbarkeit und Vorhersagbarkeit der Kosten.

  • Leistungsanforderungen: Die High-Performance-Architektur ist auf maximalen Durchsatz und Optimierung der Echtzeitleistung ausgelegt.

  • Bevorzugte Preisgestaltung: Die High-Performance-Architektur bietet eine klare, durchsatzbasierte Preisgestaltung, während die klassische Architektur auf der Grundlage des serverlosen Computing und der Clientverbindungen abgerechnet wird.

  • Vorhandene Setups: Bestehende Anwendungen mit klassischer Architektur können weiterhin ausgeführt werden. Bei künftigen Erweiterungen oder Neugestaltungen sollten Sie erwägen, zur leistungsstarken Architektur zu migrieren bzw. diese zu integrieren.

  • Feature-Set und Verwaltung: Das PIPE-Objekt in der High-Performance- Architektur bietet erweiterte Management- und Transformationsfunktionen, die in der klassischen Architektur nicht vorhanden sind.

Snowpipe Streaming versus Snowpipe

Snowpipe Streaming ist als Ergänzung zu Snowpipe gedacht, nicht als Ersatz. Verwenden Sie die Snowpipe Streaming-API in Streaming-Szenarien, in denen Daten mit Zeilen gestreamt werden (z. B. Apache Kafka-Themen), anstatt in Dateien geschrieben zu werden. Die API eignet sich für einen Erfassungs-Workflow, bei dem eine bestehende kundenspezifische Java-Anwendung eingesetzt wird, die Datensätze produziert oder empfängt. Mit der API müssen Sie keine Dateien erstellen, um Daten in Snowflake-Tabellen zu laden, und die API ermöglicht das automatische, kontinuierliche Laden von Datenstreams in Snowflake, sobald die Daten verfügbar sind.

Snowpipe Streaming

In der folgenden Tabelle werden die Unterschiede zwischen Snowpipe Streaming und Snowpipe beschrieben:

Kategorie

Snowpipe Streaming

Snowpipe

Form der zu ladenden Daten

Zeilen

Dateien. Wenn Ihre bestehende Datenpipeline Dateien im Blobspeicher generiert, empfehlen wir die Verwendung von Snowpipe anstelle der API.

Anforderungen an Software von Drittanbietern

Kundenspezifischer Java-Anwendungscode-Wrapper für das Snowflake Ingest SDK

Keine

Reihenfolge der Daten

Geordnete Einfügungen innerhalb jedes Kanals

Nicht unterstützt. Snowpipe kann Daten aus Dateien in einer Reihenfolge laden, die von den Zeitstempeln der Dateierstellung im Cloudspeicher abweicht.

Ladeverlauf

Ladeverlauf wird in Ansicht SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY (Account Usage) erfasst

Aufgezeichneter Ladehverlauf in COPY_HISTORY (Account Usage) und COPY_HISTORY-Funktion (Information Schema)

Pipeobjekt

Bei der klassischen Architektur ist kein Pipe-Objekt erforderlich: Die API schreibt Datensätze direkt in die Zieltabellen. Die High-Performance-Architektur erfordert ein Pipe-Objekt.

Erfordert ein Pipeobjekt, das Stagingdatei-Daten in eine Warteschlange stellt und in Zieltabellen lädt.

Kanäle

Die API erfasst Zeilen über einen oder mehrere Kanäle. Ein Kanal stellt eine benannte logische Streaming-Verbindung zu Snowflake dar, über die Daten auf geordnete Weise in eine Tabelle geladen werden. Die Reihenfolge der Zeilen und die entsprechenden Offset-Token werden innerhalb eines Kanals aufbewahrt, aber nicht zwischen Kanälen, die auf dieselbe Tabelle verweisen.

In der klassischen Architektur ist ein einzelner Kanal genau einer Tabelle in Snowflake zugeordnet, auch wenn mehrere Kanäle auf dieselbe Tabelle verweisen können. Das Client-SDK kann mehrere Kanäle zu mehreren Tabellen öffnen, aber das SDK kann keine kontenübergreifenden Kanäle öffnen. Kanäle sind für eine lange Verwendungsdauer gedacht, wenn ein Client aktiv Daten einfügt, und sollten bei Neustarts des Clientprozesses wiederverwendet werden, da die Informationen zu Offset-Token erhalten bleiben. Die Daten innerhalb des Kanals werden standardmäßig jede Sekunde automatisch gelöscht, sodass der Kanal nicht geschlossen werden muss. Weitere Informationen dazu finden Sie unter Empfehlungen zur Latenz.

Kanaltabellenzuordnung für Snowpipe Streaming-Client

Wenn Sie einen Kanal und die zugehörigen Offset-Metadaten nicht mehr benötigen, können Sie ihn mithilfe der DropChannelRequest-API dauerhaft löschen. Es gibt zwei Möglichkeiten, einen Kanal zu löschen:

  • Löschen eines Kanals beim Schließen Die Daten im Kanal werden automatisch gelöscht, bevor der Kanal gelöscht wird.

  • Blindes Löschen eines Kanals Dies wird von uns nicht empfohlen, da durch das Löschen eines Kanals alle ausstehenden Daten verworfen werden.

Sie können den Befehl SHOW CHANNELS ausführen, um die Kanäle aufzulisten, für die Sie Zugriffsrechte haben. Weitere Informationen dazu finden Sie unter SHOW CHANNELS.

Bemerkung

Inaktive Kanäle werden zusammen mit ihren Offset-Token automatisch nach 30 Tagen Inaktivität gelöscht.

Offset-Token

Ein Offset-Token ist eine Zeichenfolge, die ein Client in seine Anfragen zur Zeilenübermittlung (z. B. für einzelne oder mehrere Zeilen) einfügen kann, um den Fortschritt der Datenaufnahme pro Kanal zu verfolgen. Die verwendeten Methoden sind insertRow oder insertRows für die klassische Architektur und appendRow oder appendRows für die High-Performance-Architektur. Das Token wird bei der Erstellung des Kanals mit NULL initialisiert, und es wird aktualisiert, wenn die Zeilen mit einem bereitgestellten Offset-Token durch einen asynchronen Prozess an Snowflake übergeben werden. Clients können in regelmäßigen Abständen Anfragen an die Methode getLatestCommittedOffsetToken stellen, um das neueste Offset-Token für einen bestimmten Kanal zu erhalten und dieses zu verwenden, um den Fortschritt der Datenaufnahmezu ermitteln. Dieses Token wird von Snowflake nicht zum Deduplizieren verwendet; jedoch können Clients dieses Token verwenden, um mithilfe eigener Logik eine Duplikatsvermeidung umzusetzen.

Wenn ein Client einen Kanal wieder öffnet, wird das letzte gespeicherte Offset-Token zurückgegeben. Der Client kann seine Position in der Datenquelle mithilfe des Tokens zurücksetzen, um zu vermeiden, dass dieselben Daten zweimal gesendet werden. Beachten Sie, dass bei einem Wiederöffnungsereignis eines Kanals alle in Snowflake gepufferten, unbestätigten Daten verworfen werden, damit sie nicht übertragen werden.

Sie können das letzte bestätigte Offset-Token verwenden, um die folgenden allgemeinen Anwendungsfälle durchzuführen:

  • Verfolgen des Erfassungsfortschritts

  • Überprüfen, ob ein bestimmter Offset übertragen wurde, indem dieser mit dem letzten übertragenen Offset-Token verglichen wird

  • Aktualisieren des Quellen-Offsets und Bereinigen der bereits bestätigten Daten

  • Aktivieren der Deduplizierung und Sicherstellen der genau einmaligen Bereitstellung von Daten

Beispielsweise könnte der Kafka-Konnektor ein Offset-Token aus einem Topic lesen, wie <partition>:<offset> oder einfach <offset>, wenn die Partition im Kanalnamen kodiert ist. Betrachten Sie das folgende Szenario:

  1. Der Kafka-Konnektor geht online und öffnet einen Kanal, der Partition 1 im Kafka-Topic T mit dem Kanalnamen T:P1 entspricht.

  2. Der Konnektor beginnt mit dem Lesen von Datensätzen aus der Kafka-Partition.

  3. Der Konnektor ruft die API auf und führt eine insertRows-Methodenanforderung mit dem Offset aus, der dem Datensatz als Offset-Token zugeordnet ist.

    Das Offset-Token könnte zum Beispiel 10 lauten und damit auf den zehnten Datensatz in der Kafka-Partition verweisen.

  4. Der Konnektor führt in regelmäßigen Abständen getLatestCommittedOffsetToken-Methodenanforderungen aus, um den Erfassungsfortschritt zu ermitteln.

Wenn der Kafka-Konnektor abstürzt, kann mit dem folgende Ablauf das Lesen von Datensätzen aus dem korrekten Offset für die Kafka-Partition wieder aufgenommen werden:

  1. Der Kafka-Konnektor ist wieder online und öffnet den Kanal erneut unter demselben Namen wie zuvor.

  2. Der Konnektor ruft die API auf und führt eine getLatestCommittedOffsetToken-Methodenanforderung aus, um den letzten bestätigten Offset für die Partition zu erhalten.

    Angenommen, das letzte persistent gespeicherte Offset-Token ist 20.

  3. Der Konnektor verwendet die Kafka-Lese-APIs, um einen Cursor zurückzusetzen, der dem Offset plus 1 zugeordnet ist (in diesem Beispiel 21).

  4. Der Konnektor setzt das Lesen der Datensätze fort. Nachdem der Lesecursor erfolgreich neu positioniert wurde, werden keine doppelten Daten abgerufen.

In einem weiteren Beispiel liest eine Anwendung Protokolle aus einem Verzeichnis und verwendet das Snowpipe Streaming Client SDK, um diese Protokolle nach Snowflake zu exportieren. Sie könnten eine Anwendung für den Protokollexport entwickeln, die Folgendes umsetzt:

  1. Auflisten der Dateien im Log-Verzeichnis.

    Angenommen, das Protokollierungs-Framework generiert Protokolldateien, die lexikografisch sortiert werden können, wobei neue Protokolldateien an das Ende dieser Sortierung positioniert werden.

  2. Liest eine Protokolldatei zeilenweise und ruft die API auf, die insertRows-Methodenanforderungen mit einem Offset-Token ausführt, das dem Namen der Protokolldatei und der Zeilenzahl oder Byteposition entspricht.

    Ein Offset-Token könnte zum Beispiel messages_1.log:20 sein, wobei messages_1.log der Name der Protokolldatei und 20 die Zeilennummer ist.

Wenn die Anwendung abstürzt oder neu gestartet werden muss, kann die API aufgerufen und eine getLatestCommittedOffsetToken-Methodenanforderung ausgeführt werden, um das Offset-Token abzurufen, das der letzten exportierten Protokolldatei und -zeile entspricht. Im Beispiel wäre dies messages_1.log:20. Die Anwendung würde dann messages_1.log öffnen und nach Zeile 21 suchen, um zu verhindern, dass dieselbe Protokollzeile zweimal erfasst wird.

Bemerkung

Die Informationen zum Offset-Token können verloren gehen. Das Offset-Token ist mit einem Kanalobjekt verknüpft, und ein Kanal wird automatisch gelöscht, wenn für einen Zeitraum von 30 Tagen keine neue Erfassung unter Verwendung des Kanals ausgeführt wird. Um den Verlust des Offset-Tokens zu verhindern, sollten Sie einen separaten Offset pflegen und das Offset-Token des Kanals bei Bedarf zurückzusetzen.

Rollen von offsetToken und continuationToken

offsetToken und continuationToken werden beide verwendet, um eine genau einmalige Datenbereitstellung sicherzustellen, aber sie dienen unterschiedlichen Zwecken und werden von unterschiedlichen Untersystemen verwaltet. Der primäre Unterschied besteht darin, wer den Wert des Tokens und den Umfang seiner Verwendung kontrolliert.

  • continuationToken (gilt nur für die leistungsstarke -Architektur und wird nur von direkten REST API-Benutzern verwendet):

    Dieses Token wird von Snowflake verwaltet und ist unerlässlich, um den Status einer einzelnen, kontinuierlichen Streaming-Sitzung aufrecht zu erhalten. Wenn ein Client Daten über die Append Rows API sendet, gibt Snowflake ein continuationToken zurück. Der Client muss dieses Token in seiner nächsten AppendRows-Anforderung zurücksenden, um sicherzustellen, dass die Daten in der richtigen Reihenfolge und ohne Lücken empfangen werden. Snowflake verwendet das Token, um doppelte oder fehlende Daten im Falle eines erneuten SDK-Versuchs zu erkennen und zu verhindern.

  • offsetToken (gilt sowohl für klassische als auch für High-Performance-Architekturen):

    Dieses Token ist ein benutzerdefinierter Bezeichner, der die genau einmalige Bereitstellung von einer externen Quelle ermöglicht. Snowflake verwendet dieses Token nicht für seine eigenen internen Operationen oder um die erneute Erfassung zu verhindern. Stattdessen speichert Snowflake diesen Wert einfach. Es liegt in der Verantwortung des externen Systems (wie eines Kafka-Konnektors), das offsetToken von Snowflake zu lesen und es zu verwenden, um den eigenen Aufnahmefortschritt zu verfolgen und das Senden doppelter Daten zu vermeiden, wenn der externe Stream wiederholt werden muss.

Nur-Einfüge-Operationen

Die API ist derzeit auf das Einfügen von Zeilen beschränkt. Um Daten ändern, löschen oder kombinieren zu können, müssen Sie die „rohen“ Datensätze in eine oder mehrere Stagingtabellen schreiben. Das Zusammenführen, Verknüpfen oder Transformieren der Daten kann mithilfe von kontinuierlichen Datenpipelines erfolgen, mit denen geänderte Daten in Zielberichtstabellen eingefügt werden können.

Unterstützte Java-Datentypen

Die folgende Tabelle fasst zusammen, welche Java-Datentypen für das Einfügen in Snowflake-Spalten unterstützt werden:

Snowflake-Spaltentyp

Zulässiger Java-Datentyp

  • CHAR

  • VARCHAR

  • String

  • primitive Datentypen (int, boolean, char …)

  • BigInteger, BigDecimal

  • BINARY

  • byte[]

  • String (hexadezimal-codiert)

  • NUMBER

  • numerische Typen (BigInteger, BigDecimal, byte, int, double …)

  • String

  • FLOAT

  • numerische Typen (BigInteger, BigDecimal, byte, int, double …)

  • String

  • BOOLEAN

  • boolean

  • numerische Typen (BigInteger, BigDecimal, byte, int, double …)

  • String

Siehe Details zur Boolean-Konvertierung.

  • TIME

  • java.time.LocalTime

  • java.time.OffsetTime

  • String

    • Als Integer gespeicherte Zeit

    • HH24:MI:SS.FFTZH:TZM (zum Beispiel 20:57:01.123456789+07:00)

    • HH24:MI:SS.FF (zum Beispiel 20:57:01.123456789)

    • HH24:MI:SS (zum Beispiel 20:57:01)

    • HH24:MI (zum Beispiel 20:57)

  • DATE

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • String

    • Als Integer gespeichertes Datum

    • YYYY-MM-DD (zum Beispiel 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (zum Beispiel 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (zum Beispiel 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (zum Beispiel 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (zum Beispiel 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (zum Beispiel 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (zum Beispiel 2013-04-28T20:57-07:00)

  • TIMESTAMP_NTZ

  • TIMESTAMP_LTZ

  • TIMESTAMP_TZ

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • String

    • Als Integer gespeicherter Zeitstempel

    • YYYY-MM-DD (zum Beispiel 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (zum Beispiel 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (zum Beispiel 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (zum Beispiel 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (zum Beispiel 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (zum Beispiel 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (zum Beispiel 2013-04-28T20:57-07:00)

  • VARIANT

  • ARRAY

  • String (muss gültiges JSON sein)

  • primitive Datentypen und deren Arrays

  • BigInteger, BigDecimal

  • java.time.LocalTime

  • java.time.OffsetTime

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.util.Map<String, T>, wobei T ein gültiger VARIANT-Typ ist

  • T[], wobei T ein gültiger VARIANT-Typ ist

  • Liste<T>, wobei T ein gültiger VARIANT-Typ ist

  • OBJECT

  • String (muss gültiges JSON-Objekt sein)

  • Map<String, T>, wobei T ein gültiger Variant-Typ ist

  • GEOGRAPHY

  • In der klassischen Architektur nicht unterstützt, aber in der High-Performance-Architektur unterstützt.

  • GEOMETRY

  • In der klassischen Architektur nicht unterstützt, aber in der High-Performance-Architektur unterstützt.

Erforderliche Zugriffsrechte

Zum Aufrufen der Snowpipe Streaming-API ist eine Rolle mit den folgenden Berechtigungen erforderlich:

Objekt

Berechtigung

Tabelle

OWNERSHIP oder mindestens INSERT und EVOLVE SCHEMA (nur erforderlich bei Verwendung der Schemaentwicklung für den Kafka-Konnektor mit Snowpipe Streaming)

Datenbank

USAGE

Schema

USAGE

Pipe

OPERATE (nur für die High-Performance-Architektur erforderlich)