Einrichten von Openflow Connector for Kinesis¶
Bemerkung
Der Konnektor unterliegt den Bedingungen für Konnektoren.
Unter diesem Thema werden die Schritte zur Einrichtung von Openflow Connector for Kinesis beschrieben.
Voraussetzungen¶
Stellen Sie sicher, dass Sie Allgemeine Informationen zu Openflow Connector for Kinesis gelesen haben.
Stellen Sie sicher, dass Sie Openflow eingerichtet haben.
Einen Kinesis-Stream einrichten¶
Als AWS-Administrator führen Sie die folgenden Aktionen in Ihrem AWS-Konto durch:
Stellen Sie sicher, dass Sie über ein AWS-Konto mit IAM Berechtigungen für den Zugriff auf Kinesis Streams und DynamoDB verfügen.
Erstellen Sie optional einen Dead-Letter-Queue (DLQ)-Kinesis-Stream. Meldungen, die nicht erfolgreich geparst werden können, können an eine angegebene DLQ weitergeleitet werden.
Snowflake-Konto einrichten¶
Als Snowflake-Kontoadministrator führen Sie die folgenden Aufgaben aus:
Erstellen Sie eine neue Rolle oder verwenden Sie eine vorhandene Rolle und erteilen Sie die Berechtigungen von Datenbanken.
Erstellen Sie eine Zieldatenbank und ein Zielschema, die verwendet werden, um Zieltabellen zum Speichern der Daten zu erstellen.
Wenn Sie die Fähigkeit des Konnektors nutzen möchten, automatisch eine Zieltabelle zu erstellen, wenn diese noch nicht existiert, stellen Sie sicher, dass der Benutzer über die erforderlichen Berechtigungen zum Erstellen und Verwalten von Snowflake-Objekten verfügt:
Objekt
Berechtigung
Anmerkungen
Datenbank
USAGE
Schema
USAGE . CREATE TABLE .
Nachdem die Objekte auf Schemaebene erstellt wurden, können die Berechtigungen für CREATE
object
widerrufen werden.Tabelle
OWNERSHIP
Nur erforderlich, wenn Sie den Kinesis-Konnektor verwenden, um Daten in eine bestehende Tabelle zu importieren. . Wenn der Konnektor eine neue Zieltabelle für Datensätze aus dem Kinesis-Stream erstellt, wird die Standardrolle für den in der Konfiguration angegebenen Benutzer zum Tabelleneigentümer.
Sie können das folgende Skript verwenden, um eine benutzerdefinierte Rolle zu erstellen und zu konfigurieren (erfordert SECURITYADMIN oder gleichwertig):
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role_1; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role_1; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1; -- Only for existing tables GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kinesis_connector_role_1;
Erstellen Sie einen neuen Benutzer für den Snowflake-Dienst mit dem Typ SERVICE.
Weisen Sie dem Benutzer des Snowflake-Dienstes die Rolle zu, die Sie in den vorherigen Schritten erstellt haben.
GRANT ROLE kinesis_connector_role_1 TO USER kinesis_connector_user_1; ALTER USER kinesis_connector_user_1 SET DEFAULT_ROLE = kinesis_connector_role_1;
Konfigurieren Sie mit Schlüsselpaar-Authentifizierung für den Snowflake SERVICE-Benutzer aus Schritt 3.
Snowflake empfiehlt diesen Schritt dringend. Konfigurieren Sie einen von Openflow unterstützten Geheimnismanager, z. B. AWS, Azure und Hashicorp, und speichern Sie die öffentlichen und privaten Schlüssel im Geheimnisspeicher.
Bemerkung
Wenn Sie aus irgendeinem Grund keinen Geheimnismanager verwenden möchten, sind Sie dafür verantwortlich, die für die Schlüsselpaar-Authentifizierung verwendeten öffentlichen und privaten Schlüsseldateien gemäß den Sicherheitsrichtlinien Ihrer Organisation zu schützen.
Nachem der Geheimnismanager konfiguriert ist, legen Sie fest, wie Sie sich bei ihm authentifizieren möchten. Auf AWS wird empfohlen, die mit Openflow verknüpfte EC2-Instanzrolle zu verwenden, da auf diese Weise keine weiteren Geheimnisse gespeichert werden müssen.
Konfigurieren Sie in Openflow einen mit diesem Geheimnismanager verknüpften Parameter Provider über das Hamburger-Menü oben rechts. Navigieren Sie zu Controller Settings » Parameter Provider, und rufen Sie Ihre Parameterwerte ab.
Zu diesem Zeitpunkt können alle Anmeldeinformationen mit den zugehörigen Parameterpfaden referenziert werden, und es müssen keine sensiblen Werte innerhalb von Openflow aufbewahrt werden.
Wenn andere Snowflake-Benutzer Zugriff auf die vom Konnektor aufgenommenen Rohdokumente und -tabellen benötigen (z. B. für die benutzerdefinierte Verarbeitung in Snowflake), weisen Sie diesen Benutzern die in Schritt 1 erstellte Rolle zu.
Bestimmen Sie ein Warehouse, das der Konnektor verwenden soll. Beginnen Sie mit der kleinsten Warehouse-Größe und experimentieren Sie dann mit der Größe in Abhängigkeit von der Anzahl der zu replizierenden Tabellen und der Menge der übertragenen Daten. Große Tabellenzahlen lassen sich in der Regel besser mit Multi-Cluster-Warehouses skalieren als mit größeren Warehouse-Größen.
Einrichten des Konnektors¶
Als Data Engineer führen Sie die folgenden Aufgaben aus, um den Konnektor zu installieren und zu konfigurieren:
Konnektor installieren¶
Navigieren Sie zur Openflow-Übersichtsseite. Wählen Sie im Abschnitt Featured connectors die Option View more connectors aus.
Suchen Sie auf der Seite Openflow-Konnektoren den Konnektor und wählen Sie Add to runtime.
Wählen Sie im Dialog Select runtime Ihre Laufzeit aus der Dropdown-Liste Available runtimes aus.
Wählen Sie Add aus.
Bemerkung
Bevor Sie den Konnektor installieren, stellen Sie sicher, dass Sie in Snowflake eine Datenbank und ein Schema für den Konnektor erstellt haben, in dem die aufgenommenen Daten gespeichert werden.
Authentifizieren Sie sich bei der Bereitstellung mit den Anmeldedaten Ihres Snowflake-Kontos und wählen Sie Allow, wenn Sie dazu aufgefordert werden, damit die Laufzeitanwendung auf Ihr Snowflake-Konto zugreifen kann. Die Installation des Konnektors nimmt einige Minuten in Anspruch.
Authentifizieren Sie sich bei der Laufzeit mit den Anmeldeinformationen Ihres Snowflake-Kontos.
Das Openflow-Canvas wird mit der hinzugefügten Prozessgruppe des Konnektors angezeigt.
Konnektor konfigurieren¶
Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe und wählen Sie Parameters.
Geben Sie die erforderlichen Parameterwerte ein, wie unter Ablaufparameter beschrieben.
Ablaufparameter¶
In diesem Abschnitt werden die Ablaufparameter beschrieben, die Sie anhand der folgenden Parameterkontexte konfigurieren können:
Quellsystemparameter für Kinesis: werden verwendet, um die Verbindung mit Kinesis herzustellen.
Zielsystemparameter für Kinesis: werden verwendet, um die Verbindung mit Snowflake herzustellen.
Aufnahmeparameter für Kinesis: werden verwendet, um die Konfiguration der von Kinesis heruntergeladenen Daten zu definieren.
Quellsystemparameter für Kinesis¶
Parameter |
Beschreibung |
---|---|
AWS Region Code |
Die AWS-Region, in der sich Ihr Kinesis-Stream befindet, zum Beispiel |
AWS Access Key ID |
Die AWS-Zugangsschlüssel-ID zur Verbindung mit Ihrem Kinesis Stream und DynamoDB. |
AWS Secret Access Key |
Der geheime AWS-Zugrifssschlüssel für die Verbindung zu Ihrem Kinesis-Stream und DynamoDB. |
Schema Registry URL |
Die URL der AVRO-Schema Registry Dies ist erforderlich, wenn der Parameter „AVRO Schema Access Strategy“ auf |
Schema Registry Authentication Type |
Der Authentifikationstyp, der von der AVRO-Schema Registryverwendet wird. Dies ist erforderlich, wenn der Parameter „AVRO Schema Access Strategy“ auf
|
Schema Registry Username |
Der Benutzername, der für die |
Schema Registry Password |
Das Kennwort, das für die |
Zielsystemparameter für Kinesis¶
Parameter |
Beschreibung |
---|---|
Destination Database |
Die Datenbank, in der die Daten persistiert werden. Sie muss bereits in Snowflake vorhanden sein. |
Destination Schema |
Das Schema, in dem die Daten persistiert werden. Sie muss bereits in Snowflake vorhanden sein. Bei diesem Parameter wird zwischen Groß- und Kleinschreibung unterschieden. |
Snowflake Account Identifier |
Snowflake-Kontoname im Format [organisation-name]-[account-name], in dem die Daten gespeichert werden sollen. |
Snowflake Authentication Strategy |
Strategie zur Authentifizierung bei Snowflake. Mögliche Werte: |
Snowflake Private Key |
Der private RSA Schlüssel, der für die Authentifizierung verwendet wird. Der RSA-Schlüssel muss nach den PKCS8-Standards formatiert sein und den Standard-PEM-Header und -Footer enthalten. Beachten Sie, dass entweder Snowflake Private Key File oder Snowflake Private Key definiert sein muss. |
Snowflake Private Key File |
Die Datei, die den privaten RSA-Schlüssel enthält, der für die Authentifizierung bei Snowflake verwendet wird. Sie ist nach den PKCS8-Standards formatiert und hat die Standard-PEM-Header und -Footer. Die Header beginnt mit |
Snowflake Private Key Password |
Das Kennwort, das mit der Snowflake-Privatschlüsseldatei verknüpft ist. |
Snowflake Role |
Snowflake-Rolle, die bei der Ausführung der Abfrage verwendet wird. |
Snowflake-Benutzername |
Benutzername,der für die Verbindung zur Snowflake-Instanz verwendet wird. |
Snowflake Warehouse |
Snowflake Warehouse, das zur Ausführung von Abfragen verwendet wird. Bei diesem Parameter wird zwischen Groß- und Kleinschreibung unterschieden. |
Aufnahmeparameter für Kinesis¶
Parameter |
Beschreibung |
---|---|
Kinesis Application Name |
Der Name der DynamoDB-Tabelle, der verwendet wird, um den Fortschritt der Anwendung bei der Verarbeitung des Kinesis-Streams nachzuverfolgen. |
Kinesis Stream Name |
Name des AWS Kinesis-Streams, von dem Daten empfangen werden sollen. |
Kinesis Initial Stream Position |
Die anfängliche Stream-Position, von der aus die Replikation der Daten beginnt.
|
Kinesis DLQ Stream Name |
Der Stream-Name, an den alle Datensätze, deren Verarbeitung fehlgeschlagen ist, gesendet werden. Wenn dieser Parameter nicht hinzugefügt wird, können Sie ein Warnzeichen im DLQ-bezogenen Teil des Konnektors im Openflow-Canvas erwarten. |
Message Format |
Das Format der Meldungen in Kinesis.
|
AVRO Schema Access Strategy |
Für den Zugriff auf Daten im AVRO-Meldungsformat ist das Schema erforderlich. Dieser Parameter definiert die Strategie für den Zugriff auf das AVRO-Schema einer bestimmten Meldung. Wenn der Parameter „Message Format“ auf
|
Kinesis Stream To Table Map |
Mit diesem optionalen Parameter kann ein Benutzer angeben, welche Streams welchen Tabellen zugeordnet werden sollen. Jeder Stream und sein Tabellenname sollten durch einen Doppelpunkt getrennt werden. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Die regulären Ausdrücke dürfen nicht mehrdeutig sein, und jeder übereinstimmende Stream darf nur mit einer einzigen Zieltabelle übereinstimmen. Wenn keine oder leere Treffer gefunden werden, wird der Stream-Name als Tabellenname verwendet.
|
Iceberg Enabled |
Gibt an, ob der Prozessor Daten in eine Iceberg-Tabelle aufnimmt. Der Prozessor schlägt fehl, wenn diese Eigenschaft nicht mit dem tatsächlichen Tabellentyp übereinstimmt.
|
Führen Sie den Ablauf aus¶
Klicken Sie mit der rechten Maustaste auf die Ebene, und wählen Sie Enable all Controller Services.
Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe und wählen Sie Start.
Der Konnektor startet die Datenaufnahme.
Schema¶
Die vom Konnektor geladene Snowflake-Tabelle enthält Spalten, die nach den Schlüsseln Ihrer Kinesis-Meldungen benannt sind. Nachfolgend finden Sie ein Beispiel für eine solche Tabelle.
Zeile |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
5 |
ABC123 |
ZTEST |
BUY |
1558 |
Schemaentwicklung¶
Derzeit ist Iceberg Enabled
auf false
eingestellt. Wenn der Konnektor die Zieltabelle erstellt, ist die Schemaentwicklung standardmäßig aktiviert. Wenn Sie die Schemaentwicklung für eine bestehende Tabelle aktivieren oder deaktivieren möchten, verwenden Sie den Befehl ALTER TABLE, um den Parameter ENABLE_SCHEMA_EVOLUTION
festzulegen. Sie müssen außerdem eine Rolle verwenden, die über die OWNERSHIP
-Berechtigung für die Tabelle verfügt. Weitere Informationen dazu finden Sie unter Tabellenschemaentwicklung.
Wenn jedoch die Schemaentwicklung für eine vorhandene Tabelle deaktiviert ist, versucht der Konnektor, die Zeilen mit nicht übereinstimmenden Schemata an die konfigurierten Dead-Letter-Queues (DLQ) zu senden.
Für den Fall, dass Iceberg Enabled
auf true
eingestellt ist, siehe Abschnitt Schemaentwicklung für Apache Iceberg™ Tabellen.
Verwendung von Openflow Connector for Kinesis mit Apache Iceberg™ Tabellen¶
Openflow Connector for Kinesis kann Daten in eine von Snowflake verwaltete Apache Iceberg™ Tabelle aufnehmen.
Anforderungen und Einschränkungen¶
Bevor Sie den Konnektor für die Aufnahme von Iceberg-Tabellen konfigurieren, beachten Sie die folgenden Anforderungen und Beschränkungen:
Sie müssen eine Iceberg-Tabelle erstellen, bevor Sie den Konnektor ausführen.
Stellen Sie sicher, dass der Benutzer Daten in die erstellten Tabellen eingeben kann.
Die Schemaentwicklung wird für Iceberg-Tabellen nicht unterstützt.
Konfiguration und Einrichtung¶
Um den Konnektor für die Aufnahme von Iceberg-Tabellen zu konfigurieren, folgen Sie den Anweisungen unter Den Konnektor konfigurieren mit einigen Unterschieden, die in den folgenden Abschnitten beschrieben werden.
Aufnahme in die Iceberg-Tabellen aktivieren¶
Um die Aufnahme in eine Iceberg-Tabelle zu ermöglichen, müssen Sie den Parameter Iceberg Enabled
auf true
setzen.
Erstellen Sie eine Iceberg-Tabelle für die Datenaufnahme¶
Bevor Sie den Konnektorr ausführen, müssen Sie eine Iceberg-Tabelle erstellen. Da die Schemaentwicklung nicht unterstützt wird, müssen Sie die Tabelle mit allen Feldern erstellen, die die Kinesis-Meldung enthält.
Wenn Sie eine Iceberg-Tabelle erstellen, können Sie Iceberg-Datentypen oder kompatible Snowflake-Typen verwenden. Der halbstrukturierte VARIANT-Typ wird nicht unterstützt. Verwenden Sie stattdessen eine strukturierte OBJECT oder MAP.
Betrachten Sie zum Beispiel die folgende Meldung:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
Um eine Iceberg-Tabelle für die Beispielmeldung zu erstellen, verwenden Sie die folgende Anweisung:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table ( id INT, body_temperature FLOAT, name STRING, approved_coffee_types ARRAY(STRING), animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN), date_added DATE ) EXTERNAL_VOLUME = 'my_volume' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'my_location/my_iceberg_table';
Bemerkung
Bei Feldnamen innerhalb verschachtelter Strukturen wie dogs
oder cats
wird zwischen Groß- und Kleinschreibung unterschieden.
Schemaentwicklung für Apache Iceberg™ Tabellen¶
Derzeit unterstützt der Konnektor die Schemaentwicklung für Apache Iceberg™-Tabellen nicht.
Bekannte Probleme¶
Die Prozessgruppe des Konnektors hat einen Port mit dem Namen „Upload Failure“. Er kann verwendet werden, um FlowFiles zu bearbeiten, die nicht erfolgreich in Snowflake hochgeladen wurden. Wenn dieser Port nicht außerhalb der Prozessgruppe des Konnektors verbunden ist, wird ein Warnzeichen angezeigt, das ignoriert werden kann.
Alle Prozessoren können nach dem Anhalten angewiesen werden, einmal ausgeführt zu werden. Der ConsumeKinesisStream-Prozessor führt aufgrund seiner internen Architektur keine sinnvolle Arbeit aus, wenn er einmal ausgeführt werden soll. Damit der Prozessor seine Arbeit aufnehmen kann, muss er gestartet werden und etwa zwei Minuten lang laufen.