Einrichten von Openflow Connector for Kinesis für JSON-Datenformat¶
Bemerkung
Dieser Connector unterliegt den `Nutzungsbedingungen für Snowflake Connector<https://www.snowflake.com/legal/snowflake-connector-terms/>`_.
Unter diesem Thema wird beschrieben, wie Sie Openflow Connector for Kinesis für JSON-Datenformat einrichten. Dies ist ein vereinfachter Konnektor, der für die grundlegende Erfassung von Meldungen mit Funktionen zur Schemaentwicklung optimiert ist.
Das Openflow Connector for Kinesis für JSON-Datenformat ist für eine einfache JSON-Meldungserfassung von Kinesis-Streams in Snowflake-Tabellen konzipiert.
Voraussetzungen¶
Lesen Sie Allgemeine Informationen zu Openflow Connector for Kinesis.
Stellen Sie sicher, dass Sie folgende Aufgaben ausgeführt haben: Openflow einrichten mit BYOC oder Openflow einrichten – Snowflake-Bereitstellungen.
Stellen Sie bei der Verwendung von Openflow - Snowflake Deployments sicher, dass Sie Konfigurieren der erforderlichen Domänen gelesen haben und Zugriff auf die erforderlichen Domänen für den Kinesis-Konnektor gewährt haben.
Bemerkung
Wenn Sie die Unterstützung anderer Datenformate oder Features benötigen, wie z. B. DLQ, wenden Sie sich an Ihren Snowflake-Ansprechpartner.
Einen Kinesis-Stream einrichten¶
Als AWS-Administrator führen Sie die folgenden Aktionen in Ihrem AWS-Konto durch:
Stellen Sie sicher, dass Sie einen AWS-Benutzer mit IAM-Berechtigungen für den Zugriff auf Kinesis-Streams und DynamoDB haben.
Stellen Sie sicher, dass der AWS-Benutzer Zugriffsschlüssel-Anmeldeinformationen konfiguriert hat.
Snowflake-Konto einrichten¶
Als Snowflake-Kontoadministrator führen Sie die folgenden Aufgaben aus:
Erstellen Sie eine neue Rolle oder verwenden Sie eine vorhandene Rolle und erteilen Sie die Berechtigungen von Datenbanken.
Erstellen Sie eine Zieldatenbank und ein Zielschema, die verwendet werden, um Zieltabellen zum Speichern der Daten zu erstellen.
Wenn Sie die Fähigkeit des Konnektors nutzen möchten, automatisch eine Zieltabelle zu erstellen, wenn diese noch nicht existiert, stellen Sie sicher, dass der Benutzer über die erforderlichen Berechtigungen zum Erstellen und Verwalten von Snowflake-Objekten verfügt:
Objekt
Berechtigung
Anmerkungen
Datenbank
USAGE
Schema
USAGE . CREATE TABLE .
Nachdem die Objekte auf Schemaebene erstellt wurden, können die Berechtigungen für CREATE
objectwiderrufen werden.Tabelle
OWNERSHIP
Nur erforderlich, wenn Sie den Kinesis-Konnektor verwenden, um Daten in eine bestehende Tabelle zu importieren. . Wenn der Konnektor eine neue Zieltabelle für Datensätze aus dem Kinesis-Stream erstellt, wird die Standardrolle für den in der Konfiguration angegebenen Benutzer zum Tabelleneigentümer.
Sie können das folgende Skript verwenden, um eine benutzerdefinierte Rolle zu erstellen und zu konfigurieren (erfordert SECURITYADMIN oder gleichwertig):
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; -- Only for existing tables. GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
Erstellen Sie einen neuen Benutzer für den Snowflake-Dienst mit dem Typ SERVICE.
Weisen Sie dem Benutzer des Snowflake-Dienstes die Rolle zu, die Sie in den vorherigen Schritten erstellt haben.
GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user; ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
Konfigurieren Sie mit Schlüsselpaar-Authentifizierung für den Snowflake SERVICE-Benutzer aus Schritt 3.
Snowflake empfiehlt diesen Schritt dringend. Konfigurieren Sie einen von Openflow unterstützten Geheimnismanager, z. B. AWS, Azure und Hashicorp, und speichern Sie die öffentlichen und privaten Schlüssel im Geheimnisspeicher.
Bemerkung
Wenn Sie aus irgendeinem Grund keinen Geheimnismanager verwenden möchten, sind Sie dafür verantwortlich, die für die Schlüsselpaar-Authentifizierung verwendeten öffentlichen und privaten Schlüsseldateien gemäß den Sicherheitsrichtlinien Ihrer Organisation zu schützen.
Nachdem der Geheimnismanager konfiguriert wurde, legen Sie fest, wie Sie sich bei diesem authentifizieren möchten. Für AWS wird empfohlen, dass Sie die EC2-Instanzrolle verwenden, die mit Openflow verbunden ist, da auf diese Weise keine weiteren Geheimnisse bestehen bleiben müssen.
Konfigurieren Sie in Openflow über das Hamburger-Menü oben rechts einen Parameteranbieter, der mit diesem Secrets Manager verbunden ist. Navigieren Sie zu Controller Settings » Parameter Provider und rufen Sie dann Ihre Parameterwerte ab.
Zu diesem Zeitpunkt können alle Anmeldeinformationen mit den zugehörigen Parameterpfaden referenziert werden, und es müssen keine sensiblen Werte innerhalb von Openflow aufbewahrt werden.
Wenn andere Snowflake-Benutzer Zugriff auf die aufgenommenen Daten und die erstellten Tabellen benötigen (z. B. für die kundenspezifische Verarbeitung in Snowflake), weisen Sie diesen Benutzern die in Schritt 2 erstellte Rolle zu.
Einrichten des Konnektors¶
Als Data Engineer führen Sie die folgenden Aufgaben aus, um den Konnektor zu installieren und zu konfigurieren:
Konnektor installieren¶
Navigieren Sie zur Übersichtsseite von Openflow. Wählen Sie im Abschnitt Featured connectors die Option View more connectors aus.
Suchen Sie auf der Seite Openflow-Konnektoren den Konnektor und wählen Sie Add to runtime.
Wählen Sie im Dialogfeld Select runtime Ihre Laufzeitumgebung aus der Dropdown-Liste Available runtimes aus, und klicken Sie auf Add.
Bemerkung
Bevor Sie den Konnektor installieren, stellen Sie sicher, dass Sie in Snowflake eine Datenbank und ein Schema für den Konnektor erstellt haben, in dem die aufgenommenen Daten gespeichert werden.
Authentifizieren Sie sich bei der Bereitstellung mit den Anmeldedaten Ihres Snowflake-Kontos und wählen Sie Allow, wenn Sie dazu aufgefordert werden, damit die Laufzeitanwendung auf Ihr Snowflake-Konto zugreifen kann. Die Installation des Konnektors nimmt einige Minuten in Anspruch.
Authentifizieren Sie sich bei der Laufzeit mit den Anmeldeinformationen Ihres Snowflake-Kontos.
Das Openflow-Canvas wird mit der hinzugefügten Prozessgruppe des Konnektors angezeigt.
Konnektor konfigurieren¶
Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe, und wählen Sie Parameters aus.
Füllen Sie die erforderlichen Parameterwerte aus, wie unter Parameter beschrieben.
Parameter¶
Dieser Abschnitt beschreibt alle Parameter für das Openflow Connector for Kinesis für JSON-Datenformat.
Der Konnektor besteht aus mehreren Modulen. Um das Set anzuzeigen, doppelklicken Sie auf die Konnektor-Prozessgruppe. Sie können die Parameter für jedes Modul im Parameterkontext des Moduls festlegen.
Snowflake-Zielparameter¶
Parameter |
Beschreibung |
Erforderlich |
|---|---|---|
Destination Database |
Die Datenbank, in der die Daten als persistent gespeichert werden. Sie muss bereits in Snowflake vorhanden sein. Beim Namen wird zwischen Groß- und Kleinschreibung unterschieden. Bei Bezeichnern ohne Anführungszeichen geben Sie den Namen in Großbuchstaben an. |
Ja |
Destination Schema |
Das Schema, in dem Daten beibehalten werden, muss bereits in Snowflake vorhanden sein. Beim Namen wird zwischen Groß- und Kleinschreibung unterschieden. Bei Bezeichnern ohne Anführungszeichen geben Sie den Namen in Großbuchstaben an. Sehen Sie sich die folgenden Beispiele an:
|
Ja |
Iceberg Enabled |
Ob Iceberg für Tabellenoperationen aktiviert ist. Entweder |
Ja |
Schema Evolution Enabled |
Aktiviert oder deaktiviert die Schemaentwicklung auf Konnektorebene. Wenn aktiviert, werden automatische Schemaänderungen für Tabellen ermöglicht. Beachten Sie, dass die Schemaentwicklung auch auf Ebene der einzelnen Tabellen über tabellenspezifische Parameter gesteuert werden kann. Entweder |
Ja |
Schema Evolution For New Tables Enabled |
Steuert, ob die Schemaentwicklung beim Erstellen neuer Tabellen aktiviert wird. Wenn auf „true“ gesetzt, werden neue Tabellen mit dem Parameter ENABLE_SCHEMA_EVOLUTION = TRUE erstellt. Wenn auf „false“ gesetzt, werden neue Tabellen mit ENABLE_SCHEMA_EVOLUTION = FALSE erstellt. Nicht anwendbar auf Iceberg-Tabellen, da diese nicht automatisch erstellt werden. Diese Einstellung wirkt sich nur auf die Tabellenerstellung aus, nicht auf bestehende Tabellen. Entweder |
Ja |
Snowflake Account Identifier |
Bei Verwendung von:
|
Ja |
Snowflake Authentication Strategy |
Bei Verwendung von:
|
Ja |
Snowflake Private Key |
Bei Verwendung von:
|
Nein |
Snowflake Private Key File |
Bei Verwendung von:
|
Nein |
Snowflake Private Key Password |
Bei Verwendung von
|
Nein |
Snowflake Role |
Bei Verwendung von
|
Ja |
Snowflake-Benutzername |
Bei Verwendung von
|
Ja |
Kinesis JSON-Quellparameter¶
Parameter |
Beschreibung |
Erforderlich |
|---|---|---|
AWS Region Code |
Die AWS-Region, in der sich Ihr Kinesis-Stream befindet, zum Beispiel |
Ja |
AWS Access Key ID |
Die AWS-Zugriffsschlüssel-ID für eine Verbindung zu Ihrem Kinesis-Stream, DynamoDB und optional CloudWatch. |
Ja |
AWS Secret Access Key |
Der geheime AWS-Zugriffsschlüssel für eine Verbindung zu Ihrem Kinesis-Stream, DynamoDB und optional CloudWatch. |
Ja |
Kinesis Application Name |
Der Name der DynamoDB-Tabelle, der verwendet wird, um den Fortschritt der Anwendung bei der Verarbeitung des Kinesis-Streams nachzuverfolgen. |
Ja |
Kinesis-Verbrauchertyp |
Die Strategie zum Lesen von Datensätzen aus einem Kinesis Stream. Muss einer der folgenden Werte sein: |
Ja |
Kinesis Initial Stream Position |
Die anfängliche Stream-Position, von der aus die Replikation der Daten beginnt. Mögliche Werte sind:
|
Ja |
Kinesis Stream Name |
Name des AWS Kinesis-Streams, von dem Daten empfangen werden sollen. |
Ja |
Veröffentlichung von Metriken |
Gibt an, wo die Metriken der Kinesis Client Library veröffentlicht werden. Mögliche Werte: |
Ja |
Führen Sie den Ablauf aus¶
Klicken Sie mit der rechten Maustaste auf die Ebene, und wählen Sie Enable all Controller Services aus.
Klicken Sie mit der rechten Maustaste auf die Prozessgruppe des Konnektors, und wählen Sie Start aus.
Der Konnektor startet die Datenaufnahme.
Tabellenschema¶
Die vom Konnektor geladene Snowflake-Tabelle enthält Spalten, die nach den Schlüsseln Ihrer Kinesis-Meldungen benannt sind. Der Konnektor fügt außerdem eine Spalte KINESISMETADATA hinzu, in der Metadaten zu dem Datensatz gespeichert werden.
Nachfolgend finden Sie ein Beispiel für eine Snowflake-Tabelle, die vom Konnektor geladen wird:
Zeile |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
KINESISMETADATA |
|---|---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
{ … KINESISMETADATA object … } |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
{ … KINESISMETADATA object … } |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
{ … KINESISMETADATA object … } |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
{ … KINESISMETADATA object … } |
Die KINESISMETADATA-Spalte enthält ein Objekt mit den folgenden Feldern:
Feldname |
Feldtyp |
Beispielwert |
Beschreibung |
|---|---|---|---|
|
String |
|
Der Name des Kinesis-Streams, aus dem der Datensatz stammt. |
|
String |
|
Der Bezeichner der Freigabe in dem Stream, aus dem der Datensatz stammt. |
|
String |
|
Die ungefähre Zeit, zu der der Datensatz in den Stream eingefügt wurde (ISO 8601-Format). |
|
String |
|
Der vom Datenersteller für den Datensatz angegebene Partitionsschlüssel. |
|
String |
|
Die eindeutige Sequenznummer, die dem Datensatz in der Freigabe von Kinesis-Datenstreams zugewiesen wurden. |
|
Zahl |
|
Die Untersequenznummer des Datensatzes (wird für aggregierte Datensätze mit der gleichen Sequenznummer verwendet). |
|
String |
|
Eine Kombination aus der Sequenznummer und der Untersequenznummer für den Datensatz. |
Schemaentwicklung¶
Der Konnektor unterstützt die automatische Schemaerkennung und die automatische Schemaentwicklung. Die Struktur von Tabellen in Snowflake wird automatisch definiert und weiterentwickelt, um die Struktur neuer Daten zu unterstützen, die vom Konnektor geladen werden.
Snowflake erkennt das Schema der eingehenden Daten und lädt die Daten in Tabellen, die mit einem benutzerdefinierten Schema übereinstimmen. Snowflake ermöglicht auch das Hinzufügen von neuen Spalten oder das Löschen der NOT NULL-Einschränkung von Spalten, die in den eingehenden Datensätzen fehlen.
Die Schemaerkennung mit dem Konnektor leitet Datentypen basierend auf den bereitgestellten JSON-Daten ab.
Wenn der Konnektor die Zieltabelle erstellt, ist die Schemaentwicklung standardmäßig aktiviert.
Wenn Sie die Schemaentwicklung für eine bestehende Tabelle aktivieren oder deaktivieren möchten, verwenden Sie den Befehl ALTER TABLE, um den Parameter ENABLE_SCHEMA_EVOLUTION festzulegen. Sie müssen außerdem eine Rolle verwenden, die über die OWNERSHIP-Berechtigung für die Tabelle verfügt. Weitere Informationen dazu finden Sie unter Tabellenschemaentwicklung.
Wenn jedoch die Schemaentwicklung für eine bestehende Tabelle deaktiviert ist, versucht der Konnektor, die Zeilen mit den nicht übereinstimmenden Schemas an den konfigurierten Fehler-Ausgabeport zu senden.
Unterstützung von Iceberg-Tabellen¶
Openflow Connector for Kinesis kann Daten in eine von Snowflake verwaltete Apache Iceberg™-Tabelle aufnehmen, wenn Iceberg Enabled auf true gesetzt ist.
Anforderungen und Einschränkungen¶
Bevor Sie Openflow Connector for Kinesis für die Aufnahme von Iceberg-Tabellen konfigurieren, beachten Sie die folgenden Anforderungen und Einschränkungen:
Sie müssen eine Iceberg-Tabelle erstellen, bevor Sie den Konnektor ausführen.
Stellen Sie sicher, dass der Benutzer Daten in die erstellten Tabellen eingeben kann.
Konfiguration und Einrichtung¶
Zur Konfiguration von Openflow Connector for Kinesis für die Aufnahme von Iceberg-Tabellen folgen Sie den Schritten unter Einrichten von Openflow Connector for Kinesis für JSON-Datenformat mit ein paar Unterschieden, wie in den folgenden Abschnitten beschrieben.
Aufnahme in die Iceberg-Tabellen aktivieren¶
Um die Aufnahme in eine Iceberg-Tabelle zu ermöglichen, müssen Sie den Parameter Iceberg Enabled auf true setzen.
Erstellen Sie eine Iceberg-Tabelle für die Datenaufnahme¶
Bevor Sie den Konnektorr ausführen, müssen Sie eine Iceberg-Tabelle erstellen. Das anfängliche Tabellenschema hängt von Ihrer Konnektoreinstellung für Schema Evolution Enabled ab.
Wenn die Schemaentwicklung aktiviert ist, müssen Sie eine Tabelle mit einer Spalte namens kinesisMetadata erstellen. Der Konnektor erstellt automatisch die Spalten für Meldungsfelder und ändert das Spaltenschema kinesisMetadata.
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ENABLE_SCHEMA_EVOLUTION = true;
Wenn die Schemaentwicklung deaktiviert ist, müssen Sie die Tabelle mit allen Feldern erstellen, die die Kinesis-Meldung enthält. Wenn Sie eine Iceberg-Tabelle erstellen, können Sie Iceberg-Datentypen oder kompatible Snowflake-Typen verwenden. Der halbstrukturierte VARIANT-Typ wird nicht unterstützt. Verwenden Sie stattdessen eins strukturiertes OBJECT oder MAP.
Betrachten Sie zum Beispiel die folgende Meldung:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
Die folgende Anweisung erstellt eine Tabelle mit allen Feldern, die die Kinesis-Meldung enthält:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT(
stream STRING,
shardId STRING,
approximateArrival STRING,
partitionKey STRING,
sequenceNumber STRING,
subSequenceNumber INTEGER,
shardedSequenceNumber STRING
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Bemerkung
kinesisMetadata muss immer erstellt werden. Bei Feldnamen innerhalb verschachtelter Strukturen wie dogs oder cats wird zwischen Groß- und Kleinschreibung unterschieden.