Set up Openflow Connector for Kinesis for JSON data format

Bemerkung

This connector is subject to the Snowflake Connector Terms.

This topic describes how to set up Openflow Connector for Kinesis for JSON data format. This is a simplified connector optimized for basic message ingestion with schema evolution capabilities.

Das Openflow Connector for Kinesis für JSON-Datenformat ist für eine einfache JSON-Meldungserfassung von Kinesis-Streams in Snowflake-Tabellen konzipiert.

Voraussetzungen

  1. Review Allgemeine Informationen zu Openflow Connector for Kinesis.

  2. Ensure that you have set up Openflow with BYOC or set up Openflow with Snowflake Deployments.

  3. If you are using Openflow - Snowflake Deployments, ensure that you have reviewed configuring required domains and have granted access to the required domains for the Kinesis connector.

Bemerkung

Wenn Sie die Unterstützung anderer Datenformate oder Features benötigen, wie z. B. DLQ, wenden Sie sich an Ihren Snowflake-Ansprechpartner.

Einen Kinesis-Stream einrichten

Als AWS-Administrator führen Sie die folgenden Aktionen in Ihrem AWS-Konto durch:

  1. Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.

  2. Ensure that the AWS User has configured Access Key credentials.

Snowflake-Konto einrichten

Als Snowflake-Kontoadministrator führen Sie die folgenden Aufgaben aus:

  1. Erstellen Sie eine neue Rolle oder verwenden Sie eine vorhandene Rolle und erteilen Sie die Berechtigungen von Datenbanken.

  2. Erstellen Sie eine Zieldatenbank und ein Zielschema, die verwendet werden, um Zieltabellen zum Speichern der Daten zu erstellen.

    1. Wenn Sie die Fähigkeit des Konnektors nutzen möchten, automatisch eine Zieltabelle zu erstellen, wenn diese noch nicht existiert, stellen Sie sicher, dass der Benutzer über die erforderlichen Berechtigungen zum Erstellen und Verwalten von Snowflake-Objekten verfügt:

      Objekt

      Berechtigung

      Anmerkungen

      Datenbank

      USAGE

      Schema

      USAGE . CREATE TABLE .

      Nachdem die Objekte auf Schemaebene erstellt wurden, können die Berechtigungen für CREATE object widerrufen werden.

      Tabelle

      OWNERSHIP

      Nur erforderlich, wenn Sie den Kinesis-Konnektor verwenden, um Daten in eine bestehende Tabelle zu importieren. . Wenn der Konnektor eine neue Zieltabelle für Datensätze aus dem Kinesis-Stream erstellt, wird die Standardrolle für den in der Konfiguration angegebenen Benutzer zum Tabelleneigentümer.

      Sie können das folgende Skript verwenden, um eine benutzerdefinierte Rolle zu erstellen und zu konfigurieren (erfordert SECURITYADMIN oder gleichwertig):

      USE ROLE SECURITYADMIN;
      
      CREATE ROLE kinesis_connector_role;
      GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role;
      GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      
      -- Only for existing tables.
      GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
      
      Copy
  3. Erstellen Sie einen neuen Benutzer für den Snowflake-Dienst mit dem Typ SERVICE.

  4. Weisen Sie dem Benutzer des Snowflake-Dienstes die Rolle zu, die Sie in den vorherigen Schritten erstellt haben.

    GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user;
    ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
    
    Copy
  5. Konfigurieren Sie mit Schlüsselpaar-Authentifizierung für den Snowflake SERVICE-Benutzer aus Schritt 3.

  6. Snowflake empfiehlt diesen Schritt dringend. Konfigurieren Sie einen von Openflow unterstützten Geheimnismanager, z. B. AWS, Azure und Hashicorp, und speichern Sie die öffentlichen und privaten Schlüssel im Geheimnisspeicher.

    Bemerkung

    Wenn Sie aus irgendeinem Grund keinen Geheimnismanager verwenden möchten, sind Sie dafür verantwortlich, die für die Schlüsselpaar-Authentifizierung verwendeten öffentlichen und privaten Schlüsseldateien gemäß den Sicherheitsrichtlinien Ihrer Organisation zu schützen.

    1. After the secrets manager is configured, determine how you will authenticate to it. On AWS, it’s recommended that you use the EC2 instance role associated with Openflow as this way no other secrets have to be persisted.

    2. Konfigurieren Sie in Openflow über das Hamburger-Menü oben rechts einen Parameteranbieter, der mit diesem Secrets Manager verbunden ist. Navigieren Sie zu Controller Settings » Parameter Provider und rufen Sie dann Ihre Parameterwerte ab.

    3. Zu diesem Zeitpunkt können alle Anmeldeinformationen mit den zugehörigen Parameterpfaden referenziert werden, und es müssen keine sensiblen Werte innerhalb von Openflow aufbewahrt werden.

  7. If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), grant those users the role created in step 2.

Einrichten des Konnektors

Als Data Engineer führen Sie die folgenden Aufgaben aus, um den Konnektor zu installieren und zu konfigurieren:

Konnektor installieren

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. Suchen Sie auf der Seite Openflow-Konnektoren den Konnektor und wählen Sie Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    Bemerkung

    Bevor Sie den Konnektor installieren, stellen Sie sicher, dass Sie in Snowflake eine Datenbank und ein Schema für den Konnektor erstellt haben, in dem die aufgenommenen Daten gespeichert werden.

  4. Authentifizieren Sie sich bei der Bereitstellung mit den Anmeldedaten Ihres Snowflake-Kontos und wählen Sie Allow, wenn Sie dazu aufgefordert werden, damit die Laufzeitanwendung auf Ihr Snowflake-Konto zugreifen kann. Die Installation des Konnektors nimmt einige Minuten in Anspruch.

  5. Authentifizieren Sie sich bei der Laufzeit mit den Anmeldeinformationen Ihres Snowflake-Kontos.

Das Openflow-Canvas wird mit der hinzugefügten Prozessgruppe des Konnektors angezeigt.

Konnektor konfigurieren

  1. Right-click the imported process group and select Parameters.

  2. Populate the required parameter values as described in Parameters.

Parameters

Dieser Abschnitt beschreibt alle Parameter für das Openflow Connector for Kinesis für JSON-Datenformat.

The connector consists of several modules. To see the set, double-click the connector process group. You can set the parameters for each module in the module’s parameter context.

Snowflake destination parameters

Parameter

Beschreibung

Erforderlich

Destination Database

Die Datenbank, in der die Daten als persistent gespeichert werden. Sie muss bereits in Snowflake vorhanden sein. Beim Namen wird zwischen Groß- und Kleinschreibung unterschieden. Bei Bezeichnern ohne Anführungszeichen geben Sie den Namen in Großbuchstaben an.

Ja

Destination Schema

Das Schema, in dem Daten beibehalten werden, muss bereits in Snowflake vorhanden sein. Beim Namen wird zwischen Groß- und Kleinschreibung unterschieden. Bei Bezeichnern ohne Anführungszeichen geben Sie den Namen in Großbuchstaben an.

Sehen Sie sich die folgenden Beispiele an:

  • CREATE SCHEMA SCHEMA_NAME oder CREATE SCHEMA schema_name: verwenden Sie SCHEMA_NAME

  • CREATE SCHEMA "schema_name" oder CREATE SCHEMA "SCHEMA_NAME": verwenden Sie schema_name bzw. SCHEMA_NAME.

Ja

Iceberg Enabled

Ob Iceberg für Tabellenoperationen aktiviert ist. Entweder true oder false.

Ja

Schema Evolution Enabled

Aktiviert oder deaktiviert die Schemaentwicklung auf Konnektorebene. Wenn aktiviert, werden automatische Schemaänderungen für Tabellen ermöglicht. Beachten Sie, dass die Schemaentwicklung auch auf Ebene der einzelnen Tabellen über tabellenspezifische Parameter gesteuert werden kann. Entweder true oder false.

Ja

Schema Evolution For New Tables Enabled

Steuert, ob die Schemaentwicklung beim Erstellen neuer Tabellen aktiviert wird. Wenn auf „true“ gesetzt, werden neue Tabellen mit dem Parameter ENABLE_SCHEMA_EVOLUTION = TRUE erstellt. Wenn auf „false“ gesetzt, werden neue Tabellen mit ENABLE_SCHEMA_EVOLUTION = FALSE erstellt. Nicht anwendbar auf Iceberg-Tabellen, da diese nicht automatisch erstellt werden. Diese Einstellung wirkt sich nur auf die Tabellenerstellung aus, nicht auf bestehende Tabellen. Entweder true oder false.

Ja

Snowflake Account Identifier

Bei Verwendung von:

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR: Snowflake-Kontoname im Format [Organisationsname]-[Kontoname], wobei die Daten persistent gespeichert werden.

Ja

Snowflake Authentication Strategy

Bei Verwendung von:

  • Snowflake Openflow Deployment or BYOC: Use SNOWFLAKE_MANAGED_TOKEN. This token is managed automatically by Snowflake. BYOC deployments must have previously configured runtime roles to use SNOWFLAKE_MANAGED_TOKEN.

  • BYOC: Alternatively BYOC can use KEY_PAIR as the value for authentication strategy.

Ja

Snowflake Private Key

Bei Verwendung von:

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR: Muss der RSA private Schlüssel sein, der für die Authentifizierung verwendet wird.

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

Nein

Snowflake Private Key File

Bei Verwendung von:

  • Strategie für die Authentifizierung mit Sitzungstoken: Die private Schlüsseldatei muss leer sein.

  • KEY_PAIR: Laden Sie die Datei hoch, die den RSA Private Key für die Authentifizierung bei Snowflake enthält, formatiert nach PKCS8-Standards und mit Standard-PEM-Header und -Footer. Die Header-Zeile beginnt mit -----BEGIN PRIVATE. Aktivieren Sie das Kontrollkästchen Reference asset, um die Private Key-Datei hochzuladen.

Nein

Snowflake Private Key Password

Bei Verwendung von

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR: Geben Sie das Kennwort an, das mit der privaten Snowflake-Schlüsseldatei verbunden ist.

Nein

Snowflake Role

Bei Verwendung von

  • Session Token Authentication Strategy: Use your Snowflake Role. You can find your Snowflake Role in the Openflow UI, by navigating to View Details for your Runtime.

  • KEY_PAIR Authentifizierungsstrategie: Verwenden Sie eine gültige Rolle, die für Ihren Dienstbenutzer konfiguriert ist.

Ja

Snowflake-Benutzername

Bei Verwendung von

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR: Geben Sie den Benutzernamen an, der für die Verbindung mit der Snowflake-Instanz verwendet wird.

Ja

Kinesis JSON Source Parameters

Parameter

Beschreibung

Erforderlich

AWS Region Code

Die AWS-Region, in der sich Ihr Kinesis-Stream befindet, zum Beispiel us-west-2.

Ja

AWS Access Key ID

The AWS Access Key ID to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch.

Ja

AWS Secret Access Key

The AWS Secret Access Key to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch.

Ja

Kinesis Application Name

Der Name der DynamoDB-Tabelle, der verwendet wird, um den Fortschritt der Anwendung bei der Verarbeitung des Kinesis-Streams nachzuverfolgen.

Ja

Kinesis Consumer Type

Die Strategie zum Lesen von Datensätzen aus einem Kinesis Stream. Muss einer der folgenden Werte sein: SHARED_THROUGHPUT oder ENHANCED_FAN_OUT. Weitere Informationen dazu finden Sie unter Entwickeln von erweiterten Fan-Out-Verbrauchern.

Ja

Kinesis Initial Stream Position

Die anfängliche Stream-Position, von der aus die Replikation der Daten beginnt.

Mögliche Werte sind:

  • LATEST: Letzter gespeicherter Datensatz

  • TRIM_HORIZON: Frühester gespeicherter Datensatz

Ja

Kinesis Stream Name

Name des AWS Kinesis-Streams, von dem Daten empfangen werden sollen.

Ja

Veröffentlichung von Metriken

Gibt an, wo die Metriken der Kinesis Client Library veröffentlicht werden. Mögliche Werte: DISABLED, LOGS, CLOUDWATCH.

Ja

Führen Sie den Ablauf aus

  1. Right-click the plane and select Enable all Controller Services.

  2. Right-click the connector’s process group and select Start.

Der Konnektor startet die Datenaufnahme.

Table schema

The Snowflake table loaded by the connector contains columns named by the keys of your Kinesis messages. The connector also adds a KINESISMETADATA column which stores metadata about the record.

Nachfolgend finden Sie ein Beispiel für eine Snowflake-Tabelle, die vom Konnektor geladen wird:

Zeile

ACCOUNT

SYMBOL

SIDE

QUANTITY

KINESISMETADATA

1

ABC123

ZTEST

BUY

3572

{ … KINESISMETADATA object … }

2

XYZ789

ZABZX

SELL

3024

{ … KINESISMETADATA object … }

3

XYZ789

ZTEST

SELL

799

{ … KINESISMETADATA object … }

4

ABC123

ZABZX

BUY

2033

{ … KINESISMETADATA object … }

Die KINESISMETADATA-Spalte enthält ein Objekt mit den folgenden Feldern:

Feldname

Feldtyp

Example Value

Beschreibung

stream

String

stream-name

Der Name des Kinesis-Streams, aus dem der Datensatz stammt.

shardId

String

shardId-000000000001

Der Bezeichner der Freigabe in dem Stream, aus dem der Datensatz stammt.

approximateArrival

String

2025-11-05T09:12:15.300

Die ungefähre Zeit, zu der der Datensatz in den Stream eingefügt wurde (ISO 8601-Format).

partitionKey

String

key-1234

Der vom Datenersteller für den Datensatz angegebene Partitionsschlüssel.

sequenceNumber

String

123456789

Die eindeutige Sequenznummer, die dem Datensatz in der Freigabe von Kinesis-Datenstreams zugewiesen wurden.

subSequenceNumber

Zahl

2

Die Untersequenznummer des Datensatzes (wird für aggregierte Datensätze mit der gleichen Sequenznummer verwendet).

shardedSequenceNumber

String

12345678900002

Eine Kombination aus der Sequenznummer und der Untersequenznummer für den Datensatz.

Schemaentwicklung

The connector supports automatic schema detection and evolution. The structure of tables in Snowflake is defined and evolved automatically to support the structure of new data loaded by the connector.

Snowflake erkennt das Schema der eingehenden Daten und lädt die Daten in Tabellen, die mit einem benutzerdefinierten Schema übereinstimmen. Snowflake ermöglicht auch das Hinzufügen von neuen Spalten oder das Löschen der NOT NULL-Einschränkung von Spalten, die in den eingehenden Datensätzen fehlen.

Die Schemaerkennung mit dem Konnektor leitet Datentypen basierend auf den bereitgestellten JSON-Daten ab.

Wenn der Konnektor die Zieltabelle erstellt, ist die Schemaentwicklung standardmäßig aktiviert.

If you want to enable or disable schema evolution on an existing table, use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter. You must also use a role that has the OWNERSHIP privilege on the table. For more information, see Tabellenschemaentwicklung.

However, if schema evolution is disabled for an existing table, then the connector tries to send the rows with mismatched schemas to the configured failure output port.

Iceberg table support

Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.

Anforderungen und Einschränkungen

Before you configure Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:

  • Sie müssen eine Iceberg-Tabelle erstellen, bevor Sie den Konnektor ausführen.

  • Stellen Sie sicher, dass der Benutzer Daten in die erstellten Tabellen eingeben kann.

Konfiguration und Einrichtung

To configure Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.

Aufnahme in die Iceberg-Tabellen aktivieren

Um die Aufnahme in eine Iceberg-Tabelle zu ermöglichen, müssen Sie den Parameter Iceberg Enabled auf true setzen.

Erstellen Sie eine Iceberg-Tabelle für die Datenaufnahme

Bevor Sie den Konnektorr ausführen, müssen Sie eine Iceberg-Tabelle erstellen. Das anfängliche Tabellenschema hängt von Ihrer Konnektoreinstellung für Schema Evolution Enabled ab.

If schema evolution is enabled, you must create a table with a column named kinesisMetadata. The connector automatically creates the columns for message fields and alters the kinesisMetadata column schema.

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table'
  ENABLE_SCHEMA_EVOLUTION = true;
Copy

If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.

Betrachten Sie zum Beispiel die folgende Meldung:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

Die folgende Anweisung erstellt eine Tabelle mit allen Feldern, die die Kinesis-Meldung enthält:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT(
        stream STRING,
        shardId STRING,
        approximateArrival STRING,
        partitionKey STRING,
        sequenceNumber STRING,
        subSequenceNumber INTEGER,
        shardedSequenceNumber STRING
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Bemerkung

kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.