Openflow Connector for Snowflake to Kafka を設定する¶
注釈
コネクタには、 コネクタ利用規約 が適用されます。
このトピックでは、 Openflow Connector for Snowflake to Kafka を設定する手順について説明します。
前提条件¶
Openflowの設定 - BYOC または Openflowの設定 - Snowflakeデプロイメント - タスク概要 があることを確認してください。
変更をクエリするSnowflakeストリームを作成します。
Snowflakeストリームから CDC メッセージを受信するKafkaトピックを作成します。
Snowflakeアカウントを設定する¶
Snowflakeアカウント管理者として、以下のタスクを実行します。
コネクタが CDC イベントの読み取りに使用するデータベース、ソーステーブル、ストリームオブジェクトを作成します。例:
create database stream_db; use database stream_db; create table stream_source (user_id varchar, data varchar); create stream stream_on_table on table stream_source;
新しいロールを作成するか、既存のロールを使用し、ストリームとストリームのソースオブジェクトに SELECT 権限を付与します。コネクタには、ストリームとストリームのソースオブジェクトを含むデータベースとスキーマの USAGE 権限も必要です。例:
create role stream_reader; grant usage on database stream_db to role stream_reader; grant usage on schema stream_db.public to role stream_reader; grant select on stream_source to role stream_reader; grant select on stream_on_table to role stream_reader;
タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。例:
create user stream_user type = service;
Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。例:
grant role stream_reader to user stream_user;
ステップ3のSnowflake SERVICE ユーザー向けの キーペア認証 で構成します。
Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。ただし、ステップ4で生成した秘密キーは、コネクタ構成の構成パラメーターとして直接使用できることに注意してください。このような場合、秘密キーはOpenflowランタイム構成に格納されます。
注釈
何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。
シークレットマネージャを構成したら、その認証方法を決定します。AWS 上では、Openflowに関連付けられた EC2 インスタンスロールが推奨されます。こうすることで、他の秘密を永続化する必要がなくなるからです。
Openflowで、右上のハンバーガーメニューから、このシークレットマネージャーに関連付けられたパラメータープロバイダーを構成します。Controller Settings » Parameter Provider に移動してから、パラメーター値を取得します。
この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。
コネクタが使用するウェアハウスを指定します。1つのコネクタで1つのテーブルを1つのKafkaトピックに複製できます。このような処理には、最も小さいウェアハウスを選択することができます。
コネクタを設定する¶
データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。
Openflowの概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。
Openflowコネクタのページで、コネクタが通信すべきKafkaブローカーインスタンスの種類に応じてコネクタを検索して選択します。
mTLS バージョン: SSL (相互 TLS)セキュリティプロトコルを使用している場合、または SASL_SSL プロトコルを使用し、自己署名証明書を使用しているブローカーに接続している場合は、このコネクタを選択します。
SASL バージョン: 他のセキュリティプロトコルを使用している場合は、このコネクタを選択します。
Add to runtime を選択します。
Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。
Add を選択します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
フローパラメーター の説明に従って、必要なパラメーター値を入力します。
フローパラメーター¶
このセクションでは、以下のパラメーターコンテキストに基づいて構成できるフローパラメーターについて説明します。
Kafka Sinkソースパラメーター¶
パラメーター |
説明 |
必須 |
|---|---|---|
Snowflakeアカウント識別子 |
以下を使用する場合:
|
有り |
Snowflake認証ストラテジー |
以下を使用する場合:
|
有り |
ソースデータベース |
ソースデータベース。このデータベースには、消費されるSnowflake Streamオブジェクトが含まれている必要があります。 |
有り |
Snowflake秘密キーパスワード |
以下を使用する場合:
|
無し |
Snowflakeロール |
以下を使用する場合
|
有り |
Snowflakeのユーザー名 |
以下を使用する場合:
|
有り |
Snowflake秘密キー |
認証戦略にセッショントークンを使用する場合は、空白のままにします。KEY_PAIR を使用する場合は、認証に使用する RSA プライベートキーを提供します。RSA キーは PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。 SnowflakeプライベートキーファイルまたはSnowflakeプライベートキーのいずれかを定義する必要があることに注意してください。 |
有り |
Snowflake秘密キーファイル |
認証戦略にセッショントークンを使用する場合は、空白のままにします。KEY_PAIR を使用する場合、Snowflakeへの認証に使用される RSA プライベートキーを含むファイルをアップロードします。これは、PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。ヘッダー行は で始まります |
無し |
ソーススキーマ |
ソーススキーマ。このスキーマには、消費されるSnowflakeストリームオブジェクトが含まれている必要があります。 |
有り |
Snowflakeウェアハウス |
クエリの実行に使用されるSnowflakeウェアハウス |
有り |
Kafka Sink宛先パラメーター¶
パラメーター |
説明 |
必須 |
|---|---|---|
Kafkaブートストラップサーバー |
データ送信先のKafkaブローカーのコンマ区切りリスト。 |
有り |
Kafka SASL メカニズム |
認証に使用する SASL メカニズム。Kafka Client
|
有り |
Kafka SASL ユーザー名 |
Kafkaを認証するためのユーザー名。 |
有り |
Kafka SASL パスワード |
Kafkaを認証するためのパスワード。 |
有り |
Kafkaセキュリティプロトコル |
ブローカーとの通信に使用されるセキュリティプロトコル。Kafka Client
|
有り |
Kafkaトピック |
Snowflake ストリームから CDCs を送信するKafkaトピック |
有り |
Kafkaメッセージキーフィールド |
Kafkaメッセージキーとして使用するデータベースの列名を指定します。指定しない場合、メッセージキーは設定されません。指定すると、この列の値がメッセージキーとして使用されます。このパラメーターの値は大文字と小文字を区別します。 |
無し |
Kafkaキーストアファイル名 |
mTLS 認証方法のクライアントキーと証明書を格納するキーストアへのフルパス。mTLS 認証およびセキュリティプロトコルが SSL の場合に必要です。 |
無し |
Kafkaキーストアタイプ |
キーストアのタイプ。mTLS 認証に必要です。可能な値:
|
無し |
Kafkaキーストアのパスワード |
キーストアファイルの保護に使用されるパスワード。 |
無し |
Kafkaキーパスワード |
キーストアに格納されている秘密キーのパスワード。mTLS 認証に必要です。 |
無し |
Kafkaトラストストアファイル名 |
ブローカー証明書を格納するトラストストアへのフルパス。クライアントはこのトラストストアの証明書を使用して、ブローカーのアイデンティティを検証します。 |
無し |
Kafkaトラストストアのタイプ |
トラストストアファイルのタイプ。可能な値:
|
無し |
Kafkaトラストストアのパスワード |
トラストストアファイルのパスワード。 |
無し |
Kafka Sink取り込みパラメーター¶
パラメーター |
説明 |
必須 |
|---|---|---|
Snowflake FQN ストリーム名 |
完全修飾されたSnowflakeストリーム名。 |
有り |
フローを実行する¶
プレーンを右クリックし、 Enable all Controller Services を選択します。
インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。