Openflow Connector for Kafkaを設定する

注釈

コネクタには、 コネクタ利用規約 が適用されます。

前提条件

  1. Openflow Connector for Kafka を確認してください。

  2. Openflowを設定した ことを確認します。

コネクタタイプ

Openflow Connector for Kafkaは3つの異なる構成で利用可能です。それぞれ特定のユースケースに最適化されています。これらのコネクタ定義は、コネクタギャラリーからダウンロードできます。

Apache Kafka for JSON データ形式

スキーマの進化とトピックからテーブルへのマッピングを備えた JSON メッセージ取り込み用の簡易コネクタ

Apache Kafka for AVRO データ形式

スキーマの進化とトピックからテーブルへのマッピングを備えた AVRO メッセージ取り込み用の簡易コネクタ

DLQ とメタデータを使用したApache Kafka

デッドレターキュー(DLQ)のサポート、メタデータ処理、従来のSnowflake connector for Kafkaと同等の機能を備えたフル機能のコネクタ

特定のコネクタタイプの詳しい構成については、次をご参照ください。

Snowflakeアカウントを設定する

Snowflakeアカウント管理者として、以下のタスクを実行します。

  1. 新しいロールを作成するか、既存のロールを使用して データベース権限 を付与します。

  2. タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。

  3. Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。

  4. ステップ2のSnowflake SERVICE ユーザーを key-pair auth で構成します。

  5. Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。

    注釈

    何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。

    1. シークレットマネージャを構成したら、その認証方法を決定します。AWS 上では、Openflowに関連付けられた EC2 インスタンスロールが推奨されます。こうすることで、他の秘密を永続化する必要がなくなるからです。

    2. Openflowで、右上のハンバーガーメニューから、このシークレットマネージャに関連付けられたParameter Providerを構成します。 Controller Settings » Parameter Provider に移動し、パラメーターの値をフェッチします。

    3. この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。

  6. 他のSnowflakeユーザーが、コネクタによって取り込まれた生の取り込みドキュメントやとテーブルへのアクセスを必要とする場合は(Snowflakeでのカスタム処理のためなど)、それらのユーザーにステップ1で作成したロールを付与します。

  7. コネクタが使用するウェアハウスを指定します。まずは最小のウェアハウスサイズから始め、複製するテーブルの数や転送するデータ量に応じて異なるサイズを試してみてください。テーブル数が大きい場合は、通常、ウェアハウスのサイズを大きくするよりも、 マルチクラスターウェアハウス を使用した方がスケーリングが向上します。

コネクタを設定する

データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。

コネクタをインストールする

  1. Openflowの概要ページに移動します。 Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。

  4. Add を選択します。

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  5. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  6. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

  1. プロセスグループパラメーターを入力する

    1. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

    2. 共通パラメーター の説明に従って、必要なパラメーター値を入力します。

共通パラメーター

すべてのKafkaコネクタのバリアントは、基本的な接続と認証のための共通パラメーターコンテキストを共有しています。

Snowflake宛先パラメーター

パラメーター

説明

必須

宛先データベース

データが永続化されるデータベース。既にSnowflakeに存在している必要があります

有り

宛先スキーマ

データが永続化されるスキーマ。既にSnowflakeに存在している必要があります

有り

Snowflakeアカウント識別子

データが永続化されるSnowflakeアカウント名(形式: [organization-name]- [account-name])。

有り

Snowflake認証ストラテジー

Snowflakeへの認証のストラテジー。可能な値: SPCS 上でフローを実行している場合は SNOWFLAKE_SESSION_TOKEN、 秘密キーを使ったアクセスを設定したい場合は KEY_PAIR

有り

Snowflake秘密キー

認証に使用される RSA 秘密キー。RSA キーは、 PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持つ必要があります。Snowflake秘密キーファイルまたはSnowflake秘密キーのどちらかが定義されている必要があります。

無し

Snowflake秘密キーファイル

Snowflakeへの認証に使用される RSA 秘密キーを含むファイル。 PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持ちます。ヘッダー行は -----BEGIN PRIVATE で始まります。 Reference asset チェックボックスを選択し、秘密キーファイルをアップロードします。

無し

Snowflake秘密キーパスワード

Snowflake秘密キーファイルに関連付けられたパスワード

無し

Snowflakeロール

クエリ実行時に使用されるSnowflakeロール

有り

Snowflakeのユーザー名

Snowflakeインスタンスへの接続に使用するユーザー名

有り

Snowflakeウェアハウス

クエリの実行に使用されるSnowflakeウェアハウス

有り

Kafkaソースパラメーター(SASL 認証)

パラメーター

説明

必須

Kafkaセキュリティプロトコル

ブローカーとの通信に使用されるセキュリティプロトコル。Kafka Client security.protocolプロパティに対応します。次のいずれかです: SASL_PLAINTEXT / SASL_SSL

有り

Kafka SASL メカニズム

認証に使用する SASL メカニズム。Kafka Client sasl.mechanismプロパティに対応します。次のいずれかです: PLAIN / SCRAM-SHA-256 / SCRAM-SHA-512

有り

Kafka SASL ユーザー名

Kafkaを認証するためのユーザー名。

有り

Kafka SASL パスワード

Kafkaを認証するためのパスワード。

有り

Kafkaブートストラップサーバー

データをフェッチするKafkaブローカーのコンマ区切りリスト。kafka-broker:9092のように、ポートが含まれる必要があります。DLQ トピックにも同じインスタンスが使用されます。

有り

Kafka取り込みパラメーター

パラメーター

説明

必須

Kafkaトピック形式

次のいずれかです: names / pattern。提供される「Kafkaトピック」がコンマで区切られた名前のリストであるか、単一の正規表現であるかを指定します。

有り

Kafkaトピック

Kafkaトピックのコンマ区切りリスト、または正規表現。

有り

KafkaグループID

コネクタが使用するコンシューマーグループの ID。任意ですが、一意でなければなりません。

有り

Kafka自動オフセットリセット

Kafka auto.offset.reset プロパティに対応する以前のコンシューマーオフセットが見つからない場合に適用される自動オフセット構成。次のいずれかです: earliest / latest. Default: latest

有り

トピックからテーブルへのマッピング

このオプションのパラメーターにより、ユーザーはどのトピックをどのテーブルにマッピングするかを指定できます。各トピックとそのテーブル名はコロンで区切る必要があります(以下を参照)。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。正規表現はあいまいであってはならず、一致するトピックは単一のターゲットテーブルのみに一致しなければなりません。空または一致するものがない場合は、トピック名がテーブル名として使用されます。注意: マッピングでは、コンマの後にスペースを含めることはできません。

無し

Topic To Table Map 値の例:

  • topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range

  • topic[0-4]:low_range,topic[5-9]:high_range

  • .*:destination_table - すべてのトピックを destination_table にマッピングします

バリアント固有の設定を構成する

共通パラメーターを構成した後、選択したコネクタバリアントに固有の設定を構成する必要があります。

Apache Kafka for JSON データ形式 および Apache Kafka for AVRO データ形式 コネクタの場合:

JSON/AVRO 固有のパラメーターについては、 JSON/AVRO データ形式用Apache Kafka をご参照ください。

DLQ とメタデータ コネクタを持つ Apache Kafkaの場合:

DLQ 構成、スキーマ化の設定、Icebergテーブルのサポート、メッセージ形式オプションなどの高度なパラメーターについては、 DLQ とメタデータを使用したApache Kafka をご参照ください。

認証

Kafkaソースパラメーター(SASL 認証) で説明されているように、すべてのコネクタバリアントは、パラメーターコンテキストを介して構成された SASL 認証をサポートしています。

mTLS や AWS MSK IAM を含むその他の認証方法については、 Openflow Connector for Kafkaのその他の認証方法を構成する をご参照ください。

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services をクリックします。

  2. プレーンを右クリックし、 Start をクリックします。コネクタがデータの取り込みを開始します。