Set up Openflow Connector for Kinesis for JSON data format¶
注釈
This connector is subject to the Snowflake Connector Terms.
このトピックでは、Openflow Connector for Kinesis の JSON データ形式の設定手順について説明します。これは、スキーマ進化機能を備えた基本的なメッセージの取り込みに最適化された簡素化されたコネクタです。
Openflow Connector for Kinesis の JSON データ形式は、KinesisストリームからSnowflakeテーブルへの JSON メッセージの簡単な取り込みを目的として設計されています。
前提条件¶
Openflow Connector for Kinesis について を確認してください。
Ensure that you have Openflowの設定 - BYOC or Set up Openflow - Snowflake Deployments.
Openflow - Snowflake Deployments を使用する場合、:doc:`必要なドメインの構成</user-guide/data-integration/openflow/setup-openflow-spcs-sf-allow-list>`を精査し、:ref:`label-openflow_domains_used_by_openflow_connectors_kinesis`コネクタに必要なドメインへのアクセス許可を付与していることを確認してください。
注釈
DLQ などその他のデータ形式や機能のサポートが必要な場合、Snowflakeの担当者にお問い合わせください。
Kinesisストリームを設定する¶
AWS 管理者として、 AWS アカウントで以下のアクションを実行します。
Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.
Ensure that the AWS User has configured Access Key credentials.
Snowflakeアカウントを設定する¶
Snowflakeアカウント管理者として、以下のタスクを実行します。
新しいロールを作成するか、既存のロールを使用して データベース権限 を付与します。
データを格納する宛先テーブルを作成するために使用する宛先データベースと宛先スキーマを作成します。
宛先テーブルがまだ存在しない場合に自動的に作成するコネクタの機能を使用する予定の場合は、ユーザーがSnowflakeオブジェクトの作成および管理に必要な権限を持っていることを確認してください。
オブジェクト
権限
注意
データベース
USAGE
スキーマ
USAGE . CREATE TABLE .
スキーマレベルのオブジェクトが作成された後に、 CREATE
object権限を取り消すことができます。テーブル
OWNERSHIP
Kinesisコネクタを使用して 既存の テーブルにデータを取り込む場合にのみ必要です。. コネクタがKinesisストリームからの記録用に新しいターゲットテーブルを作成する場合、構成で指定されたユーザーの既定のロールがテーブル所有者になります。
以下のスクリプトを使用して、カスタムロールを作成および構成できます(SECURITYADMIN または同等のロールが必要です)。
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; -- Only for existing tables. GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。
Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。
GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user; ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
ステップ3のSnowflake SERVICE ユーザー向けの キーペア認証 で構成します。
Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。
注釈
何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。
シークレットマネージャを構成したら、その認証方法を決定します。AWS 上では、Openflowに関連付けられた EC2 インスタンスロールが推奨されます。こうすることで、他の秘密を永続化する必要がなくなるからです。
Openflowで、右上のハンバーガーメニューから、このシークレットマネージャーに関連付けられたパラメータープロバイダーを構成します。Controller Settings » Parameter Provider に移動してから、パラメーター値を取得します。
この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。
If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), then grant those users the role created in step 2.
コネクタを設定する¶
データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。
コネクタをインストールする¶
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
Populate the required parameter values as described in Parameters section below.
Parameters¶
このセクションでは、 Openflow Connector for Kinesis の JSON データ形式のすべてのパラメーターについて説明します。
コネクタはいくつかのモジュールで構成されています。セットを表示するには、コネクタプロセスグループをダブルクリックします。モジュールのパラメーターコンテキストで各モジュールのパラメーターを設定できます。
Snowflake destination parameters¶
パラメーター |
説明 |
必須 |
|---|---|---|
宛先データベース |
データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 |
有り |
宛先スキーマ |
データが永続化されるスキーマ。これはSnowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 次の例をご参照ください。
|
有り |
Iceberg Enabled |
Icebergがテーブル操作で有効になっているかどうか。次のいずれか: |
有り |
Schema Evolution Enabled |
コネクタレベルでスキーマの進化を有効または無効にします。有効にすると、テーブルのスキーマの自動変更が許可されます。スキーマの進化は、テーブル固有のパラメーターを介して、個々のテーブルレベルでも制御できることに注意してください。次のいずれか: |
有り |
Schema Evolution For New Tables Enabled |
新しいテーブルを作成するときにスキーマの進化を有効にするかどうかを制御します。「true」に設定すると、 ENABLE_SCHEMA_EVOLUTION = TRUE パラメーターで新しいテーブルが作成されます。「false」に設定すると、 ENABLE_SCHEMA_EVOLUTION = FALSE パラメーターで新しいテーブルが作成されます。Icebergテーブルは自動的に作成されないため、適用されません。この設定はテーブルの作成にのみ影響し、既存のテーブルには影響しません。次のいずれか: |
有り |
Snowflakeアカウント識別子 |
以下を使用する場合:
|
有り |
Snowflake認証ストラテジー |
以下を使用する場合:
|
有り |
Snowflake秘密キー |
以下を使用する場合:
|
無し |
Snowflake秘密キーファイル |
以下を使用する場合:
|
無し |
Snowflake秘密キーパスワード |
以下を使用する場合
|
無し |
Snowflakeロール |
以下を使用する場合
|
有り |
Snowflakeのユーザー名 |
以下を使用する場合
|
有り |
Kinesis JSON Source Parameters¶
パラメーター |
説明 |
必須 |
|---|---|---|
AWS リージョンコード |
|
有り |
AWS アクセスキー ID |
The AWS Access Key ID to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
有り |
AWS シークレットアクセスキー |
The AWS Secret Access Key to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
有り |
Kinesisアプリケーション名 |
アプリケーションのKinesisストリーム消費の進捗を追跡するための DynamoDB テーブル名に使用される名前。 |
有り |
Kinesisストリーム初期位置 |
データの複製を開始するストリームの初期位置。
|
有り |
Kinesisストリーム名 |
データを消費する AWS Kinesisストリーム名。 |
有り |
メトリック公開 |
Kinesisクライアントライブラリのメトリックが公開される場所を指定します。可能な値: |
有り |
フローを実行する¶
プレーンを右クリックし、 Enable all Controller Services を選択します。
Right-click on the connector's process group and select Start.
コネクタがデータの取り込みを開始します。
Table Schema¶
The Snowflake table loaded by the connector contains columns named by the keys of your Kinesis messages.
The connector also adds a KINESISMETADATA column which stores metadata about the record.
以下は、コネクタによってロードされるSnowflakeテーブルの例です。
行 |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
KINESISMETADATA |
|---|---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
{ ... KINESISMETADATA object ... } |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
{ ... KINESISMETADATA object ... } |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
{ ... KINESISMETADATA object ... } |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
{ ... KINESISMETADATA object ... } |
KINESISMETADATA 列には、次のフィールドを持つオブジェクトが含まれます。
フィールド名 |
フィールド型 |
Example Value |
説明 |
|---|---|---|---|
|
String |
|
記録が由来するKinesisストリームの名前です。 |
|
String |
|
記録が由来するストリーム内のシャードの識別子です。 |
|
String |
|
記録がストリームに挿入されたおおよその時間(ISO 8601形式) |
|
String |
|
記録のデータプロデューサーによって指定されたパーティションキー。 |
|
String |
|
Kinesis Data Streamsによってシャード内の記録に割り当てられた一意のシーケンス番号。 |
|
数 |
|
記録のサブシーケンス番号(同じシーケンス番号を持つ集計記録に使用) |
|
String |
|
記録のシーケンス番号とサブシーケンス番号の組み合わせ。 |
スキーマの進化¶
このコネクタは、スキーマの自動検出と進化をサポートしています。Snowflakeのテーブルの構造は、コネクタによってロードされた新しいデータの構造をサポートするために、自動的に定義され、進化します。
Snowflakeは受信データのスキーマを検出し、ユーザー定義スキーマに一致するテーブルにデータをロードします。Snowflakeでは、新しい列を追加したり、新しい受信レコードで欠落している列から NOT NULL 制約をドロップしたりすることもできます。
コネクタを使用したスキーマ検出は、提供された JSON データに基づいてデータ型を推測します。
コネクタがターゲットテーブルを作成する場合、スキーマ進化はデフォルトで有効になります。
If you want to enable or disable schema evolution on an existing table,
use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter.
You must also use a role that has the OWNERSHIP privilege on the table. For more information, see テーブルスキーマの進化.
However, if schema evolution is disabled for an existing table, then the connector will try to send the rows with mismatched schemas to the configured failure output port.
Iceberg table support¶
Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.
要件と制限¶
Before you configure the Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:
コネクタを実行する前にIcebergテーブルを作成する必要があります。
ユーザーが作成したテーブルにデータを挿入するアクセス権を持っていることを確認します。
構成とセットアップ¶
To configure the Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.
Icebergテーブルへの取り込みを有効にする¶
Icebergテーブルへの取り込みを有効にするには、 Iceberg Enabled パラメーターを true に設定する必要があります。
インジェスト用にIcebergテーブルを作成¶
コネクタを実行する前に、Icebergテーブルを作成する必要があります。初期のテーブルスキーマは、コネクタの スキーマ進化が有効です プロパティ設定によって異なります。
スキーマの進化を有効にすると、 kinesisMetadata という列を持つテーブルを作成する必要があります。コネクタはメッセージフィールドの列を自動的に作成し、 kinesisMetadata 列スキーマを変更します。
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ENABLE_SCHEMA_EVOLUTION = true;
If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn't supported. Instead, use a structured OBJECT or MAP.
例えば、次のようなメッセージを考えてみましょう:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
次のステートメントは、Kinesisメッセージに含まれるすべてのフィールドを持つテーブルを作成します。
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT(
stream STRING,
shardId STRING,
approximateArrival STRING,
partitionKey STRING,
sequenceNumber STRING,
subSequenceNumber INTEGER,
shardedSequenceNumber STRING
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
注釈
kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.