Openflow Connector for Snowflake to Kafka の設定

注釈

このコネクタは、 Snowflakeコネクタ規約 に従うものとします。

このトピックでは、 Openflow Connector for Snowflake to Kafka を設定する手順について説明します。

前提条件

  1. Openflow Connector for Snowflake to Kafka について を確認してください。

  2. :doc:`/user-guide/data-integration/openflow/setup-openflow-byoc`または:doc:`Openflow のセットアップ - Snowflake デプロイメント</user-guide/data-integration/openflow/setup-openflow-spcs>`があることを確認してください。

  3. 変更をクエリするSnowflakeストリームを作成します。

  4. Snowflakeストリームから CDC メッセージを受信するKafkaトピックを作成します。

Snowflakeアカウントを設定する

Snowflakeアカウント管理者として、以下のタスクを実行します。

  1. コネクタが CDC イベントの読み取りに使用するデータベース、ソーステーブル、ストリームオブジェクトを作成します。例:

    create database stream_db;
    use database stream_db;
    create table stream_source (user_id varchar, data varchar);
    create stream stream_on_table on table stream_source;
    
    Copy
  2. 新しいロールを作成するか、既存のロールを使用し、ストリームとストリームのソースオブジェクトに SELECT 権限を付与します。コネクタには、ストリームとストリームのソースオブジェクトを含むデータベースとスキーマの USAGE 権限も必要です。例:

    create role stream_reader;
    grant usage on database stream_db to role stream_reader;
    grant usage on schema stream_db.public to role stream_reader;
    grant select on stream_source to role stream_reader;
    grant select on stream_on_table to role stream_reader;
    
    Copy
  3. タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。例:

    create user stream_user type = service;
    
    Copy
  4. Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。例:

    grant role stream_reader to user stream_user;
    
    Copy
  5. ステップ3のSnowflake SERVICE ユーザー向けの キーペア認証 で構成します。

  6. Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。ただし、ステップ4で生成した秘密キーは、コネクタ構成の構成パラメーターとして直接使用できることに注意してください。このような場合、秘密キーはOpenflowランタイム構成に格納されます。

    注釈

    何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。

    1. シークレットマネージャを構成したら、その認証方法を決定します。AWS 上では、Openflowに関連付けられた EC2 インスタンスロールが推奨されます。こうすることで、他の秘密を永続化する必要がなくなるからです。

    2. Openflowで、右上のハンバーガーメニューから、このシークレットマネージャーに関連付けられたパラメータープロバイダーを構成します。Controller Settings » Parameter Provider に移動してから、パラメーター値を取得します。

    3. この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。

  7. コネクタが使用するウェアハウスを指定します。1つのコネクタで1つのテーブルを1つのKafkaトピックに複製できます。このような処理には、最も小さいウェアハウスを選択することができます。

コネクタを設定する

データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。

  1. Openflowの概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowコネクタのページで、コネクタが通信すべきKafkaブローカーインスタンスの種類に応じてコネクタを検索して選択します。

    • mTLS バージョン: SSL (相互 TLS)セキュリティプロトコルを使用している場合、または SASL_SSL プロトコルを使用し、自己署名証明書を使用しているブローカーに接続している場合は、このコネクタを選択します。

    • SASL バージョン: 他のセキュリティプロトコルを使用している場合は、このコネクタを選択します。

  3. Add to runtime を選択します。

  4. Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。

  5. Add を選択します。

  6. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  7. Snowflakeアカウント認証情報でランタイムを認証します。

    コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

  8. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

  9. フローパラメーター の説明に従って、必要なパラメーター値を入力します。

フローパラメーター

このセクションでは、以下のパラメーターコンテキストに基づいて構成できるフローパラメーターについて説明します。

Kafka Sinkソースパラメーター

パラメーター

説明

必須

Snowflakeアカウント識別子

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:データが永続化される[organization-name]-[account-name]形式のSnowflakeアカウント名。

有り

Snowflake認証ストラテジー

以下を使用する場合:

  • Snowflake Openflow Deployment or BYOC: Use SNOWFLAKE_MANAGED_TOKEN. This token is managed automatically by Snowflake. BYOC deployments must have previously configured runtime roles to use SNOWFLAKE_MANAGED_TOKEN.

  • BYOC: 代わりに、BYOC では認証戦略の値として KEY_PAIR を使用できます。

有り

ソースデータベース

ソースデータベース。このデータベースには、消費されるSnowflake Streamオブジェクトが含まれている必要があります。

有り

Snowflake秘密キーパスワード

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeプライベートキーファイルに関連付けられたパスワードを提供します。

無し

Snowflakeロール

以下を使用する場合

  • Session Token Authentication Strategy: Use your Snowflake Role. You can find your Snowflake Role in the Openflow UI, by navigating to View Details for your Runtime.

  • KEY_PAIR Authentication Strategy:サービスユーザーのために構成された有効なロールを使用します。

有り

Snowflakeのユーザー名

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeインスタンスへの接続に使用するユーザー名を提供します。

有り

Snowflake秘密キー

認証戦略にセッショントークンを使用する場合は、空白のままにします。KEY_PAIR を使用する場合は、認証に使用する RSA プライベートキーを提供します。RSA キーは PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。SnowflakeプライベートキーファイルまたはSnowflakeプライベートキーのいずれかを定義する必要があることに注意してください。

有り

Snowflake秘密キーファイル

認証戦略にセッショントークンを使用する場合は、空白のままにします。KEY_PAIR を使用する場合、Snowflakeへの認証に使用される RSA プライベートキーを含むファイルをアップロードします。これは、PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。ヘッダー行は で始まります -----BEGIN PRIVATE.選択する Reference asset 秘密キーファイルをアップロードするチェックボックス。

無し

ソーススキーマ

ソーススキーマ。このスキーマには、消費されるSnowflakeストリームオブジェクトが含まれている必要があります。

有り

Snowflakeウェアハウス

クエリの実行に使用されるSnowflakeウェアハウス

有り

Kafka Sink宛先パラメーター

パラメーター

説明

必須

Kafkaブートストラップサーバー

データ送信先のKafkaブローカーのコンマ区切りリスト。

有り

Kafka SASL メカニズム

認証に使用する SASL メカニズム。Kafka Client sasl.mechanism プロパティに対応します。可能な値:

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

  • AWS_MSK_IAM

有り

Kafka SASL ユーザー名

Kafkaを認証するためのユーザー名。

有り

Kafka SASL パスワード

Kafkaを認証するためのパスワード。

有り

Kafkaセキュリティプロトコル

ブローカーとの通信に使用されるセキュリティプロトコル。Kafka Client security.protocol プロパティに対応します。可能な値:

  • PLAINTEXT

  • SASL_PLAINTEXT

  • SASL_SSL

  • SSL

有り

Kafkaトピック

Snowflake ストリームから CDCs を送信するKafkaトピック

有り

Kafkaメッセージキーフィールド

Kafkaメッセージキーとして使用するデータベースの列名を指定します。指定しない場合、メッセージキーは設定されません。指定すると、この列の値がメッセージキーとして使用されます。このパラメーターの値は大文字と小文字を区別します。

無し

Kafkaキーストアファイル名

mTLS 認証方法のクライアントキーと証明書を格納するキーストアへのフルパス。mTLS 認証およびセキュリティプロトコルが SSL の場合に必要です。

無し

Kafkaキーストアタイプ

キーストアのタイプ。mTLS 認証に必要です。可能な値:

  • PKCS12

  • JKS

  • BCFKS

無し

Kafkaキーストアのパスワード

キーストアファイルの保護に使用されるパスワード。

無し

Kafkaキーパスワード

キーストアに格納されている秘密キーのパスワード。mTLS 認証に必要です。

無し

Kafkaトラストストアファイル名

ブローカー証明書を格納するトラストストアへのフルパス。クライアントはこのトラストストアの証明書を使用して、ブローカーのアイデンティティを検証します。

無し

Kafkaトラストストアのタイプ

トラストストアファイルのタイプ。可能な値:

  • PKCS12

  • JKS

  • BCFKS

無し

Kafkaトラストストアのパスワード

トラストストアファイルのパスワード。

無し

Kafka Sink取り込みパラメーター

パラメーター

説明

必須

Snowflake FQN ストリーム名

完全修飾されたSnowflakeストリーム名。

有り

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。