Openflow Connector for Kafkaを設定する

注釈

コネクタには、 コネクタ利用規約 が適用されます。

前提条件

  1. Openflow Connector for Kafka を確認してください。

  2. Openflowを設定した ことを確認します。

コネクタタイプ

Openflow Connector for Kafkaは3つの異なる構成で利用可能です。それぞれ特定のユースケースに最適化されています。これらのコネクタ定義は、コネクタギャラリーからダウンロードできます。

Apache Kafka for JSON データ形式

スキーマの進化とトピックからテーブルへのマッピングを備えた JSON メッセージ取り込み用の簡易コネクタ

Apache Kafka for AVRO データ形式

スキーマの進化とトピックからテーブルへのマッピングを備えた AVRO メッセージ取り込み用の簡易コネクタ

DLQ とメタデータを使用したApache Kafka

配信不能キュー(DLQ)サポート、メタデータ処理、機能パリティ :doc:`Kafka用Snowflakeコネクタ </user-guide/kafka-connector-overview>`のあるフル機能のコネクタ

特定のコネクタタイプの詳しい構成については、次をご参照ください。

どのコネクタを選ぶべきでしょうか?

データ形式、運用要件、機能のニーズに最適なコネクタバリアントを選択します。

用のApache Kafka JSON または AVRO データ形式 の場合に選択します:

  • Kafkaメッセージは JSON または AVRO 形式です

  • 基本的なスキーマ進化機能が必要です

  • 最小限の構成によるシンプルな設定が必要です

  • 高度なエラー処理や配信不能キュー機能は不要です

  • 新しい統合を設定し、すぐに開始したい場合

形式固有の考慮事項:

  • JSON 形式さまざまなデータ構造に対してより柔軟で、デバッグと検査が容易です

  • AVRO 形式組み込みスキーマレジストリ統合による強力な型付けデータ、構造化データパイプライン向け

次の場合、:doc:` Apache Kafka DLQ およびメタデータ </user-guide/data-integration/openflow/connectors/kafka/kafka-dlq-metadata>` を選択してください:

  • Kafka用Snowflakeコネクタから へ移行中であり、互換性のある機能を備えた機能パラメーターが必要です。

  • 失敗メッセージには堅牢なエラー処理と配信不能キューのサポートが必要です

  • メッセージのインジェスチョンに関する詳細なメタデータ(タイムスタンプ、オフセット、ヘッダー)が必要です。

移行に関する考慮事項

現在、Kafka用のSnowflakeコネクタを使用している場合は、機能の互換性を備えたシームレスな移行体験のため、**Apache Kafka with DLQ and metadata**コネクタを選択してください 。

フィールド名処理の違い:Kafka用Openflowコネクタは、Kafka用Snowflakeコネクタとは異なる方法で、フィールド名の特殊文字を処理します。移行後、Kafka用Openflowコネクタは、これらの命名規則の違いにより、異なる名前の新しいSnowflake列を作成する場合があります。フィールド名の変換方法の詳細については、 フィールド名のマッピングと特殊文字の処理 をご参照ください。

パフォーマンスの考慮事項

  • JSON および AVRO 形式のコネクタは、合理化された設計により、単純なユースケースのパフォーマンスが向上します

  • その DLQ および メタデータコネクタは、わずかに高いリソース使用コストで、より包括的なモニターとエラー処理を提供します。

Snowflakeアカウントを設定する

Snowflakeアカウント管理者として、以下のタスクを実行します。

  1. タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。

  2. 新しいロールを作成するか、既存のロールを使用して データベース権限 を付与します。

    コネクタには、宛先テーブルがまだ存在しない場合に自動的に作成する機能があるため、ユーザーがSnowflakeオブジェクトの作成および管理に必要な権限を持っていることを確認してください。

    オブジェクト

    権限

    メモ

    データベース

    USAGE

    スキーマ

    USAGE . CREATE TABLE .

    スキーマレベルのオブジェクトが作成された後に、 CREATE object 権限を取り消すことができます。

    テーブル

    OWNERSHIP

    Kafkaコネクタを使用してデータを 既存の テーブルに取り込む場合にのみ必要です。. コネクタがKafkaトピックの記録の新しいターゲットテーブルを作成する場合、構成で指定されたユーザーのデフォルトのロールがテーブル所有者になります。

    Snowflakeは、アクセス制御を改善するために、各Kafkaインスタンスに個別のユーザーとロールを作成することをお勧めします。

    以下のスクリプトを使用して、カスタムロールを作成および構成できます(SECURITYADMIN または同等のロールが必要です)。

    USE ROLE securityadmin;
    
    CREATE ROLE kafka_connector_role_1;
    GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;
    GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
    GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
    
    -- Only for existing tables
    GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;
    
    Copy

    権限はコネクタロールに直接付与する必要があり、継承できないことに注意してください。

  3. Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。

    ロールは、ユーザーの既定のロールとして割り当てる必要があります:

    GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;
    ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
    
    Copy
  4. ステップ1の Snowflake SERVICEユーザーに対して、キーペア認証 を構成します。

  5. Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。

    注釈

    何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。

    1. シークレットマネージャを構成したら、その認証方法を決定します。AWS 上では、Openflowに関連付けられた EC2 インスタンスロールが推奨されます。こうすることで、他の秘密を永続化する必要がなくなるからです。

    2. Openflowで、右上のハンバーガーメニューから、このシークレットマネージャに関連付けられたParameter Providerを構成します。Controller Settings » Parameter Provider に移動し、パラメーターの値をフェッチします。

    3. この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。

  6. 他のSnowflakeユーザーが、コネクタによって取り込まれた生の取り込みドキュメントやとテーブルへのアクセスを必要とする場合は(Snowflakeでのカスタム処理のためなど)、それらのユーザーにステップ1で作成したロールを付与します。

コネクタを設定する

データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。

コネクタをインストールする

  1. Openflowの概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。

  4. Add を選択します。

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  5. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  6. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

  1. プロセスグループパラメーターを入力する

    1. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

    2. 共通パラメーター の説明に従って、必要なパラメーター値を入力します。

共通パラメーター

すべてのKafkaコネクタのバリアントは、基本的な接続と認証のための共通パラメーターコンテキストを共有しています。

Snowflake宛先パラメーター

パラメーター

説明

必須

宛先データベース

データが永続化されるデータベース。既にSnowflakeに存在している必要があります

有り

宛先スキーマ

データが永続化されるスキーマ。Snowflakeにすでに存在している必要があります。このパラメーターは大文字と小文字を区別します。スキーマが二重引用符で囲まれた名前で作成されていない限り、このパラメーターを大文字で指定し、大文字と小文字が一致することを確認しますが、二重引用符は含めないでください。次の例をご参照ください。

  • CREATE SCHEMA SCHEMA_NAME または CREATE SCHEMA schema_name - ``SCHEMA_NAME``を使用します

  • CREATE SCHEMA "schema_name" または CREATE SCHEMA "SCHEMA_NAME" - それぞれ schema_name または SCHEMA_NAME を使用します

有り

Snowflakeアカウント識別子

データが永続化されるSnowflakeアカウント名(形式: [organization-name]- [account-name])。

有り

Snowflake認証ストラテジー

Snowflakeへの認証のストラテジー。可能な値: SPCS 上でフローを実行している場合は SNOWFLAKE_SESSION_TOKEN、 秘密キーを使ったアクセスを設定したい場合は KEY_PAIR

有り

Snowflake秘密キー

認証に使用される RSA 秘密キー。RSA キーは、 PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持つ必要があります。Snowflake秘密キーファイルまたはSnowflake秘密キーのどちらかが定義されている必要があります。

無し

Snowflake秘密キーファイル

Snowflakeへの認証に使用される RSA 秘密キーを含むファイル。PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持ちます。ヘッダー行は -----BEGIN PRIVATE で始まります。Reference asset チェックボックスを選択し、秘密キーファイルをアップロードします。

無し

Snowflake秘密キーパスワード

Snowflake秘密キーファイルに関連付けられたパスワード

無し

Snowflakeロール

クエリ実行時に使用されるSnowflakeロール

有り

Snowflakeのユーザー名

Snowflakeインスタンスへの接続に使用するユーザー名

有り

Kafkaソースパラメーター(SASL 認証)

パラメーター

説明

必須

Kafkaセキュリティプロトコル

ブローカーとの通信に使用されるセキュリティプロトコル。Kafka Client security.protocolプロパティに対応します。次のいずれかです: SASL_PLAINTEXT / SASL_SSL

有り

Kafka SASL メカニズム

認証に使用する SASL メカニズム。Kafka Client sasl.mechanismプロパティに対応します。次のいずれかです: PLAIN / SCRAM-SHA-256 / SCRAM-SHA-512

有り

Kafka SASL ユーザー名

Kafkaを認証するためのユーザー名。

有り

Kafka SASL パスワード

Kafkaを認証するためのパスワード。

有り

Kafkaブートストラップサーバー

データをフェッチするKafkaブローカーのコンマ区切りリスト。kafka-broker:9092のように、ポートが含まれる必要があります。DLQ トピックにも同じインスタンスが使用されます。

有り

Kafka取り込みパラメーター

パラメーター

説明

必須

Kafkaトピック形式

次のいずれかです: names / pattern。提供される「Kafkaトピック」がコンマで区切られた名前のリストであるか、単一の正規表現であるかを指定します。

有り

Kafkaトピック

Kafkaトピックのコンマ区切りリスト、または正規表現。

有り

KafkaグループID

コネクタが使用するコンシューマーグループの ID。任意ですが、一意でなければなりません。

有り

Kafka自動オフセットリセット

Kafka auto.offset.reset プロパティに対応する以前のコンシューマーオフセットが見つからない場合に適用される自動オフセット構成。次のいずれかです: earliest / latest. Default: latest

有り

トピックからテーブルへのマッピング

このオプションのパラメーターにより、ユーザーはどのトピックをどのテーブルにマッピングするかを指定できます。各トピックとそのテーブル名はコロンで区切る必要があります(以下を参照)。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。正規表現はあいまいであってはならず、一致するトピックは単一のターゲットテーブルのみに一致しなければなりません。空または一致するものがない場合は、トピック名がテーブル名として使用されます。注意: マッピングでは、コンマの後にスペースを含めることはできません。

無し

Topic To Table Map 値の例:

  • topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range

  • topic[0-4]:low_range,topic[5-9]:high_range

  • .*:destination_table - すべてのトピックを destination_table にマッピングします

バリアント固有の設定を構成する

共通パラメーターを構成した後、選択したコネクタバリアントに固有の設定を構成する必要があります。

Apache Kafka for JSON データ形式 および Apache Kafka for AVRO データ形式 コネクタの場合:

JSON/AVRO 固有のパラメーターについては、 JSON/AVRO データ形式用Apache Kafka をご参照ください。

DLQ とメタデータ コネクタを持つ Apache Kafkaの場合:

DLQ 構成、スキーマ化の設定、Icebergテーブルのサポート、メッセージ形式オプションなどの高度なパラメーターについては、 DLQ とメタデータを使用したApache Kafka をご参照ください。

認証

Kafkaソースパラメーター(SASL 認証) で説明されているように、すべてのコネクタバリアントは、パラメーターコンテキストを介して構成された SASL 認証をサポートしています。

mTLS や AWS MSK IAM を含むその他の認証方法については、 Openflow Connector for Kafkaのその他の認証方法を構成する をご参照ください。

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services をクリックします。

  2. プレーンを右クリックし、 Start をクリックします。コネクタがデータの取り込みを開始します。