Openflow Connector for Amazon Kinesis Data Streams を設定する¶
注釈
このコネクタは、 Snowflakeコネクタ規約 に従うものとします。
このトピックでは、Openflow Connector for Amazon Kinesis Data Streams を設定する方法について説明します。
Openflow Connector for Amazon Kinesis Data Streams は、スキーマ進化機能を使用してKinesisストリームからSnowflakeテーブルに JSON メッセージを取り込むために設計されています。
Openflow Connector for Kinesisの設定¶
前提条件¶
:doc:`/user-guide/data-integration/openflow/setup-openflow-byoc`または:doc:`Openflow のセットアップ - Snowflake デプロイメント</user-guide/data-integration/openflow/setup-openflow-spcs>`があることを確認してください。
Openflow - Snowflakeデプロイを使用している場合は、必要なドメインの構成 を確認して、Kinesisコネクタに必要なドメインへのアクセス権を付与していることを確認してください。
AWS での IAM のロールとポリシーの設定¶
AWS 管理者として、 AWS アカウントで以下のアクションを実行します。
KinesisデータストリームにアクセスするためにOpenflowが使用する AWS IAM のユーザーまたはロールを作成します。詳細については、AWS ドキュメントの IAM ユーザーの作成 をご参照ください。
AWS ユーザーが アクセスキーの認証情報 を構成していることを確認します。
AWS ユーザーに次の IAM 権限を付与します。
サービス
アクション
リソース(ARNs)
目的
Amazon Kinesis Data Streams
kinesis:DescribeStream、kinesis:DescribeStreamConsumer、kinesis:GetRecords、kinesis:GetShardIterator、kinesis:ListShards、kinesis:RegisterStreamConsumerarn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}シャードを検出し、共有スループットポーリングを通じて記録を読み取り、ストリーム ARN を解決します。また、強化されたファンアウトコンシューマーを登録し、登録中にコンシューマーのステータスをポーリングします。
Amazon Kinesis Data Streams
kinesis:DeregisterStreamConsumer、kinesis:DescribeStreamConsumer、kinesis:SubscribeToShardarn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}/consumer/*コンシューマー ARN ごとに強化されたファンアウトコンシューマーの説明、サブスクライブ、および登録解除。
Amazon DynamoDB
dynamodb:CreateTable、dynamodb:DeleteTable、dynamodb:DescribeTable、dynamodb:GetItem、dynamodb:PutItem、dynamodb:Query、dynamodb:Scan、dynamodb:UpdateItemarn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}、arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}_migrationチェックポイント/リーステーブル(シャードリース、ノードハートビート、チェックポイント)と、レガシーチェックポイントテーブルからの1回限りの移行時に使用される仮移行テーブルを作成および管理します。
IAM ポリシーの例:
ポリシーの例を使用する前に、次のプレースホルダーを置き換えます。
注釈
${APPLICATION_NAME}_migrationテーブルはレガシーチェックポイントテーブルから新しいスキーマへの1回限りの移行中にのみ作成された DynamoDB の仮テーブルです。移行が完了すると自動的に削除されます。デプロイによりレガシー KCL ベースのコネクタが使用されたことがない場合は、移行テーブルの ARN をポリシーから省略できます。dynamodb:DeleteTableアクションは移行プロセス中に使用され、移行の完了が確認された後にポリシーから削除できます。kinesis:DeregisterStreamConsumerアクションは、プロセッサーがキャンバスから削除されたときに呼び出されます。IAM プリンシパルにこの権限が付与されていない場合、コンシューマーは AWS コンソールまたは CLI を使用して手動で登録解除される必要があります。
Snowflakeアカウントを設定する¶
Snowflakeアカウント管理者として、以下のタスクを実行します。
タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。
新しいロールを作成するか、既存のロールを使用して データベース権限 を付与します。
コネクタでは、ユーザーが宛先テーブルを作成する必要があります。ユーザーにSnowflakeオブジェクトの管理に必要な権限が付与されていることを確認します。
オブジェクト
権限
注意
データベース
USAGE
スキーマ
USAGE
テーブル
OWNERSHIP
コネクタがデータをテーブルに取り込むために必要です。
Snowflakeは、アクセス制御を改善するために、各Kinesisストリームに個別のユーザーとロールを作成することをお勧めします。
以下のスクリプトを使用して、カスタムロールを作成および構成できます(SECURITYADMIN または同等のロールが必要です)。
注釈
権限はコネクタロールに直接付与する必要があり、継承することはできません。
宛先テーブルの構成
スキーマの変更にサーバー側のスキーマ進化を使用し、DMLエラーのロギングにエラーテーブル を使用することを強くお勧めします。
以下の例は、テーブルを作成し、OWNERSHIP 権限を追加する方法を示しています。
これらのコネクタは、スキーマの自動検出と進化をサポートします。Snowflakeのテーブルの構造は、コネクタによってロードされた新しいデータの構造をサポートするために、自動的に定義され、進化します。記録コンテンツの第1レベルのキーを、名前で一致するテーブル列に自動的にマッピングします(大文字と小文字の区別なし)。
スキーマ進化を有効にすると、Snowflakeは受信ストリームで検出された新しい列を追加し、NOT NULL 制約をドロップすることで、宛先テーブルを自動的に拡張して、新しいデータパターンに対応することができます。詳細については、 テーブルスキーマの進化 をご参照ください。
ENABLE_SCHEMA_EVOLUTION が有効になっていない場合は、テーブル定義を拡張してスキーマを手動で作成する必要があります。コネクタは、記録コンテンツの第1レベルのキーを名前でテーブル列と一致させようとします。JSONからのキーがテーブル列と一致しない場合、コネクタはキーを無視します。
(オプション)シークレットマネージャーの構成
Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。
シークレットマネージャを構成したら、その認証方法を決定します。AWS では、他のシークレットを保持する必要がないため、Openflowに関連付けられた EC2 インスタンスロールを使用することをお勧めします。
Openflowキャンバスで、右上のハンバーガーメニューから、このシークレットマネージャーに関連付けられたパラメータープロバイダーを構成します。Controller Settings » Parameter Provider に移動してから、パラメーター値を取得します。
この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。
ユーザーへのアクセス権の付与
コネクタによって取り込まれた未加工のデータへのアクセスを必要とする(たとえば、Snowflakeでのカスタム処理のため)他のSnowflakeユーザーについては、ステップ2で作成したロールを付与する必要があります。
コネクタを設定する¶
データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。
コネクタをインストールする¶
Openflow概要ページに移動します。特集コネクタ セクションで、その他のコネクタを表示 を選択します。
Openflowコネクタページで、Amazon Kinesis Data Streams用Openflowコネクタ を見つけて、ランタイムに追加 を選択します。
ランタイムを選択ダイアログで、利用可能なランタイム ドロップダウンリストからランタイムを選択し、追加 をクリックします。
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベース、スキーマおよびテーブルをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイに対する認証を行い、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたら、許可する を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
必要に応じて、組み込みパラメーターを構成する前に、コネクタ構成をカスタマイズします。
プロセスグループパラメーターを入力する
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
必要なパラメーター値を入力します。
共通パラメーター¶
パラメーター |
説明 |
必須 |
|---|---|---|
AWS アクセスキー ID |
Kinesis Streamおよび DynamoDB に接続するための AWS アクセスキー ID。 |
有り |
AWS Kinesisのリージョン |
接続先の AWS リージョン。通常の AWS リージョン形式(例: |
有り |
AWS シークレットアクセスキー |
Kinesis Streamおよび DynamoDB に接続するための AWS シークレットアクセスキー。 |
有り |
AWS Kinesisアプリケーション名 |
Kinesis Stream消費に対するアプリケーションの進捗状況を追跡するための DynamoDB テーブル名として使用される名前。 |
有り |
AWS Kinesisコンシューマータイプ |
Kinesisストリームから記録を読み取るために使用される戦略。 次のいずれかの値であることが必要: SHARED_THROUGHPUT、ENHANCED_FAN_OUT。 詳細については、共有スループットコンシューマーと強化されたファンアウトコンシューマーの違い をご参照ください。 |
有り |
AWS Kinesisの初期ストリーム位置 |
データが複製を開始する初期ストリーム位置。これは、指定された AWS Kinesisアプリケーション名の最初の開始時にのみ有効になります。 可能な値は次のとおりです。 LATEST: 保存された最新のレコード。 TRIM_HORIZON: 最も古い保存済みレコード。 |
有り |
AWS Kinesisストリーム名 |
データの消費元となる AWS Kinesisストリーム名。 |
有り |
Snowflake宛先データベース |
データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 |
有り |
Snowflake宛先スキーマ |
データが永続化されるスキーマ。これはSnowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 次の例をご参照ください。
|
有り |
Snowflake宛先テーブル |
データが永続化されるテーブル。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 |
有り |
コネクタを起動します。¶
プレーンを右クリックし、 Enable all Controller Services を選択します。
平面を右クリックして、Start を選択します。コネクタがデータの取り込みを開始します。
KINESISMETADATA 列についての理解¶
コネクタは、Kinesis記録に関するメタデータを含む KINESISMETADATA 構造を入力します。構造には次の情報が含まれています。
フィールド名 |
フィールド型 |
値の例 |
説明 |
|---|---|---|---|
ストリーム |
String |
|
記録が由来するKinesisストリームの名前です。 |
shardId |
String |
|
記録が由来するストリーム内のシャードの識別子です。 |
approximateArrival |
String |
|
記録がストリームに挿入されたおおよその時間(ISO 8601形式) |
partitionKey |
String |
|
記録のデータプロデューサーによって指定されたパーティションキー。 |
sequenceNumber |
String |
|
Kinesis Data Streamsによってシャード内の記録に割り当てられた一意のシーケンス番号。 |
subSequenceNumber |
数 |
|
記録のサブシーケンス番号(同じシーケンス番号を持つ集計記録に使用) |
shardedSequenceNumber |
String |
|
記録のシーケンス番号とサブシーケンス番号の組み合わせ。 |
取り込みのレイテンシの測定¶
行の変更時間に基づいて変更の追跡、増分処理、およびTime Travelクエリを実行するには、ROW_TIMESTAMP 機能を使用します。
宛先テーブルで次のコマンドを実行すると有効にできます。
行のタイムスタンプが有効になると、テーブルは METADATA$ROW_LAST_COMMIT_TIME 列を公開します。これは、各行が最後に変更された時点のタイムスタンプを返します。
詳細については、行のタイムスタンプ をご参照ください。
注釈
行のタイムスタンプはインタラクティブテーブルでは使用できません。詳細については、 インタラクティブテーブルの制限 をご参照ください。
Apache Iceberg™テーブルでのコネクタの使用¶
コネクタは、Snowflakeが管理するApache Iceberg™テーブルにデータを取り込むことができますが、以下の要件を満たしている必要があります。
Apache Iceberg™テーブルに関連付けられた外部ボリュームに対する USAGE 権限を付与されている必要があります。
コネクタを実行する前にApache Iceberg™テーブルを作成する必要があります。
外部ボリュームの使用許可¶
たとえば、Icebergテーブルが kinesis_external_volume 外部ボリュームを使用し、コネクタがロール openflow_kinesis_connector_role_1 を使用する場合、次のステートメントを実行します:
取り込み用のApache Iceberg™テーブルの作成¶
コネクタは自動的にIcebergテーブルを作成せず、スキーマの進化もサポートしません。コネクタを実行する前に、Icebergテーブルを手動で作成する必要があります。
Icebergテーブルを作成する場合、Icebergデータタイプ(VARIANTなど)または 互換性のあるSnowflakeデータタイプ を使用できます。
例えば、次のようなメッセージを考えてみましょう:
例のメッセージ用にIcebergテーブルを作成するには、以下のステートメントの1つを使用します:
インタラクティブテーブルでのコネクタの使用¶
インタラクティブテーブルは、低レイテンシの高同時実行性クエリ向けに最適化されたSnowflakeテーブルの特別なタイプです。インタラクティブテーブルの詳細については、 インタラクティブテーブルドキュメント をご参照ください。
インタラクティブテーブルを作成します。
重要な考慮事項:
インタラクティブテーブルには特定の制限とクエリ制限があります。コネクタで使用する前に インタラクティブテーブルドキュメント をご確認ください。
インタラクティブテーブルの場合、必要な変換はテーブル定義で処理する必要があります。
インタラクティブテーブルを効率的にクエリするには、インタラクティブウェアハウスが必要です。
宛先テーブルに対する顧客定義スキーマでコネクタを使用¶
コネクタは、各Kinesis記録をSnowflakeテーブルに挿入される行として扱います。たとえば、以下の JSON のように構造化されたメッセージのコンテンツを持つKinesisトピックがある場合:
デフォルトでは、JSON からすべてのフィールドを指定する必要はありません。スキーマの進化によって処理されます。ただし、静的スキーマを優先的に使用する場合は、次の処理を実行して作成できます。
顧客定義の PIPE でのコネクタの使用¶
独自のパイプを作成することを選択した場合は、パイプの COPY INTO ステートメントでデータ変換ロジックを定義できます。必要に応じて列の名前を変更しデータ型をキャストすることができます。例:
独自のパイプを定義する場合は、宛先テーブルの列が JSON キーと一致する必要はありません。列の名前を希望の名前に変更し、必要に応じてデータ型をキャストできます。
カスタムパイプで動作するようにコネクタを調整するには、次のタスクを実行します。
OpenflowキャンバスのKinesis取り込みフローで使用される PublishSnowpipeStreaming プロセッサーを右クリックします。
コンテキストメニューから Configure を選択します。
Properties タブに移動します。
宛先タイプフィールドで、Pipe を選択します。
パイプフィールドに、パイプの名前を入力します。
Apply を選択して構成を保存します。
エラー処理のカスタマイズ¶
エラー処理は、Snowpipe Streamingサービス内のOpenflow側の失敗とサーバー側の失敗に分割されます。
Openflowエラー(クライアント側の失敗):記録がSnowflakeに到達する前に、解析不可のペイロードやカスタム変換の失敗などのエラーが発生します。デフォルトでは、これらの記録は破棄されます。Openflowでこれらのエラーを処理することが可能です。そのためには、ConsumeKinesis プロセッサーで解析失敗関係から FlowFiles を使用します。
Snowpipe Streamingエラー(サーバー側の失敗):Snowflakeには正常に到達したものの、宛先テーブルのスキーマと互換性のない(例: 型の不一致)記録のエラーは、Snowflakeインフラストラクチャによってキャプチャされます。宛先テーブルでエラーのロギングが有効になっている(
error_logging = true)場合、これらの失敗した行は宛先のエラーテーブルに自動的に取り込まれます。