Openflow Connector for Snowflake to Kafka を設定する

注釈

コネクタには、 コネクタ利用規約 が適用されます。

このトピックでは、 Openflow Connector for Snowflake to Kafka を設定する手順について説明します。

前提条件

  1. Openflow Connector for Snowflake to Kafka について を確認してください。

  2. Openflowを設定した ことを確認します。

  3. 変更をクエリするSnowflakeストリームを作成します。

  4. Snowflakeストリームから CDC メッセージを受信するKafkaトピックを作成します。

Snowflakeアカウントを設定する

Snowflakeアカウント管理者として、以下のタスクを実行します。

  1. コネクタが CDC イベントの読み取りに使用するデータベース、ソーステーブル、ストリームオブジェクトを作成します。例:

    create database stream_db;
    use database stream_db;
    create table stream_source (user_id varchar, data varchar);
    create stream stream_on_table on table stream_source;
    
    Copy
  2. 新しいロールを作成するか、既存のロールを使用し、ストリームとストリームのソースオブジェクトに SELECT 権限を付与します。コネクタには、ストリームとストリームのソースオブジェクトを含むデータベースとスキーマの USAGE 権限も必要です。例:

    create role stream_reader;
    grant usage on database stream_db to role stream_reader;
    grant usage on schema stream_db.public to role stream_reader;
    grant select on stream_source to role stream_reader;
    grant select on stream_on_table to role stream_reader;
    
    Copy
  3. タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。例:

    create user stream_user type = service;
    
    Copy
  4. Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。例:

    grant role stream_reader to user stream_user;
    
    Copy
  5. ステップ3のSnowflake SERVICE ユーザー向けの キーペア認証 で構成します。

  6. Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。ただし、ステップ4で生成した秘密キーは、コネクタ構成の構成パラメーターとして直接使用できることに注意してください。このような場合、秘密キーはOpenflowランタイム構成に格納されます。

    注釈

    何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。

    1. シークレットマネージャを構成したら、その認証方法を決定します。AWS 上では、Openflowに関連付けられた EC2 インスタンスロールが推奨されます。こうすることで、他の秘密を永続化する必要がなくなるからです。

    2. Openflowで、右上のハンバーガーメニューから、このシークレットマネージャに関連付けられたParameter Providerを構成します。 Controller Settings » Parameter Provider に移動し、パラメーターの値をフェッチします。

    3. この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。

  7. コネクタが使用するウェアハウスを指定します。1つのコネクタで1つのテーブルを1つのKafkaトピックに複製できます。このような処理には、最も小さいウェアハウスを選択することができます。

コネクタを設定する

データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。

  1. Openflowの概要ページに移動します。 Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowコネクタのページで、コネクタが通信すべきKafkaブローカーインスタンスの種類に応じてコネクタを検索して選択します。

    • mTLS バージョン: SSL (相互 TLS)セキュリティプロトコルを使用している場合、または SASL_SSL プロトコルを使用し、自己署名証明書を使用しているブローカーに接続している場合は、このコネクタを選択します。

    • SASL バージョン: 他のセキュリティプロトコルを使用している場合は、このコネクタを選択します。

  3. Add to runtime を選択します。

  4. Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。

  5. Add を選択します。

  6. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  7. Snowflakeアカウント認証情報でランタイムを認証します。

    コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

  8. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

  9. フローパラメーター の説明に従って、必要なパラメーター値を入力します。

フローパラメーター

このセクションでは、以下のパラメーターコンテキストに基づいて構成できるフローパラメーターについて説明します。

Kafka Sinkソースパラメーター

パラメーター

説明

必須

Snowflakeアカウント識別子

データが永続化されるSnowflakeアカウント名(形式: [organization-name]- [account-name])。例: example.snowflakecomputing.com

有り

Snowflake認証ストラテジー

Snowflakeへの認証のストラテジー。可能な値:

  • SNOWFLAKE_SESSION_TOKEN: SPCS でコネクタを実行する場合

  • KEY_PAIR: 秘密キーを使ったアクセスを設定する場合

有り

ソースデータベース

ソースデータベース。このデータベースには、消費されるSnowflakeストリームオブジェクトが含まれている必要があります。

有り

Snowflake秘密キーパスワード

Snowflake秘密キーに関連付けられたパスワード。使用する秘密キーがパスワードで保護されていない場合は不要です。

無し

Snowflakeロール

クエリ実行時に使用されるSnowflakeロール

有り

Snowflakeのユーザー名

Snowflakeインスタンスへの接続に使用するユーザー名

有り

Snowflakeウェアハウス

クエリの実行に使用されるSnowflakeウェアハウス

有り

Snowflake秘密キー

認証に使用される RSA 秘密キー。RSA キーは、 PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持つ必要があります。Snowflake秘密キーファイルまたはSnowflake秘密キーのいずれかを定義する必要があることに注意してください。

有り

Snowflake秘密キーファイル

Snowflakeへの認証に使用される RSA 秘密キーを含むファイル。 PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持ちます。ヘッダー行は -----BEGIN PRIVATE で始まります。 Reference asset チェックボックスを選択し、秘密キーファイルをアップロードします。

無し

ソーススキーマ

ソーススキーマ。このスキーマには、消費されるSnowflakeストリームオブジェクトが含まれている必要があります。

有り

Kafka Sink宛先パラメーター

パラメーター

説明

必須

Kafkaブートストラップサーバー

データ送信先のKafkaブローカーのコンマ区切りリスト。

有り

Kafka SASL メカニズム

認証に使用する SASL メカニズム。Kafka Client sasl.mechanism プロパティに対応します。可能な値:

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

  • AWS_MSK_IAM

有り

Kafka SASL ユーザー名

Kafkaを認証するためのユーザー名。

有り

Kafka SASL パスワード

Kafkaを認証するためのパスワード。

有り

Kafkaセキュリティプロトコル

ブローカーとの通信に使用されるセキュリティプロトコル。Kafka Client security.protocol プロパティに対応します。可能な値:

  • PLAINTEXT

  • SASL_PLAINTEXT

  • SASL_SSL

  • SSL

有り

Kafkaトピック

Snowflake ストリームから CDCs を送信するKafkaトピック

有り

Kafkaメッセージキーフィールド

Kafkaメッセージキーとして使用するデータベースの列名を指定します。指定しない場合、メッセージキーは設定されません。指定すると、この列の値がメッセージキーとして使用されます。このパラメーターの値は大文字と小文字を区別します。

無し

Kafkaキーストアファイル名

mTLS 認証方法のクライアントキーと証明書を格納するキーストアへのフルパス。mTLS 認証およびセキュリティプロトコルが SSL の場合に必要です。

無し

Kafkaキーストアタイプ

キーストアのタイプ。mTLS 認証に必要です。可能な値:

  • PKCS12

  • JKS

  • BCFKS

無し

Kafkaキーストアのパスワード

キーストアファイルの保護に使用されるパスワード。

無し

Kafkaキーパスワード

キーストアに格納されている秘密キーのパスワード。mTLS 認証に必要です。

無し

Kafkaトラストストアファイル名

ブローカー証明書を格納するトラストストアへのフルパス。クライアントはこのトラストストアの証明書を使用して、ブローカーのアイデンティティを検証します。

無し

Kafkaトラストストアのタイプ

トラストストアファイルのタイプ。可能な値:

  • PKCS12

  • JKS

  • BCFKS

無し

Kafkaトラストストアのパスワード

トラストストアファイルのパスワード。

無し

Kafka Sink取り込みパラメーター

パラメーター

説明

必須

Snowflake FQN ストリーム名

完全修飾されたSnowflakeストリーム名。

有り

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。