Openflow Connector for Kafkaを設定する¶
注釈
コネクタには、 コネクタ利用規約 が適用されます。
前提条件¶
Openflow Connector for Kafka を確認してください。
Openflowを設定した ことを確認します。
コネクタタイプ¶
Openflow Connector for Kafkaは3つの異なる構成で利用可能です。それぞれ特定のユースケースに最適化されています。これらのコネクタ定義は、コネクタギャラリーからダウンロードできます。
- Apache Kafka for JSON データ形式
スキーマの進化とトピックからテーブルへのマッピングを備えた JSON メッセージ取り込み用の簡易コネクタ
- Apache Kafka for AVRO データ形式
スキーマの進化とトピックからテーブルへのマッピングを備えた AVRO メッセージ取り込み用の簡易コネクタ
- DLQ とメタデータを使用したApache Kafka
デッドレターキュー(DLQ)のサポート、メタデータ処理、従来のSnowflake connector for Kafkaと同等の機能を備えたフル機能のコネクタ
特定のコネクタタイプの詳しい構成については、次をご参照ください。
JSON/AVRO データ形式用Apache Kafka - JSON/AVRO データ形式コネクタ
DLQ とメタデータを使用したApache Kafka - DLQ およびメタデータコネクタ
Snowflakeアカウントを設定する¶
Snowflakeアカウント管理者として、以下のタスクを実行します。
新しいロールを作成するか、既存のロールを使用して データベース権限 を付与します。
タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。
Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。
ステップ2のSnowflake SERVICE ユーザーを key-pair auth で構成します。
Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。
注釈
何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。
シークレットマネージャを構成したら、その認証方法を決定します。AWS 上では、Openflowに関連付けられた EC2 インスタンスロールが推奨されます。こうすることで、他の秘密を永続化する必要がなくなるからです。
Openflowで、右上のハンバーガーメニューから、このシークレットマネージャに関連付けられたParameter Providerを構成します。 Controller Settings » Parameter Provider に移動し、パラメーターの値をフェッチします。
この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。
他のSnowflakeユーザーが、コネクタによって取り込まれた生の取り込みドキュメントやとテーブルへのアクセスを必要とする場合は(Snowflakeでのカスタム処理のためなど)、それらのユーザーにステップ1で作成したロールを付与します。
コネクタが使用するウェアハウスを指定します。まずは最小のウェアハウスサイズから始め、複製するテーブルの数や転送するデータ量に応じて異なるサイズを試してみてください。テーブル数が大きい場合は、通常、ウェアハウスのサイズを大きくするよりも、 マルチクラスターウェアハウス を使用した方がスケーリングが向上します。
コネクタを設定する¶
データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。
コネクタをインストールする¶
Openflowの概要ページに移動します。 Featured connectors セクションで、 View more connectors を選択します。
Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。
Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。
Add を選択します。
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
プロセスグループパラメーターを入力する
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
共通パラメーター の説明に従って、必要なパラメーター値を入力します。
共通パラメーター¶
すべてのKafkaコネクタのバリアントは、基本的な接続と認証のための共通パラメーターコンテキストを共有しています。
Snowflake宛先パラメーター¶
パラメーター |
説明 |
必須 |
---|---|---|
宛先データベース |
データが永続化されるデータベース。既にSnowflakeに存在している必要があります |
有り |
宛先スキーマ |
データが永続化されるスキーマ。既にSnowflakeに存在している必要があります |
有り |
Snowflakeアカウント識別子 |
データが永続化されるSnowflakeアカウント名(形式: [organization-name]- [account-name])。 |
有り |
Snowflake認証ストラテジー |
Snowflakeへの認証のストラテジー。可能な値: SPCS 上でフローを実行している場合は SNOWFLAKE_SESSION_TOKEN、 秘密キーを使ったアクセスを設定したい場合は KEY_PAIR |
有り |
Snowflake秘密キー |
認証に使用される RSA 秘密キー。RSA キーは、 PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持つ必要があります。Snowflake秘密キーファイルまたはSnowflake秘密キーのどちらかが定義されている必要があります。 |
無し |
Snowflake秘密キーファイル |
Snowflakeへの認証に使用される RSA 秘密キーを含むファイル。 PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持ちます。ヘッダー行は |
無し |
Snowflake秘密キーパスワード |
Snowflake秘密キーファイルに関連付けられたパスワード |
無し |
Snowflakeロール |
クエリ実行時に使用されるSnowflakeロール |
有り |
Snowflakeのユーザー名 |
Snowflakeインスタンスへの接続に使用するユーザー名 |
有り |
Snowflakeウェアハウス |
クエリの実行に使用されるSnowflakeウェアハウス |
有り |
Kafkaソースパラメーター(SASL 認証)¶
パラメーター |
説明 |
必須 |
---|---|---|
Kafkaセキュリティプロトコル |
ブローカーとの通信に使用されるセキュリティプロトコル。Kafka Client security.protocolプロパティに対応します。次のいずれかです: SASL_PLAINTEXT / SASL_SSL |
有り |
Kafka SASL メカニズム |
認証に使用する SASL メカニズム。Kafka Client sasl.mechanismプロパティに対応します。次のいずれかです: PLAIN / SCRAM-SHA-256 / SCRAM-SHA-512 |
有り |
Kafka SASL ユーザー名 |
Kafkaを認証するためのユーザー名。 |
有り |
Kafka SASL パスワード |
Kafkaを認証するためのパスワード。 |
有り |
Kafkaブートストラップサーバー |
データをフェッチするKafkaブローカーのコンマ区切りリスト。kafka-broker:9092のように、ポートが含まれる必要があります。DLQ トピックにも同じインスタンスが使用されます。 |
有り |
Kafka取り込みパラメーター¶
パラメーター |
説明 |
必須 |
---|---|---|
Kafkaトピック形式 |
次のいずれかです: names / pattern。提供される「Kafkaトピック」がコンマで区切られた名前のリストであるか、単一の正規表現であるかを指定します。 |
有り |
Kafkaトピック |
Kafkaトピックのコンマ区切りリスト、または正規表現。 |
有り |
KafkaグループID |
コネクタが使用するコンシューマーグループの ID。任意ですが、一意でなければなりません。 |
有り |
Kafka自動オフセットリセット |
Kafka |
有り |
トピックからテーブルへのマッピング |
このオプションのパラメーターにより、ユーザーはどのトピックをどのテーブルにマッピングするかを指定できます。各トピックとそのテーブル名はコロンで区切る必要があります(以下を参照)。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。正規表現はあいまいであってはならず、一致するトピックは単一のターゲットテーブルのみに一致しなければなりません。空または一致するものがない場合は、トピック名がテーブル名として使用されます。注意: マッピングでは、コンマの後にスペースを含めることはできません。 |
無し |
Topic To Table Map
値の例:
topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range
topic[0-4]:low_range,topic[5-9]:high_range
.*:destination_table
- すべてのトピックを destination_table にマッピングします
バリアント固有の設定を構成する¶
共通パラメーターを構成した後、選択したコネクタバリアントに固有の設定を構成する必要があります。
- Apache Kafka for JSON データ形式 および Apache Kafka for AVRO データ形式 コネクタの場合:
JSON/AVRO 固有のパラメーターについては、 JSON/AVRO データ形式用Apache Kafka をご参照ください。
- DLQ とメタデータ コネクタを持つ Apache Kafkaの場合:
DLQ 構成、スキーマ化の設定、Icebergテーブルのサポート、メッセージ形式オプションなどの高度なパラメーターについては、 DLQ とメタデータを使用したApache Kafka をご参照ください。
認証¶
Kafkaソースパラメーター(SASL 認証) で説明されているように、すべてのコネクタバリアントは、パラメーターコンテキストを介して構成された SASL 認証をサポートしています。
mTLS や AWS MSK IAM を含むその他の認証方法については、 Openflow Connector for Kafkaのその他の認証方法を構成する をご参照ください。
フローを実行する¶
プレーンを右クリックし、 Enable all Controller Services をクリックします。
プレーンを右クリックし、 Start をクリックします。コネクタがデータの取り込みを開始します。