Openflow Connector for Snowflake to Kafka を設定する

注釈

コネクタには、 コネクタ利用規約 が適用されます。

このトピックでは、 Openflow Connector for Snowflake to Kafka を設定する手順について説明します。

前提条件

  1. Openflow Connector for Snowflake to Kafka について を確認してください。

  2. Openflowの設定 - BYOC または Openflowの設定 - Snowflakeデプロイメント - タスク概要 があることを確認してください。

  3. 変更をクエリするSnowflakeストリームを作成します。

  4. Snowflakeストリームから CDC メッセージを受信するKafkaトピックを作成します。

Snowflakeアカウントを設定する

Snowflakeアカウント管理者として、以下のタスクを実行します。

  1. コネクタが CDC イベントの読み取りに使用するデータベース、ソーステーブル、ストリームオブジェクトを作成します。例:

    create database stream_db;
    use database stream_db;
    create table stream_source (user_id varchar, data varchar);
    create stream stream_on_table on table stream_source;
    
    Copy
  2. 新しいロールを作成するか、既存のロールを使用し、ストリームとストリームのソースオブジェクトに SELECT 権限を付与します。コネクタには、ストリームとストリームのソースオブジェクトを含むデータベースとスキーマの USAGE 権限も必要です。例:

    create role stream_reader;
    grant usage on database stream_db to role stream_reader;
    grant usage on schema stream_db.public to role stream_reader;
    grant select on stream_source to role stream_reader;
    grant select on stream_on_table to role stream_reader;
    
    Copy
  3. タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。例:

    create user stream_user type = service;
    
    Copy
  4. Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。例:

    grant role stream_reader to user stream_user;
    
    Copy
  5. ステップ3のSnowflake SERVICE ユーザー向けの キーペア認証 で構成します。

  6. Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。ただし、ステップ4で生成した秘密キーは、コネクタ構成の構成パラメーターとして直接使用できることに注意してください。このような場合、秘密キーはOpenflowランタイム構成に格納されます。

    注釈

    何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。

    1. シークレットマネージャを構成したら、その認証方法を決定します。AWS 上では、Openflowに関連付けられた EC2 インスタンスロールが推奨されます。こうすることで、他の秘密を永続化する必要がなくなるからです。

    2. Openflowで、右上のハンバーガーメニューから、このシークレットマネージャーに関連付けられたパラメータープロバイダーを構成します。Controller Settings » Parameter Provider に移動してから、パラメーター値を取得します。

    3. この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。

  7. コネクタが使用するウェアハウスを指定します。1つのコネクタで1つのテーブルを1つのKafkaトピックに複製できます。このような処理には、最も小さいウェアハウスを選択することができます。

コネクタを設定する

データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。

  1. Openflowの概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowコネクタのページで、コネクタが通信すべきKafkaブローカーインスタンスの種類に応じてコネクタを検索して選択します。

    • mTLS バージョン: SSL (相互 TLS)セキュリティプロトコルを使用している場合、または SASL_SSL プロトコルを使用し、自己署名証明書を使用しているブローカーに接続している場合は、このコネクタを選択します。

    • SASL バージョン: 他のセキュリティプロトコルを使用している場合は、このコネクタを選択します。

  3. Add to runtime を選択します。

  4. Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。

  5. Add を選択します。

  6. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  7. Snowflakeアカウント認証情報でランタイムを認証します。

    コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

  8. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

  9. フローパラメーター の説明に従って、必要なパラメーター値を入力します。

フローパラメーター

このセクションでは、以下のパラメーターコンテキストに基づいて構成できるフローパラメーターについて説明します。

Kafka Sinkソースパラメーター

パラメーター

説明

必須

Snowflakeアカウント識別子

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:データが永続化される[organization-name]-[account-name]形式のSnowflakeアカウント名。

有り

Snowflake認証ストラテジー

以下を使用する場合:

  • Snowflake Openflow Deployment:SNOWFLAKE_SESSION_TOKEN を使用します。このトークンはSnowflakeによって自動的に管理されます。

  • BYOC:認証戦略の値として KEY_PAIR を使用します。

有り

ソースデータベース

ソースデータベース。このデータベースには、消費されるSnowflake Streamオブジェクトが含まれている必要があります。

有り

Snowflake秘密キーパスワード

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeプライベートキーファイルに関連付けられたパスワードを提供します。

無し

Snowflakeロール

以下を使用する場合

  • Session Token Authentication Strategy:ランタイムロールを使用します。ランタイムの View Details に移動すると、Openflow UI でランタイムロールを見つけることができます。

  • KEY_PAIR Authentication Strategy:サービスユーザーのために構成された有効なロールを使用します。

有り

Snowflakeのユーザー名

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR: Snowflakeインスタンスへの接続に使用するユーザー名を提供します。

有り

Snowflake秘密キー

認証戦略にセッショントークンを使用する場合は、空白のままにします。KEY_PAIR を使用する場合は、認証に使用する RSA プライベートキーを提供します。RSA キーは PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。 SnowflakeプライベートキーファイルまたはSnowflakeプライベートキーのいずれかを定義する必要があることに注意してください。

有り

Snowflake秘密キーファイル

認証戦略にセッショントークンを使用する場合は、空白のままにします。KEY_PAIR を使用する場合、Snowflakeへの認証に使用される RSA プライベートキーを含むファイルをアップロードします。これは、PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。ヘッダー行は で始まります -----BEGIN PRIVATE.選択する Reference asset 秘密キーファイルをアップロードするチェックボックス。

無し

ソーススキーマ

ソーススキーマ。このスキーマには、消費されるSnowflakeストリームオブジェクトが含まれている必要があります。

有り

Snowflakeウェアハウス

クエリの実行に使用されるSnowflakeウェアハウス

有り

Kafka Sink宛先パラメーター

パラメーター

説明

必須

Kafkaブートストラップサーバー

データ送信先のKafkaブローカーのコンマ区切りリスト。

有り

Kafka SASL メカニズム

認証に使用する SASL メカニズム。Kafka Client sasl.mechanism プロパティに対応します。可能な値:

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

  • AWS_MSK_IAM

有り

Kafka SASL ユーザー名

Kafkaを認証するためのユーザー名。

有り

Kafka SASL パスワード

Kafkaを認証するためのパスワード。

有り

Kafkaセキュリティプロトコル

ブローカーとの通信に使用されるセキュリティプロトコル。Kafka Client security.protocol プロパティに対応します。可能な値:

  • PLAINTEXT

  • SASL_PLAINTEXT

  • SASL_SSL

  • SSL

有り

Kafkaトピック

Snowflake ストリームから CDCs を送信するKafkaトピック

有り

Kafkaメッセージキーフィールド

Kafkaメッセージキーとして使用するデータベースの列名を指定します。指定しない場合、メッセージキーは設定されません。指定すると、この列の値がメッセージキーとして使用されます。このパラメーターの値は大文字と小文字を区別します。

無し

Kafkaキーストアファイル名

mTLS 認証方法のクライアントキーと証明書を格納するキーストアへのフルパス。mTLS 認証およびセキュリティプロトコルが SSL の場合に必要です。

無し

Kafkaキーストアタイプ

キーストアのタイプ。mTLS 認証に必要です。可能な値:

  • PKCS12

  • JKS

  • BCFKS

無し

Kafkaキーストアのパスワード

キーストアファイルの保護に使用されるパスワード。

無し

Kafkaキーパスワード

キーストアに格納されている秘密キーのパスワード。mTLS 認証に必要です。

無し

Kafkaトラストストアファイル名

ブローカー証明書を格納するトラストストアへのフルパス。クライアントはこのトラストストアの証明書を使用して、ブローカーのアイデンティティを検証します。

無し

Kafkaトラストストアのタイプ

トラストストアファイルのタイプ。可能な値:

  • PKCS12

  • JKS

  • BCFKS

無し

Kafkaトラストストアのパスワード

トラストストアファイルのパスワード。

無し

Kafka Sink取り込みパラメーター

パラメーター

説明

必須

Snowflake FQN ストリーム名

完全修飾されたSnowflakeストリーム名。

有り

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。