Openflow Connector for Kinesis の JSON データ形式を設定する¶
注釈
このコネクタは、 Snowflakeコネクタ規約 に従うものとします。
このトピックでは、 JSON データ形式用に Openflow Connector for Kinesis を設定する方法について説明します。これは、スキーマ進化機能を備えた基本的なメッセージの取り込みに最適化された簡素化されたコネクタです。
Openflow Connector for Kinesis の JSON データ形式は、KinesisストリームからSnowflakeテーブルへの JSON メッセージの簡単な取り込みを目的として設計されています。
前提条件¶
BYOCを使用してOpenflowを設定 しているか、 Snowflakeデプロイメントを使用してOpenflowを設定 していることを確認します。
|OFSFSPCS-plural|を使用する場合、必要なドメインの構成 を精査し、 Kinesis コネクタに必要なドメインへのアクセス許可を付与していることを確認してください。
注釈
DLQ などその他のデータ形式や機能のサポートが必要な場合、Snowflakeの担当者にお問い合わせください。
Kinesisストリームを設定する¶
AWS 管理者として、 AWS アカウントで以下のアクションを実行します。
Kinesis Streamsと DynamoDB <https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html>`_ にアクセスするための `IAM 権限を持つ AWS ユーザーがいることを確認します。
AWS ユーザーが `アクセスキーの認証情報<https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html>`_ を構成したことを確認します。
Snowflakeアカウントを設定する¶
Snowflakeアカウント管理者として、以下のタスクを実行します。
新しいロールを作成するか、既存のロールを使用して データベース権限 を付与します。
データを格納する宛先テーブルを作成するために使用する宛先データベースと宛先スキーマを作成します。
宛先テーブルがまだ存在しない場合に自動的に作成するコネクタの機能を使用する予定の場合は、ユーザーがSnowflakeオブジェクトの作成および管理に必要な権限を持っていることを確認してください。
オブジェクト
権限
注意
データベース
USAGE
スキーマ
USAGE . CREATE TABLE .
スキーマレベルのオブジェクトが作成された後に、 CREATE
object権限を取り消すことができます。テーブル
OWNERSHIP
Kinesisコネクタを使用して 既存の テーブルにデータを取り込む場合にのみ必要です。. コネクタがKinesisストリームからの記録用に新しいターゲットテーブルを作成する場合、構成で指定されたユーザーの既定のロールがテーブル所有者になります。
以下のスクリプトを使用して、カスタムロールを作成および構成できます(SECURITYADMIN または同等のロールが必要です)。
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; -- Only for existing tables. GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。
Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。
GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user; ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
ステップ3のSnowflake SERVICE ユーザー向けの キーペア認証 で構成します。
Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。
注釈
何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。
シークレットマネージャーを構成した後、認証する方法を決定します。AWS では、他のシークレットを保持する必要がないため、Openflowに関連付けられた EC2 インスタンスロールを使用することをお勧めします。
Openflowで、右上のハンバーガーメニューから、このシークレットマネージャーに関連付けられたパラメータープロバイダーを構成します。Controller Settings » Parameter Provider に移動してから、パラメーター値を取得します。
この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。
他のSnowflakeユーザーがインジェストされたデータと作成されたテーブルへのアクセスを必要とする場合(たとえば、Snowflakeのカスタム処理のため)、ステップ2で作成したロールをそれらのユーザーに付与します。
コネクタを設定する¶
データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。
コネクタをインストールする¶
Openflow概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。
Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。
Select runtime ダイアログで、Available runtimes ドロップダウンリストからランタイムを選択して Add をクリックします。
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
インポートされたプロセスグループを右クリックし、 Parameters を選択します。
パラメーター の説明に従って、必要なパラメーター値を入力します。
パラメーター¶
このセクションでは、 Openflow Connector for Kinesis の JSON データ形式のすべてのパラメーターについて説明します。
コネクタはいくつかのモジュールで構成されています。セットを表示するには、コネクタプロセスグループをダブルクリックします。モジュールのパラメーターコンテキストで各モジュールのパラメーターを設定できます。
Snowflake宛先パラメーター¶
パラメーター |
説明 |
必須 |
|---|---|---|
宛先データベース |
データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 |
有り |
宛先スキーマ |
データが永続化されるスキーマ。これはSnowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 次の例をご参照ください。
|
有り |
Iceberg Enabled |
Icebergがテーブル操作で有効になっているかどうか。次のいずれか: |
有り |
スキーマ進化が有効です |
コネクタレベルでスキーマの進化を有効または無効にします。有効にすると、テーブルのスキーマの自動変更が許可されます。スキーマの進化は、テーブル固有のパラメーターを介して、個々のテーブルレベルでも制御できることに注意してください。次のいずれか: |
有り |
有効化された新しいテーブルのスキーマ進化 |
新しいテーブルを作成するときにスキーマの進化を有効にするかどうかを制御します。「true」に設定すると、 ENABLE_SCHEMA_EVOLUTION = TRUE パラメーターで新しいテーブルが作成されます。「false」に設定すると、 ENABLE_SCHEMA_EVOLUTION = FALSE パラメーターで新しいテーブルが作成されます。Icebergテーブルは自動的に作成されないため、適用されません。この設定はテーブルの作成にのみ影響し、既存のテーブルには影響しません。次のいずれか: |
有り |
Snowflakeアカウント識別子 |
以下を使用する場合:
|
有り |
Snowflake認証ストラテジー |
以下を使用する場合:
|
有り |
Snowflake秘密キー |
以下を使用する場合:
|
無し |
Snowflake秘密キーファイル |
以下を使用する場合:
|
無し |
Snowflake秘密キーパスワード |
以下を使用する場合
|
無し |
Snowflakeロール |
以下を使用する場合
|
有り |
Snowflakeのユーザー名 |
以下を使用する場合
|
有り |
Kinesis JSON ソースパラメーター¶
パラメーター |
説明 |
必須 |
|---|---|---|
AWS リージョンコード |
|
有り |
AWS アクセスキー ID |
Kinesis Stream、 DynamoDB、およびオプションで CloudWatch に接続するための AWS アクセスキー ID。 |
有り |
AWS シークレットアクセスキー |
Kinesis Stream、 DynamoDB、およびオプションで CloudWatch に接続するための AWS シークレットアクセスキー。 |
有り |
Kinesisアプリケーション名 |
アプリケーションのKinesisストリーム消費の進捗を追跡するための DynamoDB テーブル名に使用される名前。 |
有り |
Kinesisコンシューマータイプ |
Kinesisストリームから記録を読み取るために使用される戦略。これは次のいずれかの値である必要があります: |
有り |
Kinesisストリーム初期位置 |
データの複製を開始するストリームの初期位置。 可能な値は次のとおりです。
|
有り |
Kinesisストリーム名 |
データを消費する AWS Kinesisストリーム名。 |
有り |
メトリック公開 |
Kinesisクライアントライブラリのメトリックが公開される場所を指定します。可能な値: |
有り |
フローを実行する¶
平面を右クリックして、 Enable all Controller Services を選択します。
コネクタのプロセスグループを右クリックし、 Start を選択します。
コネクタがデータの取り込みを開始します。
テーブルスキーマ¶
コネクタによってロードされたSnowflakeテーブルには、Kinesisメッセージのキーの名前が付けられた列が含まれます。コネクタは、レコードに関するメタデータを格納する KINESISMETADATA 列も追加します。
以下は、コネクタによってロードされるSnowflakeテーブルの例です。
行 |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
KINESISMETADATA |
|---|---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
{ ... KINESISMETADATA object ... } |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
{ ... KINESISMETADATA object ... } |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
{ ... KINESISMETADATA object ... } |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
{ ... KINESISMETADATA object ... } |
KINESISMETADATA 列には、次のフィールドを持つオブジェクトが含まれます。
フィールド名 |
フィールド型 |
値の例 |
説明 |
|---|---|---|---|
|
String |
|
記録が由来するKinesisストリームの名前です。 |
|
String |
|
記録が由来するストリーム内のシャードの識別子です。 |
|
String |
|
記録がストリームに挿入されたおおよその時間(ISO 8601形式) |
|
String |
|
記録のデータプロデューサーによって指定されたパーティションキー。 |
|
String |
|
Kinesis Data Streamsによってシャード内の記録に割り当てられた一意のシーケンス番号。 |
|
数 |
|
記録のサブシーケンス番号(同じシーケンス番号を持つ集計記録に使用) |
|
String |
|
記録のシーケンス番号とサブシーケンス番号の組み合わせ。 |
スキーマの進化¶
このコネクタは、スキーマの自動検出と進化をサポートしています。Snowflakeのテーブルの構造は、コネクタによってロードされた新しいデータの構造をサポートするために、自動的に定義され、進化します。
Snowflakeは受信データのスキーマを検出し、ユーザー定義スキーマに一致するテーブルにデータをロードします。Snowflakeでは、新しい列を追加したり、新しい受信レコードで欠落している列から NOT NULL 制約をドロップしたりすることもできます。
コネクタを使用したスキーマ検出は、提供された JSON データに基づいてデータ型を推測します。
コネクタがターゲットテーブルを作成する場合、スキーマ進化はデフォルトで有効になります。
既存のテーブルのスキーマ進化を有効または無効にする場合は、 ALTER TABLE コマンドを使用して ENABLE_SCHEMA_EVOLUTION パラメーターを設定します。テーブルで:code:OWNERSHIP`権限を持つロールを使用することも必要です。詳細については、 :doc:/user-guide/data-load-schema-evolution` をご参照ください。
ただし、既存のテーブルでスキーマ進化が無効になっている場合、コネクタは一致しないスキーマを持つ行を構成された失敗出力ポートに送信しようとします。
Icebergテーブルのサポート¶
Openflow Connector for Kinesis は、Iceberg有効 が true に設定されている場合、Snowflake管理の Apache Iceberg™ にデータを取り込むことができます。
要件と制限¶
Icebergテーブルの取り込み用に Openflow Connector for Kinesis を構成する前に、次の要件と制限に注意してください。
コネクタを実行する前にIcebergテーブルを作成する必要があります。
ユーザーが作成したテーブルにデータを挿入するアクセス権を持っていることを確認します。
構成とセットアップ¶
Icebergテーブルの取り込み用に Openflow Connector for Kinesis を構成するには、次のセクションに記載されているいくつかの違いを踏まえて、 Openflow Connector for Kinesis の JSON データ形式を設定する のステップに従います。
Icebergテーブルへの取り込みを有効にする¶
Icebergテーブルへの取り込みを有効にするには、 Iceberg Enabled パラメーターを true に設定する必要があります。
インジェスト用にIcebergテーブルを作成¶
コネクタを実行する前に、Icebergテーブルを作成する必要があります。初期のテーブルスキーマは、コネクタの スキーマ進化が有効です プロパティ設定によって異なります。
スキーマの進化を有効にすると、 kinesisMetadata という列を持つテーブルを作成する必要があります。コネクタはメッセージフィールドの列を自動的に作成し、 kinesisMetadata 列スキーマを変更します。
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ENABLE_SCHEMA_EVOLUTION = true;
スキーマの進化が無効になっている場合は、Kinesisメッセージが含むすべてのフィールドでテーブルを作成する必要があります。Icebergテーブルを作成する場合、Icebergデータタイプまたは 互換性のあるSnowflakeデータタイプ を使用できます。半構造化 VARIANT タイプはサポートされていません。代わりに、 構造化された OBJECT または MAP を使用してください。
例えば、次のようなメッセージを考えてみましょう:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
次のステートメントは、Kinesisメッセージに含まれるすべてのフィールドを持つテーブルを作成します。
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT(
stream STRING,
shardId STRING,
approximateArrival STRING,
partitionKey STRING,
sequenceNumber STRING,
subSequenceNumber INTEGER,
shardedSequenceNumber STRING
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
注釈
kinesisMetadata は常に作成される必要があります。dogs や cats のような入れ子構造内のフィールド名では、大文字と小文字が区別されます。