Openflow Connector for Kinesis を設定する¶
注釈
コネクタには、 コネクタ利用規約 が適用されます。
このトピックでは、 Openflow Connector for Kinesis を設定する手順について説明します。
前提条件¶
Openflow Connector for Kinesis について を確認してください。
Openflowを設定した ことを確認します。
Kinesisストリームを設定する¶
AWS 管理者として、 AWS アカウントで以下のアクションを実行します。
IAM 権限を持つ AWS アカウントがあり、Kinesis Streamsおよび DynamoDB にアクセスできることを確認します。
オプションで、デッドレターキュー(DLQ)Kinesisストリームを作成します。正常に解析できなかったメッセージは、指定した DLQ にリダイレクトすることができます。
Snowflakeアカウントを設定する¶
Snowflakeアカウント管理者として、以下のタスクを実行します。
新しいロールを作成するか、既存のロールを使用して データベース権限 を付与します。
データを格納する宛先テーブルを作成するために使用する宛先データベースと宛先スキーマを作成します。
宛先テーブルがまだ存在しない場合に自動的に作成するコネクタの機能を使用する予定の場合は、ユーザーがSnowflakeオブジェクトの作成および管理に必要な権限を持っていることを確認してください。
オブジェクト
権限
注意
データベース
USAGE
スキーマ
USAGE . CREATE TABLE .
スキーマレベルのオブジェクトが作成された後に、 CREATE
object
権限を取り消すことができます。テーブル
OWNERSHIP
Kinesisコネクタを使用して 既存の テーブルにデータを取り込む場合にのみ必要です。 . コネクタがKinesisストリームからの記録用に新しいターゲットテーブルを作成する場合、構成で指定されたユーザーの既定のロールがテーブル所有者になります。
以下のスクリプトを使用して、カスタムロールを作成および構成できます(SECURITYADMIN または同等のロールが必要です)。
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role_1; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role_1; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1; -- Only for existing tables GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kinesis_connector_role_1;
タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。
Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。
GRANT ROLE kinesis_connector_role_1 TO USER kinesis_connector_user_1; ALTER USER kinesis_connector_user_1 SET DEFAULT_ROLE = kinesis_connector_role_1;
ステップ3のSnowflake SERVICE ユーザー向けの キーペア認証 で構成します。
Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。
注釈
何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。
シークレットマネージャを構成したら、その認証方法を決定します。AWS 上では、Openflowに関連付けられた EC2 インスタンスロールが推奨されます。こうすることで、他の秘密を永続化する必要がなくなるからです。
Openflowで、右上のハンバーガーメニューから、このシークレットマネージャに関連付けられたParameter Providerを構成します。 Controller Settings » Parameter Provider に移動し、パラメーターの値をフェッチします。
この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。
他のSnowflakeユーザーが、コネクタによって取り込まれた生の取り込みドキュメントやとテーブルへのアクセスを必要とする場合は(Snowflakeでのカスタム処理のためなど)、それらのユーザーにステップ1で作成したロールを付与します。
コネクタが使用するウェアハウスを指定します。まずは最小のウェアハウスサイズから始め、複製するテーブルの数や転送するデータ量に応じて異なるサイズを試してみてください。テーブル数が大きい場合は、通常、ウェアハウスのサイズを大きくするよりも、 マルチクラスターウェアハウス を使用した方がスケーリングが向上します。
コネクタを設定する¶
データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。
コネクタをインストールする¶
Openflowの概要ページに移動します。 Featured connectors セクションで、 View more connectors を選択します。
Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。
Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。
Add を選択します。
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
フローパラメーター の説明に従って、必要なパラメーター値を入力します。
フローパラメーター¶
このセクションでは、以下のパラメーターコンテキストに基づいて構成できるフローパラメーターについて説明します。
Kinesisソースパラメーター: Kinesisとの接続を確立するために使用します。
Kinesis宛先パラメーター: Snowflakeとの接続を確立するために使用します。
Kinesis取り込みパラメーター: Kinesisからダウンロードするデータの構成を定義するために使用します。
Kinesisソースパラメーター¶
パラメーター |
説明 |
---|---|
AWS リージョンコード |
|
AWS アクセスキー ID |
Kinesisストリームおよび DynamoDB に接続するための AWS アクセスキー ID。 |
AWS シークレットアクセスキー |
Kinesisストリーム DynamoDB に接続するための AWS シークレットアクセスキー。 |
スキーマレジストリ URL |
AVRO スキーマレジストリの URL。これは、 AVRO Schema Access Strategyパラメーターが |
スキーマレジストリの認証タイプ |
AVRO スキーマレジストリが使用する認証タイプ。これは、 AVRO Schema Access Strategyパラメーターが
|
Schema Registry Username |
AVRO スキーマレジストリの |
Schema Registry Password |
AVRO スキーマレジストリへの |
Kinesis宛先パラメーター¶
パラメーター |
説明 |
---|---|
宛先データベース |
データが永続化されるデータベース。既にSnowflakeに存在している必要があります。 |
宛先スキーマ |
データが永続化されるスキーマ。既にSnowflakeに存在している必要があります。このパラメーターは大文字と小文字を区別します。 |
Snowflakeアカウント識別子 |
データが永続化されるSnowflakeアカウント名(形式: [organization-name]- [account-name])。 |
Snowflake認証ストラテジー |
Snowflakeへの認証のストラテジー。可能な値: SPCS でフローを実行している場合は |
Snowflake秘密キー |
認証に使用される RSA 秘密キー。RSA キーは、 PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持つ必要があります。Snowflake秘密キーファイルまたはSnowflake秘密キーのいずれかを定義する必要があることに注意してください。 |
Snowflake秘密キーファイル |
Snowflakeへの認証に使用される RSA 秘密キーを含むファイル。 PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持ちます。ヘッダー行は |
Snowflake秘密キーパスワード |
Snowflake秘密キーファイルに関連付けられたパスワード。 |
Snowflakeロール |
クエリ実行時に使用されるSnowflakeロール。 |
Snowflakeのユーザー名 |
Snowflakeインスタンスへの接続に使用するユーザー名。 |
Snowflakeウェアハウス |
クエリ実行に使用されるSnowflakeウェアハウス。このパラメーターは大文字と小文字を区別します。 |
Kinesis取り込みパラメーター¶
パラメーター |
説明 |
---|---|
Kinesisアプリケーション名 |
アプリケーションのKinesisストリーム消費の進捗を追跡するための DynamoDB テーブル名に使用される名前。 |
Kinesisストリーム名 |
データを消費する AWS Kinesisストリーム名。 |
Kinesisストリーム初期位置 |
データの複製を開始するストリームの初期位置。
|
Kinesis DLQ ストリーム名 |
処理に失敗したすべての記録が送信されるストリーム名。このパラメーターが追加されていない場合、Openflowキャンバスのコネクタの DLQ 関連部分に警告記号が表示されます。 |
メッセージ形式 |
Kinesisのメッセージ形式。
|
AVRO Schema Access Strategy |
AVRO メッセージ形式のデータにアクセスするには、スキーマが必要です。このパラメーターは、特定メッセージの AVRO スキーマにアクセスするためのストラテジーを定義します。メッセージ形式パラメーターが
|
Kinesisストリームからテーブルへのマッピング |
このオプションのパラメーターにより、ユーザーはどのストリームをどのテーブルにマッピングするかを指定できます。各ストリームとそのテーブル名はコロンで区切る必要があります。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。正規表現はあいまいであってはならず、一致するストリームは単一のターゲットテーブルのみに一致しなければなりません。空または一致するものがない場合は、ストリーム名がテーブル名として使用されます。
|
Iceberg Enabled |
プロセッサーがデータをIcebergテーブルに取り込むかどうかを指定します。このプロパティが実際のテーブルタイプと一致しない場合、プロセッサーは失敗します。
|
フローを実行する¶
プレーンを右クリックし、 Enable all Controller Services を選択します。
インポートしたプロセスグループを右クリックし、 Start を選択します。
コネクタがデータの取り込みを開始します。
スキーマ¶
コネクタによってロードされたSnowflakeテーブルには、Kinesisメッセージのキーによって命名された列が含まれます。以下にそのようなテーブルの例を示します。
行 |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
5 |
ABC123 |
ZTEST |
BUY |
1558 |
スキーマの進化¶
現在、 Iceberg Enabled
が false
に設定されている場合。コネクタがターゲットテーブルを作成する場合、スキーマ進化はデフォルトで有効になります。既存のテーブルでスキーマ進化を有効または無効にしたい場合は、 ALTER TABLE コマンドを使用して、 ENABLE_SCHEMA_EVOLUTION
パラメーターを設定します。テーブルで OWNERSHIP
権限を持つロールを使用することも必要です。詳細については、 テーブルスキーマの進化 をご参照ください。
ただし、既存のテーブルに対してスキーマの進化が無効になっている場合、コネクタは不一致のスキーマがある行を構成されたデッドレターキュー(DLQ)に送ろうとします。
Iceberg Enabled
が true
に設定されているケースについては、段落 Apache Iceberg™ テーブルのスキーマの進化 をご参照ください。
Apache Iceberg™ テーブルと Openflow Connector for Kinesis の併用¶
Openflow Connector for Kinesis では、Snowflakeが管理する Apache Iceberg™テーブル にデータを取り込むことができます。
要件と制限¶
Icebergテーブルの取り込み用にコネクタを構成する前に、以下の要件と制限に注意してください。
コネクタを実行する前にIcebergテーブルを作成する必要があります。
ユーザーが作成したテーブルにデータを挿入するアクセス権を持っていることを確認します。
スキーマの進化はIcebergテーブルではサポートされていません。
構成とセットアップ¶
Icebergテーブルの取り込み用にコネクタを構成するには、 コネクタを構成する の指示に従いますが、以下のセクションで説明するいくつかの違いがあります。
Icebergテーブルへの取り込みを有効にする¶
Icebergテーブルへの取り込みを有効にするには、 Iceberg Enabled
パラメーターを true
に設定する必要があります。
インジェスト用にIcebergテーブルを作成¶
コネクタを実行する前に、Icebergテーブルを作成する必要があります。スキーマの進化はサポートされていないため、Kinesisメッセージが含むすべてのフィールドでテーブルを作成する必要があります。
Icebergテーブルを作成する場合、Icebergデータタイプまたは 互換性のあるSnowflakeデータタイプ を使用できます。半構造化 VARIANT タイプはサポートされていません。代わりに、 構造化された OBJECT または MAP を使用してください。
例えば、次のようなメッセージを考えてみましょう:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
例のメッセージ用にIcebergテーブルを作成するには、以下のステートメントを使用します:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table ( id INT, body_temperature FLOAT, name STRING, approved_coffee_types ARRAY(STRING), animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN), date_added DATE ) EXTERNAL_VOLUME = 'my_volume' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'my_location/my_iceberg_table';
注釈
dogs
や cats
のような入れ子構造内のフィールド名では、大文字と小文字が区別されます。
Apache Iceberg™ テーブルのスキーマの進化¶
現在、コネクタは Apache Iceberg™ テーブルのスキーマの進化をサポートしていません。
既知の問題¶
コネクタのプロセスグループには、「Upload Failure」という名前の出力ポートが1つあります。Snowflakeに正常にアップロードされなかった FlowFiles の処理に使用することができます。このポートがコネクタのプロセスグループの外部に接続されていないと警告サインが表示されますが、無視することができます。
すべてのプロセッサーは、停止しているときに、一度だけ実行するように命令することができます。ConsumeKinesisStream プロセッサーは、内部アーキテクチャのため、一度だけ実行を命じられても有意義な仕事はしません。プロセッサーが動き出すには、始動して約2分間作動させる必要があります。