Set up Openflow Connector for Kinesis for JSON data format

Bemerkung

This connector is subject to the Snowflake Connector Terms.

Unter diesem Thema werden die Einrichtungsschritte des Openflow Connector for Kinesis für JSON-Datenformats beschrieben. Dies ist ein vereinfachter Konnektor, der für die grundlegende Erfassung von Meldungen mit Funktionen zur Schemaentwicklung optimiert ist.

Das Openflow Connector for Kinesis für JSON-Datenformat ist für eine einfache JSON-Meldungserfassung von Kinesis-Streams in Snowflake-Tabellen konzipiert.

Voraussetzungen

  1. Stellen Sie sicher, dass Sie Allgemeine Informationen zu Openflow Connector for Kinesis gelesen haben.

  2. Ensure that you have Openflow einrichten – BYOC or Set up Openflow - Snowflake Deployments.

  3. Stellen Sie bei der Verwendung von Openflow - Snowflake Deployments sicher, dass Sie Konfigurieren der erforderlichen Domänen gelesen und Zugriff auf die erforderlichen Domänen für den Kinesis-Konnektor gewährt haben.

Bemerkung

Wenn Sie die Unterstützung anderer Datenformate oder Features benötigen, wie z. B. DLQ, wenden Sie sich an Ihren Snowflake-Ansprechpartner.

Einen Kinesis-Stream einrichten

Als AWS-Administrator führen Sie die folgenden Aktionen in Ihrem AWS-Konto durch:

  1. Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.

  2. Ensure that the AWS User has configured Access Key credentials.

Snowflake-Konto einrichten

Als Snowflake-Kontoadministrator führen Sie die folgenden Aufgaben aus:

  1. Erstellen Sie eine neue Rolle oder verwenden Sie eine vorhandene Rolle und erteilen Sie die Berechtigungen von Datenbanken.

  2. Erstellen Sie eine Zieldatenbank und ein Zielschema, die verwendet werden, um Zieltabellen zum Speichern der Daten zu erstellen.

    1. Wenn Sie die Fähigkeit des Konnektors nutzen möchten, automatisch eine Zieltabelle zu erstellen, wenn diese noch nicht existiert, stellen Sie sicher, dass der Benutzer über die erforderlichen Berechtigungen zum Erstellen und Verwalten von Snowflake-Objekten verfügt:

      Objekt

      Berechtigung

      Anmerkungen

      Datenbank

      USAGE

      Schema

      USAGE . CREATE TABLE .

      Nachdem die Objekte auf Schemaebene erstellt wurden, können die Berechtigungen für CREATE object widerrufen werden.

      Tabelle

      OWNERSHIP

      Nur erforderlich, wenn Sie den Kinesis-Konnektor verwenden, um Daten in eine bestehende Tabelle zu importieren. . Wenn der Konnektor eine neue Zieltabelle für Datensätze aus dem Kinesis-Stream erstellt, wird die Standardrolle für den in der Konfiguration angegebenen Benutzer zum Tabelleneigentümer.

      Sie können das folgende Skript verwenden, um eine benutzerdefinierte Rolle zu erstellen und zu konfigurieren (erfordert SECURITYADMIN oder gleichwertig):

      USE ROLE SECURITYADMIN;
      
      CREATE ROLE kinesis_connector_role;
      GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role;
      GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      
      -- Only for existing tables.
      GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
      
      Copy
  3. Erstellen Sie einen neuen Benutzer für den Snowflake-Dienst mit dem Typ SERVICE.

  4. Weisen Sie dem Benutzer des Snowflake-Dienstes die Rolle zu, die Sie in den vorherigen Schritten erstellt haben.

    GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user;
    ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
    
    Copy
  5. Konfigurieren Sie mit Schlüsselpaar-Authentifizierung für den Snowflake SERVICE-Benutzer aus Schritt 3.

  6. Snowflake empfiehlt diesen Schritt dringend. Konfigurieren Sie einen von Openflow unterstützten Geheimnismanager, z. B. AWS, Azure und Hashicorp, und speichern Sie die öffentlichen und privaten Schlüssel im Geheimnisspeicher.

    Bemerkung

    Wenn Sie aus irgendeinem Grund keinen Geheimnismanager verwenden möchten, sind Sie dafür verantwortlich, die für die Schlüsselpaar-Authentifizierung verwendeten öffentlichen und privaten Schlüsseldateien gemäß den Sicherheitsrichtlinien Ihrer Organisation zu schützen.

    1. Nachem der Geheimnismanager konfiguriert ist, legen Sie fest, wie Sie sich bei ihm authentifizieren möchten. Auf AWS wird empfohlen, die mit Openflow verknüpfte EC2-Instanzrolle zu verwenden, da auf diese Weise keine weiteren Geheimnisse gespeichert werden müssen.

    2. Konfigurieren Sie in Openflow über das Hamburger-Menü oben rechts einen Parameteranbieter, der mit diesem Secrets Manager verbunden ist. Navigieren Sie zu Controller Settings » Parameter Provider und rufen Sie dann Ihre Parameterwerte ab.

    3. Zu diesem Zeitpunkt können alle Anmeldeinformationen mit den zugehörigen Parameterpfaden referenziert werden, und es müssen keine sensiblen Werte innerhalb von Openflow aufbewahrt werden.

  7. If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), then grant those users the role created in step 2.

Einrichten des Konnektors

Als Data Engineer führen Sie die folgenden Aufgaben aus, um den Konnektor zu installieren und zu konfigurieren:

Konnektor installieren

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. Suchen Sie auf der Seite Openflow-Konnektoren den Konnektor und wählen Sie Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    Bemerkung

    Bevor Sie den Konnektor installieren, stellen Sie sicher, dass Sie in Snowflake eine Datenbank und ein Schema für den Konnektor erstellt haben, in dem die aufgenommenen Daten gespeichert werden.

  4. Authentifizieren Sie sich bei der Bereitstellung mit den Anmeldedaten Ihres Snowflake-Kontos und wählen Sie Allow, wenn Sie dazu aufgefordert werden, damit die Laufzeitanwendung auf Ihr Snowflake-Konto zugreifen kann. Die Installation des Konnektors nimmt einige Minuten in Anspruch.

  5. Authentifizieren Sie sich bei der Laufzeit mit den Anmeldeinformationen Ihres Snowflake-Kontos.

Das Openflow-Canvas wird mit der hinzugefügten Prozessgruppe des Konnektors angezeigt.

Konnektor konfigurieren

  1. Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe und wählen Sie Parameters.

  2. Populate the required parameter values as described in Parameters section below.

Parameters

Dieser Abschnitt beschreibt alle Parameter für das Openflow Connector for Kinesis für JSON-Datenformat.

Der Konnektor besteht aus mehreren Modulen. Um das Set anzuzeigen, doppelklicken Sie auf die Konnektor-Prozessgruppe. Sie können die Parameter für jedes Modul im Parameterkontext des Moduls festlegen.

Snowflake destination parameters

Parameter

Beschreibung

Erforderlich

Destination Database

Die Datenbank, in der die Daten als persistent gespeichert werden. Sie muss bereits in Snowflake vorhanden sein. Beim Namen wird zwischen Groß- und Kleinschreibung unterschieden. Bei Bezeichnern ohne Anführungszeichen geben Sie den Namen in Großbuchstaben an.

Ja

Destination Schema

Das Schema, in dem Daten beibehalten werden, muss bereits in Snowflake vorhanden sein. Beim Namen wird zwischen Groß- und Kleinschreibung unterschieden. Bei Bezeichnern ohne Anführungszeichen geben Sie den Namen in Großbuchstaben an.

Sehen Sie sich die folgenden Beispiele an:

  • CREATE SCHEMA SCHEMA_NAME oder CREATE SCHEMA schema_name: verwenden Sie SCHEMA_NAME

  • CREATE SCHEMA "schema_name" oder CREATE SCHEMA "SCHEMA_NAME": verwenden Sie schema_name bzw. SCHEMA_NAME.

Ja

Iceberg Enabled

Ob Iceberg für Tabellenoperationen aktiviert ist. Entweder true oder false.

Ja

Schema Evolution Enabled

Aktiviert oder deaktiviert die Schemaentwicklung auf Konnektorebene. Wenn aktiviert, werden automatische Schemaänderungen für Tabellen ermöglicht. Beachten Sie, dass die Schemaentwicklung auch auf Ebene der einzelnen Tabellen über tabellenspezifische Parameter gesteuert werden kann. Entweder true oder false.

Ja

Schema Evolution For New Tables Enabled

Steuert, ob die Schemaentwicklung beim Erstellen neuer Tabellen aktiviert wird. Wenn auf „true“ gesetzt, werden neue Tabellen mit dem Parameter ENABLE_SCHEMA_EVOLUTION = TRUE erstellt. Wenn auf „false“ gesetzt, werden neue Tabellen mit ENABLE_SCHEMA_EVOLUTION = FALSE erstellt. Nicht anwendbar auf Iceberg-Tabellen, da diese nicht automatisch erstellt werden. Diese Einstellung wirkt sich nur auf die Tabellenerstellung aus, nicht auf bestehende Tabellen. Entweder true oder false.

Ja

Snowflake Account Identifier

Bei Verwendung von:

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR: Snowflake-Kontoname im Format [Organisationsname]-[Kontoname], wobei die Daten persistent gespeichert werden.

Ja

Snowflake Authentication Strategy

Bei Verwendung von:

  • Snowflake Openflow-Bereitstellung oder BYOC: Verwenden Sie SNOWFLAKE_SESSION_TOKEN. Dieses Token wird automatisch von Snowflake verwaltet. Für BYOC-Bereitstellungen müssen zuvor Laufzeitrollen konfiguriert sein, um SNOWFLAKE_SESSION_TOKEN zu verwenden.

  • BYOC: Alternatively BYOC can use KEY_PAIR as the value for authentication strategy.

Ja

Snowflake Private Key

Bei Verwendung von:

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR: Muss der RSA private Schlüssel sein, der für die Authentifizierung verwendet wird.

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

Nein

Snowflake Private Key File

Bei Verwendung von:

  • Strategie für die Authentifizierung mit Sitzungstoken: Die private Schlüsseldatei muss leer sein.

  • KEY_PAIR: Laden Sie die Datei hoch, die den RSA Private Key für die Authentifizierung bei Snowflake enthält, formatiert nach PKCS8-Standards und mit Standard-PEM-Header und -Footer. Die Header-Zeile beginnt mit -----BEGIN PRIVATE. Aktivieren Sie das Kontrollkästchen Reference asset, um die Private Key-Datei hochzuladen.

Nein

Snowflake Private Key Password

Bei Verwendung von

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR: Geben Sie das Kennwort an, das mit der privaten Snowflake-Schlüsseldatei verbunden ist.

Nein

Snowflake Role

Bei Verwendung von

  • Session Token Authentication Strategy: Use your Snowflake Role. You can find your Snowflake Role in the Openflow UI, by navigating to View Details for your Runtime.

  • KEY_PAIR Authentifizierungsstrategie: Verwenden Sie eine gültige Rolle, die für Ihren Dienstbenutzer konfiguriert ist.

Ja

Snowflake-Benutzername

Bei Verwendung von

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR: Geben Sie den Benutzernamen an, der für die Verbindung mit der Snowflake-Instanz verwendet wird.

Ja

Kinesis JSON Source Parameters

Parameter

Beschreibung

Erforderlich

AWS Region Code

Die AWS-Region, in der sich Ihr Kinesis-Stream befindet, zum Beispiel us-west-2.

Ja

AWS Access Key ID

The AWS Access Key ID to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch.

Ja

AWS Secret Access Key

The AWS Secret Access Key to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch.

Ja

Kinesis Application Name

Der Name der DynamoDB-Tabelle, der verwendet wird, um den Fortschritt der Anwendung bei der Verarbeitung des Kinesis-Streams nachzuverfolgen.

Ja

Kinesis Initial Stream Position

Die anfängliche Stream-Position, von der aus die Replikation der Daten beginnt.

Mögliche Werte sind:
  • LATEST: Letzter gespeicherter Datensatz

  • TRIM_HORIZON: Frühester gespeicherter Datensatz

Ja

Kinesis Stream Name

Name des AWS Kinesis-Streams, von dem Daten empfangen werden sollen.

Ja

Veröffentlichung von Metriken

Gibt an, wo die Metriken der Kinesis Client Library veröffentlicht werden. Mögliche Werte: DISABLED, LOGS, CLOUDWATCH.

Ja

Führen Sie den Ablauf aus

  1. Klicken Sie mit der rechten Maustaste auf die Ebene, und wählen Sie Enable all Controller Services.

  2. Right-click on the connector’s process group and select Start.

Der Konnektor startet die Datenaufnahme.

Table Schema

The Snowflake table loaded by the connector contains columns named by the keys of your Kinesis messages. The connector also adds a KINESISMETADATA column which stores metadata about the record.

Nachfolgend finden Sie ein Beispiel für eine Snowflake-Tabelle, die vom Konnektor geladen wird:

Zeile

ACCOUNT

SYMBOL

SIDE

QUANTITY

KINESISMETADATA

1

ABC123

ZTEST

BUY

3572

{ … KINESISMETADATA object … }

2

XYZ789

ZABZX

SELL

3024

{ … KINESISMETADATA object … }

3

XYZ789

ZTEST

SELL

799

{ … KINESISMETADATA object … }

4

ABC123

ZABZX

BUY

2033

{ … KINESISMETADATA object … }

Die KINESISMETADATA-Spalte enthält ein Objekt mit den folgenden Feldern:

Feldname

Feldtyp

Example Value

Beschreibung

stream

String

stream-name

Der Name des Kinesis-Streams, aus dem der Datensatz stammt.

shardId

String

shardId-000000000001

Der Bezeichner der Freigabe in dem Stream, aus dem der Datensatz stammt.

approximateArrival

String

2025-11-05T09:12:15.300

Die ungefähre Zeit, zu der der Datensatz in den Stream eingefügt wurde (ISO 8601-Format).

partitionKey

String

key-1234

Der vom Datenersteller für den Datensatz angegebene Partitionsschlüssel.

sequenceNumber

String

123456789

Die eindeutige Sequenznummer, die dem Datensatz in der Freigabe von Kinesis-Datenstreams zugewiesen wurden.

subSequenceNumber

Zahl

2

Die Untersequenznummer des Datensatzes (wird für aggregierte Datensätze mit der gleichen Sequenznummer verwendet).

shardedSequenceNumber

String

12345678900002

Eine Kombination aus der Sequenznummer und der Untersequenznummer für den Datensatz.

Schemaentwicklung

Dieser Konnektor unterstützt die automatische Schemaerkennung und die automatische Schemaentwicklung. Die Struktur von Tabellen in Snowflake wird automatisch definiert und weiterentwickelt, um die Struktur neuer Daten zu unterstützen, die vom Konnektor geladen werden.

Snowflake erkennt das Schema der eingehenden Daten und lädt die Daten in Tabellen, die mit einem benutzerdefinierten Schema übereinstimmen. Snowflake ermöglicht auch das Hinzufügen von neuen Spalten oder das Löschen der NOT NULL-Einschränkung von Spalten, die in den eingehenden Datensätzen fehlen.

Die Schemaerkennung mit dem Konnektor leitet Datentypen basierend auf den bereitgestellten JSON-Daten ab.

Wenn der Konnektor die Zieltabelle erstellt, ist die Schemaentwicklung standardmäßig aktiviert.

If you want to enable or disable schema evolution on an existing table, use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter. You must also use a role that has the OWNERSHIP privilege on the table. For more information, see Tabellenschemaentwicklung.

However, if schema evolution is disabled for an existing table, then the connector will try to send the rows with mismatched schemas to the configured failure output port.

Iceberg table support

Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.

Anforderungen und Einschränkungen

Before you configure the Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:

  • Sie müssen eine Iceberg-Tabelle erstellen, bevor Sie den Konnektor ausführen.

  • Stellen Sie sicher, dass der Benutzer Daten in die erstellten Tabellen eingeben kann.

Konfiguration und Einrichtung

To configure the Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.

Aufnahme in die Iceberg-Tabellen aktivieren

Um die Aufnahme in eine Iceberg-Tabelle zu ermöglichen, müssen Sie den Parameter Iceberg Enabled auf true setzen.

Erstellen Sie eine Iceberg-Tabelle für die Datenaufnahme

Bevor Sie den Konnektorr ausführen, müssen Sie eine Iceberg-Tabelle erstellen. Das anfängliche Tabellenschema hängt von Ihrer Konnektoreinstellung für Schema Evolution Enabled ab.

Wenn die Schemaentwicklung aktiviert ist, müssen Sie eine Tabelle mit einer Spalte namens kinesisMetadata erstellen. Der Konnektor erstellt automatisch die Spalten für Meldungsfelder und ändert das Spaltenschema kinesisMetadata.

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table'
  ENABLE_SCHEMA_EVOLUTION = true;
Copy

If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.

Betrachten Sie zum Beispiel die folgende Meldung:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

Die folgende Anweisung erstellt eine Tabelle mit allen Feldern, die die Kinesis-Meldung enthält:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT(
        stream STRING,
        shardId STRING,
        approximateArrival STRING,
        partitionKey STRING,
        sequenceNumber STRING,
        subSequenceNumber INTEGER,
        shardedSequenceNumber STRING
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Bemerkung

kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.