Openflow Connector for Snowflake to Kafka を設定する¶
注釈
コネクタには、 コネクタ利用規約 が適用されます。
このトピックでは、 Openflow Connector for Snowflake to Kafka を設定する手順について説明します。
前提条件¶
Openflowを設定した ことを確認します。
変更をクエリするSnowflakeストリームを作成します。
Snowflakeストリームから CDC メッセージを受信するKafkaトピックを作成します。
Snowflakeアカウントを設定する¶
Snowflakeアカウント管理者として、以下のタスクを実行します。
コネクタが CDC イベントの読み取りに使用するデータベース、ソーステーブル、ストリームオブジェクトを作成します。例:
create database stream_db; use database stream_db; create table stream_source (user_id varchar, data varchar); create stream stream_on_table on table stream_source;
新しいロールを作成するか、既存のロールを使用し、ストリームとストリームのソースオブジェクトに SELECT 権限を付与します。コネクタには、ストリームとストリームのソースオブジェクトを含むデータベースとスキーマの USAGE 権限も必要です。例:
create role stream_reader; grant usage on database stream_db to role stream_reader; grant usage on schema stream_db.public to role stream_reader; grant select on stream_source to role stream_reader; grant select on stream_on_table to role stream_reader;
タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。例:
create user stream_user type = service;
Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。例:
grant role stream_reader to user stream_user;
ステップ3のSnowflake SERVICE ユーザー向けの キーペア認証 で構成します。
Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。ただし、ステップ4で生成した秘密キーは、コネクタ構成の構成パラメーターとして直接使用できることに注意してください。このような場合、秘密キーはOpenflowランタイム構成に格納されます。
注釈
何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。
シークレットマネージャを構成したら、その認証方法を決定します。AWS 上では、Openflowに関連付けられた EC2 インスタンスロールが推奨されます。こうすることで、他の秘密を永続化する必要がなくなるからです。
Openflowで、右上のハンバーガーメニューから、このシークレットマネージャに関連付けられたParameter Providerを構成します。 Controller Settings » Parameter Provider に移動し、パラメーターの値をフェッチします。
この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。
コネクタが使用するウェアハウスを指定します。1つのコネクタで1つのテーブルを1つのKafkaトピックに複製できます。このような処理には、最も小さいウェアハウスを選択することができます。
コネクタを設定する¶
データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。
Openflowの概要ページに移動します。 Featured connectors セクションで、 View more connectors を選択します。
Openflowコネクタのページで、コネクタが通信すべきKafkaブローカーインスタンスの種類に応じてコネクタを検索して選択します。
mTLS バージョン: SSL (相互 TLS)セキュリティプロトコルを使用している場合、または SASL_SSL プロトコルを使用し、自己署名証明書を使用しているブローカーに接続している場合は、このコネクタを選択します。
SASL バージョン: 他のセキュリティプロトコルを使用している場合は、このコネクタを選択します。
Add to runtime を選択します。
Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。
Add を選択します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
フローパラメーター の説明に従って、必要なパラメーター値を入力します。
フローパラメーター¶
このセクションでは、以下のパラメーターコンテキストに基づいて構成できるフローパラメーターについて説明します。
Kafka Sinkソースパラメーター¶
パラメーター |
説明 |
必須 |
---|---|---|
Snowflakeアカウント識別子 |
データが永続化されるSnowflakeアカウント名(形式: [organization-name]- [account-name])。例: |
有り |
Snowflake認証ストラテジー |
Snowflakeへの認証のストラテジー。可能な値:
|
有り |
ソースデータベース |
ソースデータベース。このデータベースには、消費されるSnowflakeストリームオブジェクトが含まれている必要があります。 |
有り |
Snowflake秘密キーパスワード |
Snowflake秘密キーに関連付けられたパスワード。使用する秘密キーがパスワードで保護されていない場合は不要です。 |
無し |
Snowflakeロール |
クエリ実行時に使用されるSnowflakeロール |
有り |
Snowflakeのユーザー名 |
Snowflakeインスタンスへの接続に使用するユーザー名 |
有り |
Snowflakeウェアハウス |
クエリの実行に使用されるSnowflakeウェアハウス |
有り |
Snowflake秘密キー |
認証に使用される RSA 秘密キー。RSA キーは、 PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持つ必要があります。Snowflake秘密キーファイルまたはSnowflake秘密キーのいずれかを定義する必要があることに注意してください。 |
有り |
Snowflake秘密キーファイル |
Snowflakeへの認証に使用される RSA 秘密キーを含むファイル。 PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持ちます。ヘッダー行は |
無し |
ソーススキーマ |
ソーススキーマ。このスキーマには、消費されるSnowflakeストリームオブジェクトが含まれている必要があります。 |
有り |
Kafka Sink宛先パラメーター¶
パラメーター |
説明 |
必須 |
---|---|---|
Kafkaブートストラップサーバー |
データ送信先のKafkaブローカーのコンマ区切りリスト。 |
有り |
Kafka SASL メカニズム |
認証に使用する SASL メカニズム。Kafka Client
|
有り |
Kafka SASL ユーザー名 |
Kafkaを認証するためのユーザー名。 |
有り |
Kafka SASL パスワード |
Kafkaを認証するためのパスワード。 |
有り |
Kafkaセキュリティプロトコル |
ブローカーとの通信に使用されるセキュリティプロトコル。Kafka Client
|
有り |
Kafkaトピック |
Snowflake ストリームから CDCs を送信するKafkaトピック |
有り |
Kafkaメッセージキーフィールド |
Kafkaメッセージキーとして使用するデータベースの列名を指定します。指定しない場合、メッセージキーは設定されません。指定すると、この列の値がメッセージキーとして使用されます。このパラメーターの値は大文字と小文字を区別します。 |
無し |
Kafkaキーストアファイル名 |
mTLS 認証方法のクライアントキーと証明書を格納するキーストアへのフルパス。mTLS 認証およびセキュリティプロトコルが SSL の場合に必要です。 |
無し |
Kafkaキーストアタイプ |
キーストアのタイプ。mTLS 認証に必要です。可能な値:
|
無し |
Kafkaキーストアのパスワード |
キーストアファイルの保護に使用されるパスワード。 |
無し |
Kafkaキーパスワード |
キーストアに格納されている秘密キーのパスワード。mTLS 認証に必要です。 |
無し |
Kafkaトラストストアファイル名 |
ブローカー証明書を格納するトラストストアへのフルパス。クライアントはこのトラストストアの証明書を使用して、ブローカーのアイデンティティを検証します。 |
無し |
Kafkaトラストストアのタイプ |
トラストストアファイルのタイプ。可能な値:
|
無し |
Kafkaトラストストアのパスワード |
トラストストアファイルのパスワード。 |
無し |
Kafka Sink取り込みパラメーター¶
パラメーター |
説明 |
必須 |
---|---|---|
Snowflake FQN ストリーム名 |
完全修飾されたSnowflakeストリーム名。 |
有り |
フローを実行する¶
プレーンを右クリックし、 Enable all Controller Services を選択します。
インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。