Set up Openflow Connector for Kinesis for JSON data format¶
Bemerkung
This connector is subject to the Snowflake Connector Terms.
This topic describes how to set up Openflow Connector for Kinesis for JSON data format. This is a simplified connector optimized for basic message ingestion with schema evolution capabilities.
Das Openflow Connector for Kinesis für JSON-Datenformat ist für eine einfache JSON-Meldungserfassung von Kinesis-Streams in Snowflake-Tabellen konzipiert.
Voraussetzungen¶
Review Allgemeine Informationen zu Openflow Connector for Kinesis.
Ensure that you have set up Openflow with BYOC or set up Openflow with Snowflake Deployments.
If you are using Openflow - Snowflake Deployments, ensure that you have reviewed configuring required domains and have granted access to the required domains for the Kinesis connector.
Bemerkung
Wenn Sie die Unterstützung anderer Datenformate oder Features benötigen, wie z. B. DLQ, wenden Sie sich an Ihren Snowflake-Ansprechpartner.
Einen Kinesis-Stream einrichten¶
Als AWS-Administrator führen Sie die folgenden Aktionen in Ihrem AWS-Konto durch:
Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.
Ensure that the AWS User has configured Access Key credentials.
Snowflake-Konto einrichten¶
Als Snowflake-Kontoadministrator führen Sie die folgenden Aufgaben aus:
Erstellen Sie eine neue Rolle oder verwenden Sie eine vorhandene Rolle und erteilen Sie die Berechtigungen von Datenbanken.
Erstellen Sie eine Zieldatenbank und ein Zielschema, die verwendet werden, um Zieltabellen zum Speichern der Daten zu erstellen.
Wenn Sie die Fähigkeit des Konnektors nutzen möchten, automatisch eine Zieltabelle zu erstellen, wenn diese noch nicht existiert, stellen Sie sicher, dass der Benutzer über die erforderlichen Berechtigungen zum Erstellen und Verwalten von Snowflake-Objekten verfügt:
Objekt
Berechtigung
Anmerkungen
Datenbank
USAGE
Schema
USAGE . CREATE TABLE .
Nachdem die Objekte auf Schemaebene erstellt wurden, können die Berechtigungen für CREATE
objectwiderrufen werden.Tabelle
OWNERSHIP
Nur erforderlich, wenn Sie den Kinesis-Konnektor verwenden, um Daten in eine bestehende Tabelle zu importieren. . Wenn der Konnektor eine neue Zieltabelle für Datensätze aus dem Kinesis-Stream erstellt, wird die Standardrolle für den in der Konfiguration angegebenen Benutzer zum Tabelleneigentümer.
Sie können das folgende Skript verwenden, um eine benutzerdefinierte Rolle zu erstellen und zu konfigurieren (erfordert SECURITYADMIN oder gleichwertig):
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; -- Only for existing tables. GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
Erstellen Sie einen neuen Benutzer für den Snowflake-Dienst mit dem Typ SERVICE.
Weisen Sie dem Benutzer des Snowflake-Dienstes die Rolle zu, die Sie in den vorherigen Schritten erstellt haben.
GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user; ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
Konfigurieren Sie mit Schlüsselpaar-Authentifizierung für den Snowflake SERVICE-Benutzer aus Schritt 3.
Snowflake empfiehlt diesen Schritt dringend. Konfigurieren Sie einen von Openflow unterstützten Geheimnismanager, z. B. AWS, Azure und Hashicorp, und speichern Sie die öffentlichen und privaten Schlüssel im Geheimnisspeicher.
Bemerkung
Wenn Sie aus irgendeinem Grund keinen Geheimnismanager verwenden möchten, sind Sie dafür verantwortlich, die für die Schlüsselpaar-Authentifizierung verwendeten öffentlichen und privaten Schlüsseldateien gemäß den Sicherheitsrichtlinien Ihrer Organisation zu schützen.
After the secrets manager is configured, determine how you will authenticate to it. On AWS, it’s recommended that you use the EC2 instance role associated with Openflow as this way no other secrets have to be persisted.
Konfigurieren Sie in Openflow über das Hamburger-Menü oben rechts einen Parameteranbieter, der mit diesem Secrets Manager verbunden ist. Navigieren Sie zu Controller Settings » Parameter Provider und rufen Sie dann Ihre Parameterwerte ab.
Zu diesem Zeitpunkt können alle Anmeldeinformationen mit den zugehörigen Parameterpfaden referenziert werden, und es müssen keine sensiblen Werte innerhalb von Openflow aufbewahrt werden.
If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), grant those users the role created in step 2.
Einrichten des Konnektors¶
Als Data Engineer führen Sie die folgenden Aufgaben aus, um den Konnektor zu installieren und zu konfigurieren:
Konnektor installieren¶
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
Suchen Sie auf der Seite Openflow-Konnektoren den Konnektor und wählen Sie Add to runtime.
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.
Bemerkung
Bevor Sie den Konnektor installieren, stellen Sie sicher, dass Sie in Snowflake eine Datenbank und ein Schema für den Konnektor erstellt haben, in dem die aufgenommenen Daten gespeichert werden.
Authentifizieren Sie sich bei der Bereitstellung mit den Anmeldedaten Ihres Snowflake-Kontos und wählen Sie Allow, wenn Sie dazu aufgefordert werden, damit die Laufzeitanwendung auf Ihr Snowflake-Konto zugreifen kann. Die Installation des Konnektors nimmt einige Minuten in Anspruch.
Authentifizieren Sie sich bei der Laufzeit mit den Anmeldeinformationen Ihres Snowflake-Kontos.
Das Openflow-Canvas wird mit der hinzugefügten Prozessgruppe des Konnektors angezeigt.
Konnektor konfigurieren¶
Right-click the imported process group and select Parameters.
Populate the required parameter values as described in Parameters.
Parameters¶
Dieser Abschnitt beschreibt alle Parameter für das Openflow Connector for Kinesis für JSON-Datenformat.
The connector consists of several modules. To see the set, double-click the connector process group. You can set the parameters for each module in the module’s parameter context.
Snowflake destination parameters¶
Parameter |
Beschreibung |
Erforderlich |
|---|---|---|
Destination Database |
Die Datenbank, in der die Daten als persistent gespeichert werden. Sie muss bereits in Snowflake vorhanden sein. Beim Namen wird zwischen Groß- und Kleinschreibung unterschieden. Bei Bezeichnern ohne Anführungszeichen geben Sie den Namen in Großbuchstaben an. |
Ja |
Destination Schema |
Das Schema, in dem Daten beibehalten werden, muss bereits in Snowflake vorhanden sein. Beim Namen wird zwischen Groß- und Kleinschreibung unterschieden. Bei Bezeichnern ohne Anführungszeichen geben Sie den Namen in Großbuchstaben an. Sehen Sie sich die folgenden Beispiele an:
|
Ja |
Iceberg Enabled |
Ob Iceberg für Tabellenoperationen aktiviert ist. Entweder |
Ja |
Schema Evolution Enabled |
Aktiviert oder deaktiviert die Schemaentwicklung auf Konnektorebene. Wenn aktiviert, werden automatische Schemaänderungen für Tabellen ermöglicht. Beachten Sie, dass die Schemaentwicklung auch auf Ebene der einzelnen Tabellen über tabellenspezifische Parameter gesteuert werden kann. Entweder |
Ja |
Schema Evolution For New Tables Enabled |
Steuert, ob die Schemaentwicklung beim Erstellen neuer Tabellen aktiviert wird. Wenn auf „true“ gesetzt, werden neue Tabellen mit dem Parameter ENABLE_SCHEMA_EVOLUTION = TRUE erstellt. Wenn auf „false“ gesetzt, werden neue Tabellen mit ENABLE_SCHEMA_EVOLUTION = FALSE erstellt. Nicht anwendbar auf Iceberg-Tabellen, da diese nicht automatisch erstellt werden. Diese Einstellung wirkt sich nur auf die Tabellenerstellung aus, nicht auf bestehende Tabellen. Entweder |
Ja |
Snowflake Account Identifier |
Bei Verwendung von:
|
Ja |
Snowflake Authentication Strategy |
Bei Verwendung von:
|
Ja |
Snowflake Private Key |
Bei Verwendung von:
|
Nein |
Snowflake Private Key File |
Bei Verwendung von:
|
Nein |
Snowflake Private Key Password |
Bei Verwendung von
|
Nein |
Snowflake Role |
Bei Verwendung von
|
Ja |
Snowflake-Benutzername |
Bei Verwendung von
|
Ja |
Kinesis JSON Source Parameters¶
Parameter |
Beschreibung |
Erforderlich |
|---|---|---|
AWS Region Code |
Die AWS-Region, in der sich Ihr Kinesis-Stream befindet, zum Beispiel |
Ja |
AWS Access Key ID |
The AWS Access Key ID to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
Ja |
AWS Secret Access Key |
The AWS Secret Access Key to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
Ja |
Kinesis Application Name |
Der Name der DynamoDB-Tabelle, der verwendet wird, um den Fortschritt der Anwendung bei der Verarbeitung des Kinesis-Streams nachzuverfolgen. |
Ja |
Kinesis Consumer Type |
Die Strategie zum Lesen von Datensätzen aus einem Kinesis Stream. Muss einer der folgenden Werte sein: |
Ja |
Kinesis Initial Stream Position |
Die anfängliche Stream-Position, von der aus die Replikation der Daten beginnt. Mögliche Werte sind:
|
Ja |
Kinesis Stream Name |
Name des AWS Kinesis-Streams, von dem Daten empfangen werden sollen. |
Ja |
Veröffentlichung von Metriken |
Gibt an, wo die Metriken der Kinesis Client Library veröffentlicht werden. Mögliche Werte: |
Ja |
Führen Sie den Ablauf aus¶
Right-click the plane and select Enable all Controller Services.
Right-click the connector’s process group and select Start.
Der Konnektor startet die Datenaufnahme.
Table schema¶
The Snowflake table loaded by the connector contains columns named by the keys of your Kinesis messages.
The connector also adds a KINESISMETADATA column which stores metadata about the record.
Nachfolgend finden Sie ein Beispiel für eine Snowflake-Tabelle, die vom Konnektor geladen wird:
Zeile |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
KINESISMETADATA |
|---|---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
{ … KINESISMETADATA object … } |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
{ … KINESISMETADATA object … } |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
{ … KINESISMETADATA object … } |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
{ … KINESISMETADATA object … } |
Die KINESISMETADATA-Spalte enthält ein Objekt mit den folgenden Feldern:
Feldname |
Feldtyp |
Example Value |
Beschreibung |
|---|---|---|---|
|
String |
|
Der Name des Kinesis-Streams, aus dem der Datensatz stammt. |
|
String |
|
Der Bezeichner der Freigabe in dem Stream, aus dem der Datensatz stammt. |
|
String |
|
Die ungefähre Zeit, zu der der Datensatz in den Stream eingefügt wurde (ISO 8601-Format). |
|
String |
|
Der vom Datenersteller für den Datensatz angegebene Partitionsschlüssel. |
|
String |
|
Die eindeutige Sequenznummer, die dem Datensatz in der Freigabe von Kinesis-Datenstreams zugewiesen wurden. |
|
Zahl |
|
Die Untersequenznummer des Datensatzes (wird für aggregierte Datensätze mit der gleichen Sequenznummer verwendet). |
|
String |
|
Eine Kombination aus der Sequenznummer und der Untersequenznummer für den Datensatz. |
Schemaentwicklung¶
The connector supports automatic schema detection and evolution. The structure of tables in Snowflake is defined and evolved automatically to support the structure of new data loaded by the connector.
Snowflake erkennt das Schema der eingehenden Daten und lädt die Daten in Tabellen, die mit einem benutzerdefinierten Schema übereinstimmen. Snowflake ermöglicht auch das Hinzufügen von neuen Spalten oder das Löschen der NOT NULL-Einschränkung von Spalten, die in den eingehenden Datensätzen fehlen.
Die Schemaerkennung mit dem Konnektor leitet Datentypen basierend auf den bereitgestellten JSON-Daten ab.
Wenn der Konnektor die Zieltabelle erstellt, ist die Schemaentwicklung standardmäßig aktiviert.
If you want to enable or disable schema evolution on an existing table,
use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter.
You must also use a role that has the OWNERSHIP privilege on the table. For more information, see Tabellenschemaentwicklung.
However, if schema evolution is disabled for an existing table, then the connector tries to send the rows with mismatched schemas to the configured failure output port.
Iceberg table support¶
Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.
Anforderungen und Einschränkungen¶
Before you configure Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:
Sie müssen eine Iceberg-Tabelle erstellen, bevor Sie den Konnektor ausführen.
Stellen Sie sicher, dass der Benutzer Daten in die erstellten Tabellen eingeben kann.
Konfiguration und Einrichtung¶
To configure Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.
Aufnahme in die Iceberg-Tabellen aktivieren¶
Um die Aufnahme in eine Iceberg-Tabelle zu ermöglichen, müssen Sie den Parameter Iceberg Enabled auf true setzen.
Erstellen Sie eine Iceberg-Tabelle für die Datenaufnahme¶
Bevor Sie den Konnektorr ausführen, müssen Sie eine Iceberg-Tabelle erstellen. Das anfängliche Tabellenschema hängt von Ihrer Konnektoreinstellung für Schema Evolution Enabled ab.
If schema evolution is enabled, you must create a table with a column named kinesisMetadata.
The connector automatically creates the columns for message fields and alters the kinesisMetadata column schema.
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ENABLE_SCHEMA_EVOLUTION = true;
If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.
Betrachten Sie zum Beispiel die folgende Meldung:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
Die folgende Anweisung erstellt eine Tabelle mit allen Feldern, die die Kinesis-Meldung enthält:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT(
stream STRING,
shardId STRING,
approximateArrival STRING,
partitionKey STRING,
sequenceNumber STRING,
subSequenceNumber INTEGER,
shardedSequenceNumber STRING
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Bemerkung
kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.