Einrichten von Openflow Connector for Kinesis

Bemerkung

Der Konnektor unterliegt den Bedingungen für Konnektoren.

Unter diesem Thema werden die Schritte zur Einrichtung von Openflow Connector for Kinesis beschrieben.

Voraussetzungen

  1. Stellen Sie sicher, dass Sie Allgemeine Informationen zu Openflow Connector for Kinesis gelesen haben.

  2. Stellen Sie sicher, dass Sie Openflow eingerichtet haben.

Einen Kinesis-Stream einrichten

Als AWS-Administrator führen Sie die folgenden Aktionen in Ihrem AWS-Konto durch:

  1. Stellen Sie sicher, dass Sie über ein AWS-Konto mit IAM Berechtigungen für den Zugriff auf Kinesis Streams und DynamoDB verfügen.

  2. Erstellen Sie optional einen Dead-Letter-Queue (DLQ)-Kinesis-Stream. Meldungen, die nicht erfolgreich geparst werden können, können an eine angegebene DLQ weitergeleitet werden.

Snowflake-Konto einrichten

Als Snowflake-Kontoadministrator führen Sie die folgenden Aufgaben aus:

  1. Erstellen Sie eine neue Rolle oder verwenden Sie eine vorhandene Rolle und erteilen Sie die Berechtigungen von Datenbanken.

  2. Erstellen Sie eine Zieldatenbank und ein Zielschema, die verwendet werden, um Zieltabellen zum Speichern der Daten zu erstellen.

    1. Wenn Sie die Fähigkeit des Konnektors nutzen möchten, automatisch eine Zieltabelle zu erstellen, wenn diese noch nicht existiert, stellen Sie sicher, dass der Benutzer über die erforderlichen Berechtigungen zum Erstellen und Verwalten von Snowflake-Objekten verfügt:

      Objekt

      Berechtigung

      Anmerkungen

      Datenbank

      USAGE

      Schema

      USAGE . CREATE TABLE .

      Nachdem die Objekte auf Schemaebene erstellt wurden, können die Berechtigungen für CREATE object widerrufen werden.

      Tabelle

      OWNERSHIP

      Nur erforderlich, wenn Sie den Kinesis-Konnektor verwenden, um Daten in eine bestehende Tabelle zu importieren. . Wenn der Konnektor eine neue Zieltabelle für Datensätze aus dem Kinesis-Stream erstellt, wird die Standardrolle für den in der Konfiguration angegebenen Benutzer zum Tabelleneigentümer.

      Sie können das folgende Skript verwenden, um eine benutzerdefinierte Rolle zu erstellen und zu konfigurieren (erfordert SECURITYADMIN oder gleichwertig):

      USE ROLE SECURITYADMIN;
      
      CREATE ROLE kinesis_connector_role_1;
      GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role_1;
      GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1;
      GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1;
      
      -- Only for existing tables
      GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kinesis_connector_role_1;
      
      Copy
  3. Erstellen Sie einen neuen Benutzer für den Snowflake-Dienst mit dem Typ SERVICE.

  4. Weisen Sie dem Benutzer des Snowflake-Dienstes die Rolle zu, die Sie in den vorherigen Schritten erstellt haben.

    GRANT ROLE kinesis_connector_role_1 TO USER kinesis_connector_user_1;
    ALTER USER kinesis_connector_user_1 SET DEFAULT_ROLE = kinesis_connector_role_1;
    
    Copy
  5. Konfigurieren Sie mit Schlüsselpaar-Authentifizierung für den Snowflake SERVICE-Benutzer aus Schritt 3.

  6. Snowflake empfiehlt diesen Schritt dringend. Konfigurieren Sie einen von Openflow unterstützten Geheimnismanager, z. B. AWS, Azure und Hashicorp, und speichern Sie die öffentlichen und privaten Schlüssel im Geheimnisspeicher.

    Bemerkung

    Wenn Sie aus irgendeinem Grund keinen Geheimnismanager verwenden möchten, sind Sie dafür verantwortlich, die für die Schlüsselpaar-Authentifizierung verwendeten öffentlichen und privaten Schlüsseldateien gemäß den Sicherheitsrichtlinien Ihrer Organisation zu schützen.

    1. Nachem der Geheimnismanager konfiguriert ist, legen Sie fest, wie Sie sich bei ihm authentifizieren möchten. Auf AWS wird empfohlen, die mit Openflow verknüpfte EC2-Instanzrolle zu verwenden, da auf diese Weise keine weiteren Geheimnisse gespeichert werden müssen.

    2. Konfigurieren Sie in Openflow einen mit diesem Geheimnismanager verknüpften Parameter Provider über das Hamburger-Menü oben rechts. Navigieren Sie zu Controller Settings » Parameter Provider, und rufen Sie Ihre Parameterwerte ab.

    3. Zu diesem Zeitpunkt können alle Anmeldeinformationen mit den zugehörigen Parameterpfaden referenziert werden, und es müssen keine sensiblen Werte innerhalb von Openflow aufbewahrt werden.

  7. Wenn andere Snowflake-Benutzer Zugriff auf die vom Konnektor aufgenommenen Rohdokumente und -tabellen benötigen (z. B. für die benutzerdefinierte Verarbeitung in Snowflake), weisen Sie diesen Benutzern die in Schritt 1 erstellte Rolle zu.

  8. Bestimmen Sie ein Warehouse, das der Konnektor verwenden soll. Beginnen Sie mit der kleinsten Warehouse-Größe und experimentieren Sie dann mit der Größe in Abhängigkeit von der Anzahl der zu replizierenden Tabellen und der Menge der übertragenen Daten. Große Tabellenzahlen lassen sich in der Regel besser mit Multi-Cluster-Warehouses skalieren als mit größeren Warehouse-Größen.

Einrichten des Konnektors

Als Data Engineer führen Sie die folgenden Aufgaben aus, um den Konnektor zu installieren und zu konfigurieren:

Konnektor installieren

  1. Navigieren Sie zur Openflow-Übersichtsseite. Wählen Sie im Abschnitt Featured connectors die Option View more connectors aus.

  2. Suchen Sie auf der Seite Openflow-Konnektoren den Konnektor und wählen Sie Add to runtime.

  3. Wählen Sie im Dialog Select runtime Ihre Laufzeit aus der Dropdown-Liste Available runtimes aus.

  4. Wählen Sie Add aus.

    Bemerkung

    Bevor Sie den Konnektor installieren, stellen Sie sicher, dass Sie in Snowflake eine Datenbank und ein Schema für den Konnektor erstellt haben, in dem die aufgenommenen Daten gespeichert werden.

  5. Authentifizieren Sie sich bei der Bereitstellung mit den Anmeldedaten Ihres Snowflake-Kontos und wählen Sie Allow, wenn Sie dazu aufgefordert werden, damit die Laufzeitanwendung auf Ihr Snowflake-Konto zugreifen kann. Die Installation des Konnektors nimmt einige Minuten in Anspruch.

  6. Authentifizieren Sie sich bei der Laufzeit mit den Anmeldeinformationen Ihres Snowflake-Kontos.

Das Openflow-Canvas wird mit der hinzugefügten Prozessgruppe des Konnektors angezeigt.

Konnektor konfigurieren

  1. Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe und wählen Sie Parameters.

  2. Geben Sie die erforderlichen Parameterwerte ein, wie unter Ablaufparameter beschrieben.

Ablaufparameter

In diesem Abschnitt werden die Ablaufparameter beschrieben, die Sie anhand der folgenden Parameterkontexte konfigurieren können:

Quellsystemparameter für Kinesis

Parameter

Beschreibung

AWS Region Code

Die AWS-Region, in der sich Ihr Kinesis-Stream befindet, zum Beispiel us-west-2.

AWS Access Key ID

Die AWS-Zugangsschlüssel-ID zur Verbindung mit Ihrem Kinesis Stream und DynamoDB.

AWS Secret Access Key

Der geheime AWS-Zugrifssschlüssel für die Verbindung zu Ihrem Kinesis-Stream und DynamoDB.

Schema Registry URL

Die URL der AVRO-Schema Registry Dies ist erforderlich, wenn der Parameter „AVRO Schema Access Strategy“ auf schema-reference-reader gesetzt ist.

Schema Registry Authentication Type

Der Authentifikationstyp, der von der AVRO-Schema Registryverwendet wird. Dies ist erforderlich, wenn der Parameter „AVRO Schema Access Strategy“ auf schema-reference-reader gesetzt ist.

Mögliche Werte sind:
  • NONE: Keine Authentifizierung verwendet

  • BASIC: Methode der Authentifizierung mit Benutzername/Kennwort verwendet

Schema Registry Username

Der Benutzername, der für die BASIC-Authentifizierung bei der AVRO-Schema Registry verwendet wird. Dies ist erforderlich, wenn der Parameter „Schema Registry Authentication Type“ auf BASIC gesetzt ist.

Schema Registry Password

Das Kennwort, das für die BASIC-Authentifizierung bei der AVRO-Schema Registry verwendet wird. Dies ist erforderlich, wenn der Parameter „Schema Registry Authentication Type“ auf BASIC gesetzt ist.

Zielsystemparameter für Kinesis

Parameter

Beschreibung

Destination Database

Die Datenbank, in der die Daten persistiert werden. Sie muss bereits in Snowflake vorhanden sein.

Destination Schema

Das Schema, in dem die Daten persistiert werden. Sie muss bereits in Snowflake vorhanden sein. Bei diesem Parameter wird zwischen Groß- und Kleinschreibung unterschieden.

Snowflake Account Identifier

Snowflake-Kontoname im Format [organisation-name]-[account-name], in dem die Daten gespeichert werden sollen.

Snowflake Authentication Strategy

Strategie zur Authentifizierung bei Snowflake. Mögliche Werte: SNOWFLAKE_SESSION_TOKEN, wenn Sie den Ablauf auf SPCS ausführen, und KEY_PAIR, wenn Sie den Zugriff mit einem privaten Schlüssel einrichten möchten.

Snowflake Private Key

Der private RSA Schlüssel, der für die Authentifizierung verwendet wird. Der RSA-Schlüssel muss nach den PKCS8-Standards formatiert sein und den Standard-PEM-Header und -Footer enthalten. Beachten Sie, dass entweder Snowflake Private Key File oder Snowflake Private Key definiert sein muss.

Snowflake Private Key File

Die Datei, die den privaten RSA-Schlüssel enthält, der für die Authentifizierung bei Snowflake verwendet wird. Sie ist nach den PKCS8-Standards formatiert und hat die Standard-PEM-Header und -Footer. Die Header beginnt mit -----BEGIN PRIVATE. Aktivieren Sie das Kontrollkästchen Reference asset, um die private Schlüsseldatei hochzuladen.

Snowflake Private Key Password

Das Kennwort, das mit der Snowflake-Privatschlüsseldatei verknüpft ist.

Snowflake Role

Snowflake-Rolle, die bei der Ausführung der Abfrage verwendet wird.

Snowflake-Benutzername

Benutzername,der für die Verbindung zur Snowflake-Instanz verwendet wird.

Snowflake Warehouse

Snowflake Warehouse, das zur Ausführung von Abfragen verwendet wird. Bei diesem Parameter wird zwischen Groß- und Kleinschreibung unterschieden.

Aufnahmeparameter für Kinesis

Parameter

Beschreibung

Kinesis Application Name

Der Name der DynamoDB-Tabelle, der verwendet wird, um den Fortschritt der Anwendung bei der Verarbeitung des Kinesis-Streams nachzuverfolgen.

Kinesis Stream Name

Name des AWS Kinesis-Streams, von dem Daten empfangen werden sollen.

Kinesis Initial Stream Position

Die anfängliche Stream-Position, von der aus die Replikation der Daten beginnt.

Mögliche Werte sind:
  • LATEST: Letzter gespeicherter Datensatz

  • TRIM_HORIZON: Frühester gespeicherter Datensatz

Kinesis DLQ Stream Name

Der Stream-Name, an den alle Datensätze, deren Verarbeitung fehlgeschlagen ist, gesendet werden. Wenn dieser Parameter nicht hinzugefügt wird, können Sie ein Warnzeichen im DLQ-bezogenen Teil des Konnektors im Openflow-Canvas erwarten.

Message Format

Das Format der Meldungen in Kinesis.

Mögliche Werte sind:
  • JSON: JSON ist ein von Menschen lesbares Meldungsformat, dessen Schema aus der Meldung selbst abgeleitet werden kann.

  • AVRO AVRO ist ein Meldungsformat, das ein Schema für den Zugriff auf die Daten in der Meldung benötigt.

AVRO Schema Access Strategy

Für den Zugriff auf Daten im AVRO-Meldungsformat ist das Schema erforderlich. Dieser Parameter definiert die Strategie für den Zugriff auf das AVRO-Schema einer bestimmten Meldung. Wenn der Parameter „Message Format“ auf AVRO gesetzt ist, wird das Schema verwendet.

Mögliche Werte:
  • embedded-avro-schema: Das Schema ist in den Datensatz selbst eingebettet

  • schema-reference-reader: Das Schema wird in Confluent Schema Registry gespeichert.

Kinesis Stream To Table Map

Mit diesem optionalen Parameter kann ein Benutzer angeben, welche Streams welchen Tabellen zugeordnet werden sollen. Jeder Stream und sein Tabellenname sollten durch einen Doppelpunkt getrennt werden. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Die regulären Ausdrücke dürfen nicht mehrdeutig sein, und jeder übereinstimmende Stream darf nur mit einer einzigen Zieltabelle übereinstimmen. Wenn keine oder leere Treffer gefunden werden, wird der Stream-Name als Tabellenname verwendet.

Beispiele:
  • stream1:low_range,stream2:low_range,stream5:high_range,stream6:high_range

  • stream[0-4]:low_range,stream[5-9]:high_range

Iceberg Enabled

Gibt an, ob der Prozessor Daten in eine Iceberg-Tabelle aufnimmt. Der Prozessor schlägt fehl, wenn diese Eigenschaft nicht mit dem tatsächlichen Tabellentyp übereinstimmt.

Mögliche Werte:
  • true

  • false

Führen Sie den Ablauf aus

  1. Klicken Sie mit der rechten Maustaste auf die Ebene, und wählen Sie Enable all Controller Services.

  2. Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe und wählen Sie Start.

Der Konnektor startet die Datenaufnahme.

Schema

Die vom Konnektor geladene Snowflake-Tabelle enthält Spalten, die nach den Schlüsseln Ihrer Kinesis-Meldungen benannt sind. Nachfolgend finden Sie ein Beispiel für eine solche Tabelle.

Zeile

ACCOUNT

SYMBOL

SIDE

QUANTITY

1

ABC123

ZTEST

BUY

3572

2

XYZ789

ZABZX

SELL

3024

3

XYZ789

ZTEST

SELL

799

4

ABC123

ZABZX

BUY

2033

5

ABC123

ZTEST

BUY

1558

Schemaentwicklung

Derzeit ist Iceberg Enabled auf false eingestellt. Wenn der Konnektor die Zieltabelle erstellt, ist die Schemaentwicklung standardmäßig aktiviert. Wenn Sie die Schemaentwicklung für eine bestehende Tabelle aktivieren oder deaktivieren möchten, verwenden Sie den Befehl ALTER TABLE, um den Parameter ENABLE_SCHEMA_EVOLUTION festzulegen. Sie müssen außerdem eine Rolle verwenden, die über die OWNERSHIP-Berechtigung für die Tabelle verfügt. Weitere Informationen dazu finden Sie unter Tabellenschemaentwicklung.

Wenn jedoch die Schemaentwicklung für eine vorhandene Tabelle deaktiviert ist, versucht der Konnektor, die Zeilen mit nicht übereinstimmenden Schemata an die konfigurierten Dead-Letter-Queues (DLQ) zu senden.

Für den Fall, dass Iceberg Enabled auf true eingestellt ist, siehe Abschnitt Schemaentwicklung für Apache Iceberg™ Tabellen.

Verwendung von Openflow Connector for Kinesis mit Apache Iceberg™ Tabellen

Openflow Connector for Kinesis kann Daten in eine von Snowflake verwaltete Apache Iceberg™ Tabelle aufnehmen.

Anforderungen und Einschränkungen

Bevor Sie den Konnektor für die Aufnahme von Iceberg-Tabellen konfigurieren, beachten Sie die folgenden Anforderungen und Beschränkungen:

  • Sie müssen eine Iceberg-Tabelle erstellen, bevor Sie den Konnektor ausführen.

  • Stellen Sie sicher, dass der Benutzer Daten in die erstellten Tabellen eingeben kann.

  • Die Schemaentwicklung wird für Iceberg-Tabellen nicht unterstützt.

Konfiguration und Einrichtung

Um den Konnektor für die Aufnahme von Iceberg-Tabellen zu konfigurieren, folgen Sie den Anweisungen unter Den Konnektor konfigurieren mit einigen Unterschieden, die in den folgenden Abschnitten beschrieben werden.

Aufnahme in die Iceberg-Tabellen aktivieren

Um die Aufnahme in eine Iceberg-Tabelle zu ermöglichen, müssen Sie den Parameter Iceberg Enabled auf true setzen.

Erstellen Sie eine Iceberg-Tabelle für die Datenaufnahme

Bevor Sie den Konnektorr ausführen, müssen Sie eine Iceberg-Tabelle erstellen. Da die Schemaentwicklung nicht unterstützt wird, müssen Sie die Tabelle mit allen Feldern erstellen, die die Kinesis-Meldung enthält.

Wenn Sie eine Iceberg-Tabelle erstellen, können Sie Iceberg-Datentypen oder kompatible Snowflake-Typen verwenden. Der halbstrukturierte VARIANT-Typ wird nicht unterstützt. Verwenden Sie stattdessen eine strukturierte OBJECT oder MAP.

Betrachten Sie zum Beispiel die folgende Meldung:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

Um eine Iceberg-Tabelle für die Beispielmeldung zu erstellen, verwenden Sie die folgende Anweisung:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Bemerkung

Bei Feldnamen innerhalb verschachtelter Strukturen wie dogs oder cats wird zwischen Groß- und Kleinschreibung unterschieden.

Schemaentwicklung für Apache Iceberg™ Tabellen

Derzeit unterstützt der Konnektor die Schemaentwicklung für Apache Iceberg™-Tabellen nicht.

Bekannte Probleme

  • Die Prozessgruppe des Konnektors hat einen Port mit dem Namen „Upload Failure“. Er kann verwendet werden, um FlowFiles zu bearbeiten, die nicht erfolgreich in Snowflake hochgeladen wurden. Wenn dieser Port nicht außerhalb der Prozessgruppe des Konnektors verbunden ist, wird ein Warnzeichen angezeigt, das ignoriert werden kann.

  • Alle Prozessoren können nach dem Anhalten angewiesen werden, einmal ausgeführt zu werden. Der ConsumeKinesisStream-Prozessor führt aufgrund seiner internen Architektur keine sinnvolle Arbeit aus, wenn er einmal ausgeführt werden soll. Damit der Prozessor seine Arbeit aufnehmen kann, muss er gestartet werden und etwa zwei Minuten lang laufen.