Openflow Connector for Kafkaを設定する

注釈

このコネクタは、 Snowflakeコネクタ規約 に従うものとします。

前提条件

  1. Kafka用Snowflake Openflowコネクタ を確認してください。

  2. :doc:`/user-guide/data-integration/openflow/setup-openflow-byoc`または:doc:`Openflow のセットアップ - Snowflake デプロイメント</user-guide/data-integration/openflow/setup-openflow-spcs>`があることを確認してください。

  3. Openflow - Snowflakeデプロイを使用する場合は、必要なドメインの構成 を確認して、Kafkaコネクタに必要なドメインへのアクセス権を付与していることを確認してください。コネクタは、クラスタ内のすべてのKafkaブローカーに接続できる必要があります。

Snowflakeアカウントを設定する

Snowflakeアカウント管理者として、以下のタスクを実行します。

  1. タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。

  2. 新しいロールを作成するか、既存のロールを使用して:ref:データベース権限 <label-database_privileges> を付与します。

    コネクタでは、ユーザーが宛先テーブルを作成する必要があります。ユーザーにSnowflakeオブジェクトの管理に必要な権限が付与されていることを確認します。

    オブジェクト

    権限

    メモ

    データベース

    USAGE

    スキーマ

    USAGE

    テーブル

    OWNERSHIP

    コネクタがデータをテーブルに取り込むために必要です。

    Snowflakeは、アクセス制御を改善するために、各Kafkaクラスターに個別のユーザーとロールを作成することをお勧めします。

    以下のスクリプトを使用して、カスタムロールを作成および構成できます(SECURITYADMIN または同等のロールが必要です)。

    USE ROLE securityadmin;
    CREATE ROLE openflow_kafka_connector_role_1;
    
    GRANT USAGE ON DATABASE kafka_db TO ROLE openflow_kafka_connector_role_1;
    GRANT USAGE ON SCHEMA kafka_schema TO ROLE openflow_kafka_connector_role_1;
    

    注釈

    権限はコネクタロールに直接付与する必要があり、継承することはできません。

  3. 宛先テーブルの構成

    Snowflakeは、スキーマの変更にサーバー側のスキーマ進化を使用し、DML エラーのロギングにエラーテーブル を使用することを強くお勧めします。次の例は、テーブルを作成し、適切な OWNERSHIP 権限を追加する方法を示しています。

    USE ROLE openflow_kafka_connector_role_1;
    
    CREATE TABLE kafka_db.kafka_schema.<DESTINATION_TABLE_NAME> (
      kafkaMetadata variant
    )
    ENABLE_SCHEMA_EVOLUTION = TRUE
    ERROR_LOGGING = TRUE;
    
    USE ROLE securityadmin;
    GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE openflow_kafka_connector_role_1;
    

    このコネクタは、スキーマの自動検出と進化をサポートしています。Snowflakeのテーブルの構造は、コネクタによってロードされた新しいデータの構造をサポートするために、自動的に定義され、進化します。記録コンテンツの第1レベルのキーを、名前で一致するテーブル列に自動的にマッピングします(大文字と小文字の区別なし)。

    スキーマ進化を有効にすると、Snowflakeは受信ストリームで検出された新しい列を追加し、NOT NULL 制約をドロップすることで、宛先テーブルを自動的に拡張して、新しいデータパターンに対応することができます。詳細については、 テーブルスキーマの進化 をご参照ください。

    ENABLE_SCHEMA_EVOLUTION が有効になっていない場合は、テーブル定義を拡張してスキーマを手動で作成する必要があります。コネクタは、記録コンテンツの第1レベルのキーを名前でテーブル列と一致させようとします。JSON のキーがテーブル列と一致しない場合、コネクタはキーを無視します。

  4. (オプション)シークレットマネージャーの構成

    Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。

    1. シークレットマネージャーを構成した後に、構成したマネージャーに対してどのように認証するかを決定します。AWS では、SnowflakeはOpenflowに関連付けられている EC2 インスタンスロールを使用して、他のシークレットを永続化する必要がないようにすることをお勧めします。

    2. Openflowで、右上のハンバーガーメニューから、このシークレットマネージャーに関連付けられたパラメータープロバイダーを構成します。コントローラー設定 > パラメータープロバイダーに移動し、パラメーター値を取得します。

    3. 関連付けられているパラメーターパスですべての認証情報を参照するため、Openflow内で機密性の高い値を永続化する必要はありません。

  5. ユーザーへのアクセス権の付与

    コネクタによって取り込まれた未加工のデータへのアクセスを必要とする(たとえば、Snowflakeでのカスタム処理のため)他のSnowflakeユーザーについては、ステップ1で作成したロールをそれらのユーザーに付与します。

コネクタを設定する

データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。

コネクタをインストールする

コネクタをインストールするには、次を実行します。

  1. Openflow概要ページに移動します。特集コネクタセクション で、その他のコネクタを表示 を選択します。

  2. Openflowのコネクタページでコネクタを探し、**ランタイムに追加**を選択します。

  3. ランタイムを選択ダイアログで、**利用可能なランタイム**ドロップダウンリストからランタイムを選択し、**追加**を選択します。

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベース、スキーマおよびテーブルをSnowflakeで作成したことを確認します。

  4. Snowflakeアカウント認証情報でデプロイに対する認証を行い、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたら、許可する を選択します。コネクタのインストールプロセスは数分で完了します。

  5. Snowflakeアカウント認証情報でランタイムを認証します。

    コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

  1. 必要に応じて、組み込みパラメーターを構成する前に、コネクタ構成をカスタマイズします。

  2. プロセスグループパラメーターを入力する

    1. インポートしたプロセスグループを右クリックし、パラメーターを選択します。

    2. 必要なパラメーター値を入力する

パラメーター

次のテーブルは、Openflow Connector for Kafkaのパラメーターを示しています。

パラメーター

説明

必須

Kafka自動オフセットリセット

Kafkaの auto.offset.reset プロパティに対応する以前のコンシューマーオフセットが見つからない場合は、自動オフセット構成が適用されます。

設定可能な値: 最も古い: オフセットを以前のオフセットに自動的にリセット、最新: オフセットを最新のオフセットに自動的にリセット、なし: コンシューマーグループに対して以前のオフセットが見つからない場合に、コンシューマーに例外をスロー。

デフォルト: 最新

有り

Kafkaブートストラップサーバー

Kafkaブートストラップサーバーのコンマ区切りリストには、ポート(例: kafka-broker:9092)が含まれている必要があります。

有り

Kafkaコンシューマーグループ ID

コネクタが使用するコンシューマーグループの ID。任意ですが、一意でなければなりません。

有り

Kafka SASL パスワード

SASL512 SCRAM メカニズムを使用するときに、構成されたパスワードによって指定されるパスワード

Kafka SASL ユーザー名

SASL512 SCRAM メカニズムを使用するときに、構成されたパスワードによって指定されるユーザー名

Kafkaトピック形式

次のいずれか: 名前/パターン。指定される「Kafkaトピック」が、名前のコンマ区切りリストまたは単一の正規表現であるかどうかを指定します。

有り

Kafkaトピック

Kafkaトピックのコンマ区切りリスト、または正規表現。

有り

Snowflake宛先データベース

データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

有り

Snowflake宛先スキーマ

データが永続化されるスキーマ。Snowflakeにすでに存在している必要があります。これはSnowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

次の例をご参照ください。

CREATE SCHEMA SCHEMA_NAME または CREATE SCHEMA schema_name: SCHEMA_NAME を使用します。

CREATE SCHEMA "schema_name" または CREATE SCHEMA "SCHEMA_NAME": それぞれ schema_name または SCHEMA_NAME を使用します。

有り

Snowflake宛先テーブル

データが永続化されるテーブル。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

有り

コネクタを起動します。

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. プレーンを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。

KAFKAMETADATA 列についての理解

コネクタは、Kafka記録に関するメタデータを含む KAFKAMETADATA 構造を入力します。構造には次の情報が含まれています。

フィールド

データ型

説明

topic

String

レコードが由来するKafkaトピックの名前です。

partition

number

トピック内のパーティションの番号です。(これはSnowflakeマイクロパーティションではなく、Kafkaパーティションであることに注意してください。)

offset

number

そのパーティションのオフセットです。

timestamp

number

記録がKafkaに追加された時点のタイムスタンプ。

key

String

メッセージがKafka KeyedMessageの場合、これはそのメッセージのキーです。コネクタが RECORD_METADATA にキーを保存するためには、Kafka構成の key.converter パラメーターを org.apache.kafka.connect.storage.StringConverter に設定する必要があります。そのように設定されていない場合、コネクタはキーを無視します。

headers

オブジェクト

ヘッダーは、レコードに関連付けられたユーザー定義のキーと値のペアです。各レコードには、0、1、または複数のヘッダーを含めることができます。

取り込みのレイテンシの測定

行の変更時間に基づいて変更の追跡、増分処理、およびTime Travelクエリを実行するには、ROW_TIMESTAMP 機能を使用します。

宛先テーブルで次のコマンドを実行して有効にしてください。

ALTER TABLE <DESTINATION_TABLE> SET ROW_TIMESTAMP = TRUE;

行のタイムスタンプが有効になると、テーブルは METADATA$ROW_LAST_COMMIT_TIME 列を公開します。これは、各行が最後に変更された時点のタイムスタンプを返します。

詳細については、 METADATA$ROW_LAST_COMMIT_TIME をご参照ください。

注釈

行のタイムスタンプはインタラクティブテーブルでは使用できません。詳細については、 Snowflakeインタラクティブテーブルとインタラクティブウェアハウス をご参照ください。

Apache Iceberg™テーブルでのコネクタの使用

コネクタは、Snowflakeが管理するApache Iceberg™テーブルにデータを取り込むことができますが、以下の要件を満たしている必要があります。

  • Apache Iceberg™テーブルに関連付けられた外部ボリュームに対する USAGE 権限を付与されている必要があります。

  • コネクタを実行する前にApache Iceberg™テーブルを作成する必要があります。

外部ボリュームの使用許可

たとえば、Icebergテーブルが kafka_external_volume 外部ボリュームを使用し、コネクタがロール openflow_kafka_connector_role を使用する場合、次のステートメントを実行します:

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE openflow_kafka_connector_role;

取り込み用のApache Iceberg™テーブルの作成

コネクタは自動的にIcebergテーブルを作成せず、スキーマの進化をサポートしません。コネクタを実行する前に、Icebergテーブルを手動で作成する必要があります。

Icebergテーブルを作成する場合、Icebergデータタイプ(VARIANTなど)または 互換性のあるSnowflakeデータタイプ を使用できます。

例えば、次のようなメッセージを考えてみましょう:

{
  "id": 1,
  "name": "Steve",
  "body_temperature": 36.6,
  "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
  "animals_possessed": {
    "dogs": true,
    "cats": false
  },
  "options": {
    "can_walk": true,
    "can_talk": false
  },
  "date_added": "2024-10-15"
}

例のメッセージ用にIcebergテーブルを作成するには、以下のステートメントの1つを使用します:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
  kafkaMetadata OBJECT(
    topic STRING,
    partition INTEGER,
    offset INTEGER,
    key STRING,
    headers variant,
    timestamp INTEGER
  ),
  id INT,
  name string,
  body_temperature float,
  approved_coffee_types array(string),
  animals_possessed variant,
  date_added date,
  options object(can_walk boolean, can_talk boolean)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ICEBERG_VERSION = 3;

インタラクティブテーブルでのコネクタの使用

インタラクティブテーブルは、低レイテンシの高同時実行性クエリ向けに最適化されたSnowflakeテーブルの特別なタイプです。詳細については、 Snowflakeインタラクティブテーブルとインタラクティブウェアハウス をご参照ください。

  1. インタラクティブテーブルを作成します。

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_topic VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) AS (SELECT
          $1:M_NAME::VARCHAR,
          $1:M_VALUE::NUMBER,
          $1:RECORD_METADATA.topic::VARCHAR,
          $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
          from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
    

重要な考慮事項:

  • インタラクティブテーブルには特定の制限とクエリ制限があります。コネクタで使用する前に Snowflakeインタラクティブテーブルとインタラクティブウェアハウス を確認してください。

  • インタラクティブテーブルの場合、必要な変換はテーブル定義で処理する必要があります。

  • インタラクティブテーブルを効率的にクエリするには、インタラクティブウェアハウスが必要です。

宛先テーブルに対する顧客定義スキーマでコネクタを使用

コネクタは、各Kafka記録をSnowflakeテーブルに挿入される行として扱います。たとえば、以下のJSONのように構造化されたメッセージのコンテンツを持つKafkaトピックがある場合:

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}

デフォルトでは、ENABLE_SCHEMA_EVOLUTION = TRUE 機能のメリットにより、JSON からすべてのフィールドを指定する必要はありません。ただし、静的スキーマを優先的に使用する場合は、次の処理を実行して作成できます。

CREATE TABLE ORDERS (
  kafkaMetadata OBJECT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total FLOAT,
  ispaid BOOLEAN
);

顧客定義の PIPE でのコネクタの使用

独自のパイプを作成することを選択した場合は、パイプの COPY INTO ステートメントでデータ変換ロジックを定義できます。必要に応じて列の名前を変更しデータ型をキャストすることができます。例:

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);

CREATE PIPE ORDERS AS
COPY INTO ORDERS
SELECT
  $1:order_id::STRING,
  $1:customer_name,
  $1:order_total::STRING,
  $1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'));

独自のパイプを定義する場合は、宛先テーブルの列が JSON キーと一致する必要はありません。列の名前を希望の名前に変更し、必要に応じてデータ型をキャストできます。

カスタムパイプで動作するようにコネクタを調整するには、次のタスクを実行します。

  1. OpenflowキャンバスのKafka取り込みフローで使用される PublishSnowpipeStreaming プロセッサーを右クリックします。

  2. コンテキストメニューから[構成する]を選択します。

  3. プロパティタブに移動します。

  4. 宛先タイプフィールドで、[パイプ]を選択します。

  5. パイプフィールドに、PIPE の名前を入力します。

  6. [適用]を選択し、構成を保存します。

エラー処理のカスタマイズ

エラー処理は、Snowpipe Streamingサービス内のOpenflow側の失敗とサーバー側の失敗に分割されます。

  • Openflowエラー(クライアント側の失敗):記録がSnowflakeに到達する前に、解析不可のペイロードやカスタム変換の失敗などのエラーが発生します。デフォルトでは、これらの記録は破棄されます。Openflowでこれらのエラーを処理することが可能です。そのためには、ConsumeKafka プロセッサーで解析失敗関係から FlowFiles を使用します。

  • Snowpipe Streamingエラー(サーバー側の失敗):Snowflakeには正常に到達したものの、宛先テーブルのスキーマと互換性のない(例: 型の不一致)記録のエラーは、Snowflakeインフラストラクチャによってキャプチャされます。宛先テーブルでエラーのロギングが有効になっている(error_logging = true)場合、これらの失敗した行は宛先のエラーテーブルに自動的に取り込まれます。

パフォーマンスのチューニング

Openflow Connector for Kafkaのパフォーマンスチューニング