Snowflake High Performance connector for Kafka の動作方法

このトピックでは、コネクタのさまざまな側面、テーブルとパイプの動作方法、およびコネクタの構成方法について説明します。

コネクタがテーブルとパイプで動作する方法

Kafka用の高性能Snowflakeコネクタでは、宛先テーブルを手動で作成する必要があります。コネクタは、各Kafka記録をSnowflakeテーブルに挿入される行として扱います。たとえば、以下のように構造化されたメッセージのコンテンツを持つKafkaトピックがある場合:

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}
Copy

JSON キーに一致する列を持つテーブルを作成し、 {tableName}-STREAMING という名前のデフォルトパイプに依存できます。これは、記録コンテンツの第1レベルのキーを、名前(大文字と小文字の区別なし)で一致するテーブル列に自動的にマッピングします。

CREATE TABLE ORDERS (
  record_metadata VARIANT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total NUMBER,
  ispaid BOOLEAN
);
Copy

独自のパイプを作成することを選択した場合は、パイプの COPY INTO ステートメントでデータ変換ロジックを定義できます。必要に応じて列の名前を変更し、必要に応じてデータ型をキャストできます。例:

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);
Copy
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
  $1:order_id::STRING,
  $1:customer_name,
  $1:order_total::STRING,
  $1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

または

CREATE TABLE ORDERS (
 topic VARCHAR,
 partition VARCHAR,
 order_id VARCHAR,
 customer_name VARCHAR,
 order_total VARCHAR,
 ispaid VARCHAR
);
Copy
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
  $1:RECORD_METADATA.topic::STRING AS topic,
  $1:RECORD_METADATA.partition::STRING AS partition,
  $1['order_id']::STRING AS order_id,
  $1['customer_name']::STRING as customer_name,
  CONCAT($1['order_total']::STRING, ' USD') AS order_total,
  $1['isPaid']::STRING AS ispaid
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

独自のパイプを定義する場合は、宛先テーブルの列が JSON キーと一致する必要はありません。列の名前を希望の名前に変更し、必要に応じてデータ型をキャストできます。

トピック名、テーブル名、パイプ名

構成設定に応じて、コネクタは宛先テーブルに異なる名前を使用します。宛先テーブル名は、常にトピック名から派生します。

コネクタがトピック名を宛先テーブルにマップする方法

Kafkaコネクタは、Kafkaトピック名をSnowflakeテーブル名にマッピングするための2つのモードを提供します。

  • 静的マッピング:コネクタは、Kafkaトピック名のみを使用して宛先テーブル名を導出します。

  • 明示的なトピックからテーブルへのマッピングモード:snowflake.topic2table.map 構成パラメーターを使用して、トピックとテーブル間のカスタムマッピングを指定します。

静的マッピング

snowflake.topic2table.map パラメーターを構成しない場合、コネクタは常にトピック名からテーブル名を導出します。

テーブル名の生成:

コネクタは、次のルールを使用してトピック名から宛先テーブル名を導出します。

  1. トピック名が有効なSnowflake識別子(文字またはアンダースコアで始まり、文字、数字、アンダースコア、またはドル記号のみを含む)の場合、コネクタはトピック名をテーブル名として使用します(大文字に変換)。

  2. トピック名に無効な文字が含まれている場合、コネクタは:

    • 無効な文字をアンダースコアに置き換えます

    • 一意性を確保するために、アンダースコアの後にハッシュコードを追加します

    • たとえば、トピック my-topic.dataMY_TOPIC_DATA_<ハッシュ> になります

パイプ名の決定:

コネクタは、次のロジックに基づいて、どのパイプを使用するかを決定します。

  1. コネクタは、宛先テーブル名と同じ名前のパイプが存在するかどうかを確認します。

  2. その名前のユーザー作成パイプが存在する場合、コネクタはそのパイプ(ユーザー定義パイプモード)を使用します。

  3. そうでない場合、コネクタは {tableName}-STREAMING という名前のデフォルトのパイプを使用します。

注釈

Snowflakeは、予測可能なテーブル名を確保するために、Snowflake識別子名のルールに従うトピック名を選択することをお勧めします。

RECORD_METADATA を理解する

コネクタは、Kafka記録に関するメタデータを含む RECORD_METADATA 構造を入力します。このメタデータは、Snowpipe Streamingデータソースを通じてSnowflakeに送信され、Snowflakeでは $1:RECORD_METADATA アクセサーを使用してパイプ変換で利用可能になります。 RECORD_METADATA 構造は、ユーザー定義パイプとデフォルトパイプモードの両方で使用できます。その内容は、 VARIANT 型の列に保存できます。または個々のフィールドを抽出して、個別の列に保存できます。

変換とメタデータを含むパイプの例:

CREATE PIPE ORDERS AS
COPY INTO ORDERS_TABLE
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total,
    $1:RECORD_METADATA.topic AS source_topic,
    $1:RECORD_METADATA.offset::NUMBER AS kafka_offset,
    $1:RECORD_METADATA.SnowflakeConnectorPushTime::BIGINT AS ingestion_time
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

この例では:

  • パイプは、Kafkaメッセージから特定のフィールド(order_id、customer_name、order_total)を抽出します。

  • また、メタデータフィールド(トピック、オフセット、インジェスションタイムスタンプ)もキャプチャします。

  • 値は、必要に応じてキャストおよび/または変換できます

メタデータフィールドの入力方法

コネクタは、Kafka記録プロパティとコネクタ構成に基づいてメタデータフィールドを自動的に入力します。これらの構成パラメーターを使用して、どのメタデータフィールドを含めるかを制御できます。

  • snowflake.metadata.topic (デフォルト: true) - トピック名を含む

  • snowflake.metadata.offset.and.partition (デフォルト: true) - オフセットとパーティションを含む

  • snowflake.metadata.createtime (デフォルト: true) - Kafka記録のタイムスタンプを含む

  • snowflake.metadata.all (デフォルト: true) - 利用可能なすべてのメタデータを含む

snowflake.metadata.all=true (デフォルト)の場合、すべてのメタデータフィールドが入力されます。個々のメタデータフラグを false に設定すると、 RECORD_METADATA 構造から特定のフィールドを除外します。

注釈

SnowflakeConnectorPushTime フィールドは常に利用可能で、コネクタが記録をインジェスションバッファーにプッシュした時刻を表します。これは、エンドツーエンドのインジェスションレイテンシを計算するのに便利です。

RECORD_METADATA 構造には、デフォルトで次の情報が含まれています。

フィールド

データ型

説明

topic

String

レコードが由来するKafkaトピックの名前です。

partition

String

トピック内のパーティションの番号です。(これはSnowflakeマイクロパーティションではなく、Kafkaパーティションであることに注意してください。)

offset

number

そのパーティションのオフセットです。

CreateTime / . LogAppendTime

number

これは、Kafkaトピックのメッセージに関連付けられたタイムスタンプです。値は、1970年1月1日午前0時 UTCからのミリ秒です。詳細については、https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.htmlをご参照ください。

SnowflakeConnectorPushTime

number

記録がインジェスト SDK バッファにプッシュされたときのタイムスタンプ。値は、1970年1月1日午前0時 UTC からのミリ秒数です。詳細については、 インジェスチョンレイテンシの推定 を参照してください。

key

String

メッセージがKafka KeyedMessageの場合、これはそのメッセージのキーです。コネクタが RECORD_METADATA にキーを保存するには、 Kafka構成プロパティ のkey.converterパラメーターを「org.apache.kafka.connect.storage.StringConverter」に設定する必要があります。そうでない場合、コネクターはキーを無視します。

headers

オブジェクト

ヘッダーは、レコードに関連付けられたユーザー定義のキーと値のペアです。 各レコードには、0、1、または複数のヘッダーを含めることができます。

The amount of metadata recorded in the RECORD_METADATA column is configurable using optional Kafka configuration properties.

フィールド名と値は大文字と小文字が区別されます。

インジェスション前のKafka記録の変換方法

各行がSnowpipe Streamingに渡される前に、コネクタはKafka Connectの記録値を Map<文字列、オブジェクト> に変換します。このキーはターゲット列名と一致する必要があります(または、ユーザー定義のパイプ内で変換できます)。プリミティブ文字列、バイト配列、または数値は(たとえば HoistField SMT を使用して)ラップする必要があります。それにより、コネクタは構造化オブジェクトを受け取れるようになります。コンバーターは次のルールを適用します。

  • Null値はtombstoneとして扱われます。behavior.on.null.values=IGNORE または、空の JSON オブジェクトとして取り込まれる場合はスキップされます。

  • 数値およびブールフィールドはそのまま渡されます。精度が38より大きい10進値は、Snowflakeの NUMBER の制限内にとどまる文字列としてシリアル化されます。

  • byte[] および ByteBuffer ペイロードはBase64でエンコードされた文字列であるため、 VARIANT または VARCHAR 列に保存します。

  • 配列は配列のままであり、ネストされたオブジェクトはネストされたマップのままです。デフォルトのパイプに依存してネストされたデータをそのままにする場合、 VARIANT 列を宣言します。

  • 非文字列キーを持つマップは、 [key, value] ペアの配列として出力されます。これは、Snowflakeの列名はテキストである必要があるためです。

  • 関連するメタデータフラグが有効になっている場合は常に、記録のヘッダーとキーは RECORD_METADATA にコピーされます。

メッセージ本文全体を単一の列として保持する必要がある場合は、 SMTs を使用して新しいトップレベルフィールドにラップします。 変換パターンについては レガシー RECORD_CONTENT 列 をご参照ください。

ユーザー定義パイプモードとデフォルトのパイプモード

コネクタは、データインジェスションを管理するために2つのモードをサポートしています。

ユーザー定義のパイプモード

このモードでは、データ変換と列マッピングをフルコントロールできます。

このモードを使用する場合:

  • JSON フィールド名とは異なるカスタム列名が必要です。

  • データ変換(型キャスト、マスキング、フィルタリング)を適用する必要があります

  • データが列にどのようにマッピングされるかをフルコントロールしたい場合

デフォルトのパイプモード

このモードでは、コネクタは {tableName}-STREAMING という名前のデフォルトのパイプを使用し、kafkaレコードフィールドを名前(大文字と小文字の区別なし)で一致するテーブル列にマップします。

このモードを使用する場合:

  • kafkaレコードのキー名は希望する列名と一致します

  • カスタムデータ変換は必要ありません

  • 構成を簡素化したい場合

デフォルトのパイプモードを使用したテーブル列へのkafka記録キーのマッピング

デフォルトパイプモードを使用すると、コネクタは {tableName}-STREAMING という名前のデフォルトパイプを使用し、大文字と小文字を区別しない一致を使用して、コンテンツの第1レベルキーをテーブル列に直接マッピングします。

デフォルトパイプモードの使用 - 例

例1:

次のkafka記録コンテンツペイロードを検討してください。

{
  "city": "New York",
  "age": 30,
  "married": true,
  "has cat": true,
  "@&$#* includes special characters": true,
  "skills": ["sitting", "standing", "eating"],
  "family": {"son": "Jack", "daughter": "Anna"}
}
Copy

JSON キーに一致する列を持つテーブルを作成します(大文字と小文字を区別しない、特殊文字を含む)。

CREATE TABLE PERSON_DATA (
  record_metadata VARIANT,
  city VARCHAR,
  age NUMBER,
  married BOOLEAN,
  "has cat" BOOLEAN,
  "!@&$#* includes special characters" BOOLEAN,
  skills VARIANT,
  family VARIANT
);
Copy

一致する動作:

  • "city" (kafka)→ city または CITY または City (列) - 大文字と小文字を区別しない

  • "has cat" (kafka)→ ``"has cat"``(列) - スペースがあるため引用符で囲む必要がある

  • "!@&$#* includes special characters" (kafka)→ ``"!@&$#* includes special characters"``(列) - 特殊文字の保持

  • skills および family のようなネストされたオブジェクトを VARIANT 列に自動でマップする

ユーザー定義パイプモードの使用 - 例

この例では、カスタムデータ変換でユーザー定義のパイプを構成して使用する方法を示します。

例1:

希望のスキーマでテーブルを作成します。

CREATE TABLE ORDERS (
  order_id NUMBER,
  customer_name VARCHAR,
  order_total NUMBER,
  order_date TIMESTAMP_NTZ,
  source_topic VARCHAR
);
Copy

テーブルスキーマに合わせてKafkaの受信記録を変換するパイプを作成します。

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total::NUMBER,
    $1:order_date::TIMESTAMP_NTZ,
    $1:RECORD_METADATA.topic
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

パイプ名( ORDERS )は、テーブル名( ORDERS )と一致することに注意してください。パイプ定義は、 $1:field_name 構文を使用した JSON ペイロードからフィールドを抽出し、テーブル列にそれらをマップします。

注釈

ネストされた JSON フィールドと、 $1['field name'] または $1['has cat'] など括弧表記を使用した特殊文字を含むフィールドにアクセスできます。

トピックをテーブルマッピングに構成します。

snowflake.topic2table.map=kafka-orders-topic:ORDERS
Copy

この構成は、Kafkaトピック kafka-orders-topicORDERS という名前の既存のテーブルとパイプにマップします。

例2:

従来の名前を持たないコンテンツのキーにアクセスする必要がある場合は、次の構文を使用します。

  • 単純なフィールド: $1:field_name

  • スペースまたは特殊文字を含むフィールド: $1['field name'] または $1['has cat']

  • Unicode文字を含むフィールド: $1[' @&$#* has Łułósżź']

  • ネストされたフィールド: $1:parent.child または $1:parent['child field']

Kafkaからの JSON ペイロードを考慮します。

{
  "city": "New York",
  "age": 30,
  "married": true,
  "has cat": true,
  " @&$#* has Łułósżź": true,
  "skills": ["sitting", "standing", "eating"],
  "family": {"son": "Jack", "daughter": "Anna"}
}
Copy

選択した列名で宛先テーブルを作成します。

CREATE TABLE PERSON_DATA (
  city VARCHAR,
  age NUMBER,
  married BOOLEAN,
  has_cat BOOLEAN,
  weird_field_name BOOLEAN,
  skills VARIANT,
  family VARIANT
);
Copy

次に、マッピングを定義する同じ名前のパイプを作成します。

CREATE PIPE PERSON_DATA AS
COPY INTO PERSON_DATA
FROM (
  SELECT
    $1:city,
    $1:age,
    $1:married,
    $1['has cat'] AS has_cat,
    $1[' @&$#* has Łułósżź'] AS weird_field_name,
    $1:skills,
    $1:family
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

重要なポイント:

  • 列名を制御します(例: "has cat" から has_cat に名前を変更する)

  • 必要に応じてデータ型をキャストできます(例: $1:age::NUMBER

  • 必要に応じて、フィールドを含むことも除外することもできます

  • メタデータフィールドを追加できます(例: $1:RECORD_METADATA.topic

  • VARIANT 列は、ネストされた JSON 構造を自動的に処理します。

例3:インタラクティブテーブルの使用

インタラクティブテーブルは、低レイテンシの高同時実行性クエリ向けに最適化されたSnowflakeテーブルの特別なタイプです。インタラクティブテーブルの詳細については、 インタラクティブテーブルドキュメント をご参照ください。

注釈

インタラクティブテーブルは現在、選ばれたアカウントでのみ利用できるプレビュー機能です。

  1. インタラクティブテーブルを作成します。

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_topic VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) AS (SELECT
          $1:M_NAME::VARCHAR,
          $1:M_VALUE::NUMBER,
          $1:RECORD_METADATA.topic::VARCHAR,
          $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
          from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
    
    Copy
  2. トピックをテーブルマッピングに構成します。

    snowflake.topic2table.map=metrics-topic:REALTIME_METRICS
    
    Copy

重要な考慮事項:

  • インタラクティブテーブルには特定の制限とクエリ制限があります。コネクタで使用する前に インタラクティブテーブルドキュメント をご確認ください。

  • インタラクティブテーブルの場合、必要な変換はテーブル定義で処理する必要があります。

  • インタラクティブテーブルを効率的にクエリするには、インタラクティブウェアハウスが必要です。

明示的なトピックからテーブルへのマッピング

snowflake.topic2table.map パラメーターを構成するときは、コネクタは明示的マッピングモードで動作します。このモードでは、次を実行できます。

  • 複数のKafkaトピックを単一のSnowflakeテーブルにマッピングする

  • トピック名とは異なるカスタムテーブル名を使用する

  • 複数のトピックに正規表現パターンを適用する

構成形式:

snowflake.topic2table.map パラメーターは、トピックからテーブルへのマッピングのコンマ区切りリストを以下の形式で受け入れます。

topic1:table1,topic2:table2,topic3:table3
Copy

構成例:

直接トピックマッピング

snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
Copy

正規表現パターンマッチング

snowflake.topic2table.map=.*_cat:CAT_TABLE,.*_dog:DOG_TABLE
Copy

この構成は、 _catorange_catcalico_cat など)で終わるすべてのトピックを CAT_TABLE テーブルに、 _dog で終わるすべてのトピックを DOG_TABLE テーブルにマップします。

1つのテーブルへの多数のトピック

snowflake.topic2table.map=topic1:shared_table,topic2:shared_table,topic3:other_table
Copy

この構成は topic1 および topic2 の両方を shared_table にマップする一方で、 topic3other_table にマップします。

重要

  • マッピング内の正規表現パターンは重複できません。各トピックは、最大で1つのパターンに一致する必要があります。

  • マッピングのテーブル名は、文字またはアンダースコアで始まる2文字以上の有効なSnowflake識別子である必要があります。

  • 複数のトピックを1つのテーブルにマップできます。

レガシー RECORD_CONTENT 列

コネクタの古いバージョンでスキーマ化機能が無効になっていた場合、コネクタは RECORD_CONTENT および RECORD_METADATA の2つの列を持つ宛先テーブルを作成しました。RECORD_CONTENT 列には、 VARIANT 型 の列にKafkaメッセージのコンテンツ全体が含まれていました。RECORD_METADATA 列は引き続きサポートされていますが、 RECORD_CONTENT 列はコネクタによって作成されなくなりました。同じ機能は SMT 変換を使用して実現できます(このセクションの後半の例をご参照ください)。RECORD_CONTENT キーも PIPE 変換では利用できなくなりました。たとえば、この PIPE 定義はデフォルトでは機能しません。

注釈

このパイプ定義は、追加の SMT 変換なしでは機能しません。

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:RECORD_CONTENT
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

Kafkaメッセージのコンテンツ全体を単一の列に保存する必要がある場合や、 PIPE 変換でKafkaメッセージのコンテンツ全体の処理が必要な場合、Kafkaメッセージのコンテンツ全体を希望のカスタムフィールドにラップする、次の SMT 変換を使用できます。

transforms=wrapKafkaMessageContent
transforms.wrapKafkaMessageContent.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapKafkaMessageContent.field=your_top_level_field_name
Copy

この変換は、Kafkaメッセージのコンテンツ全体を your_top_level_field_name という名前のカスタムフィールドにラップします。その後は、 PIPE 変換の $1:your_top_level_field_name アクセサーを使用して、Kafkaメッセージのコンテンツ全体にアクセスできます。

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:your_top_level_field_name
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

または、デフォルトのパイプを使用してメタデータとコンテンツの両方を1つのテーブルに保存する場合は、カスタムパイプを作成しないでください。代わりに、 RECORD_CONTENT および your_top_level_field_name の2つの列を持つテーブルのみを作成します。

CREATE TABLE ORDERS (
  record_metadata VARIANT,
  your_top_level_field_name VARIANT
);
Copy

HoistField$Valueの変換の詳細を読むには、 Kafkaドキュメント を参照してください。

警告

Kafkaメッセージのコンテンツとメタデータ全体をテーブルに保存すると、インジェスションコスト、パイプラインの速度、レイテンシに悪影響を与える可能性があります。最高のパフォーマンスが必要な場合は、Kafka記録コンテンツの最上位からアクセスできる場合に必要なデータのみを保存することを検討するか、 SMT 変換を使用して深くネストされたフィールドから最上位フィールドにデータを抽出します。

ストリーミングチャネルエラーと配信不能キューの処理

バージョン 4.0.0-rc4 では、コネクタはオフセットをコミットする前にSnowpipe Streamingチャネルのステータスを検査します。Snowflakeが拒否された行をレポートする場合( rowsErrorCount > 0), the connector now raises a fatal error (ERROR_5030) when errors.tolerance=none so that data issues cannot go unnoticed. To allow ingestion to continue while you triage bad rows, set errors.tolerance=all

errors.tolerance=all
Copy

スキーマの進化

重要

Kafka用の高性能Snowflakeコネクタでは、 スキーマの進化はサポートされていません 。宛先テーブルへのスキーマの変更は手動で管理する必要があります。

コネクタは、スキーマの変更を自動的に検出したり、Kafkaの受信記録に基づいてテーブルスキーマを進化させたりしません。列を追加したり、データ型を変更したり、その他のスキーマを変更したりする必要がある場合は、以下を実行する必要があります。

  1. データインジェスションを停止するには、 コネクタを一時停止 します

  2. ALTER TABLE を使用して、またはテーブルを再作成して テーブルスキーマを手動で変更 します

  3. ユーザー定義のパイプを使用しており、変換ロジックを変更する必要がある場合は、 パイプ定義を更新 します

  4. データインジェスションを再開するには、 コネクタを再起動 します

注釈

スキーマ進化のサポートは、将来のリリースで追加される予定です。

フォールトトレランス

コネクタのフォールトトレランスの制限

Kafkaトピックは、ストレージスペースまたは保持時間の制限で構成できます。

  • 保持時間を超えてシステムがオフラインしている場合、期限切れの記録はロードされません。同様に、Kafkaのストレージスペースの制限を超えると、一部のメッセージは配信されません。

  • Kafkaトピックのメッセージが削除された場合、これらの変更はSnowflakeテーブルに反映されません。

次のステップ

タスクの設定