Working with the Snowflake High Performance connector for Kafka¶
This topic describes how the connector works with tables and pipes, and how to configure the connector with these elements.
コネクタがテーブルとパイプで動作する方法¶
The connector treats each Kafka record as a row to be inserted into a Snowflake table. For example, if you have a Kafka topic with the content of the message structured like the following JSON:
{
"order_id": 12345,
"customer_name": "John",
"order_total": 100.00,
"isPaid": true
}
デフォルトでは、インジェスチョンを開始する前にテーブルやパイプを作成する必要はありません。コネクタはJSONキーに一致する列を持つテーブルを作成し、 {tableName}-STREAMING という名前のデフォルトパイプに依存できます。これは、記録コンテンツの第1レベルのキーを、名前(大文字と小文字の区別なし)で一致するテーブル列に自動的にマッピングします。JSONキーに一致する列を持つ独自のテーブルを作成することもできます。コネクタは、記録コンテンツの第1レベルのキーを名前でテーブル列と一致させようとします。JSONからのキーがテーブル列と一致しない場合、コネクタはキーを無視します。
CREATE TABLE ORDERS (
record_metadata VARIANT,
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
ispaid BOOLEAN
);
独自のパイプを作成することを選択した場合は、パイプの COPY INTO ステートメントでデータ変換ロジックを定義できます。必要に応じて列の名前を変更し、必要に応じてデータ型をキャストできます。例:
CREATE TABLE ORDERS (
order_id VARCHAR,
customer_name VARCHAR,
order_total VARCHAR,
ispaid VARCHAR
);
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:order_id::STRING,
$1:customer_name,
$1:order_total::STRING,
$1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
または
CREATE TABLE ORDERS (
topic VARCHAR,
partition VARCHAR,
order_id VARCHAR,
customer_name VARCHAR,
order_total VARCHAR,
ispaid VARCHAR
);
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:RECORD_METADATA.topic::STRING AS topic,
$1:RECORD_METADATA.partition::STRING AS partition,
$1['order_id']::STRING AS order_id,
$1['customer_name']::STRING as customer_name,
CONCAT($1['order_total']::STRING, ' USD') AS order_total,
$1['isPaid']::STRING AS ispaid
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
When you define your own pipe your destination table columns need not need match the JSON keys. You can rename the columns to your desired names and cast the data types if required.
トピック名、テーブル名、パイプ名¶
構成設定に応じて、コネクタは宛先テーブルに異なる名前を使用します。宛先テーブル名は、常にトピック名から派生します。
コネクタがトピック名を宛先テーブルにマップする方法¶
Kafkaコネクタは、Kafkaトピック名をSnowflakeテーブル名にマッピングするための2つのモードを提供します。
静的マッピング:コネクタは、Kafkaトピック名のみを使用して宛先テーブル名を導出します。
明示的なトピックからテーブルへのマッピングモード:
snowflake.topic2table.map構成パラメーターを使用して、トピックとテーブル間のカスタムマッピングを指定します。
静的マッピング¶
snowflake.topic2table.map パラメーターを構成しない場合、コネクタは常にトピック名からテーブル名を導出します。
テーブル名の生成:
コネクタは、次のルールを使用してトピック名から宛先テーブル名を導出します。
If the topic name is a valid Snowflake identifier the connector uses the topic name as the destination table name, converted to uppercase).
トピック名に無効な文字が含まれている場合、コネクタは:
無効な文字をアンダースコアに置き換えます
一意性を確保するために、アンダースコアの後にハッシュコードを追加します
たとえば、トピック
my-topic.dataはMY_TOPIC_DATA_<ハッシュ>になります
パイプ名の決定:
コネクタは、次のロジックに基づいて、どのパイプを使用するかを決定します。
コネクタは、宛先テーブル名と同じ名前のパイプが存在するかどうかを確認します。
その名前のユーザー作成パイプが存在する場合、コネクタはそのパイプ(ユーザー定義パイプモード)を使用します。
そうでない場合、コネクタは
{tableName}-STREAMINGという名前のデフォルトのパイプを使用します。
注釈
Snowflakeは、予測可能なテーブル名を確保するために、Snowflake識別子名のルールに従うトピック名を選択することをお勧めします。
RECORD_METADATA を理解する¶
コネクタは、Kafka記録に関するメタデータを含む RECORD_METADATA 構造を入力します。このメタデータは、Snowpipe Streamingデータソースを通じてSnowflakeに送信され、Snowflakeでは $1:RECORD_METADATA アクセサーを使用してパイプ変換で利用可能になります。RECORD_METADATA 構造は、ユーザー定義パイプとデフォルトパイプモードの両方で使用できます。その内容は、 VARIANT 型の列に保存できます。または個々のフィールドを抽出して、個別の列に保存できます。
変換とメタデータを含むパイプの例:
CREATE PIPE ORDERS AS
COPY INTO ORDERS_TABLE
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total,
$1:RECORD_METADATA.topic AS source_topic,
$1:RECORD_METADATA.offset::NUMBER AS kafka_offset,
$1:RECORD_METADATA.SnowflakeConnectorPushTime::BIGINT AS ingestion_time
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
この例では:
パイプは、Kafkaメッセージから特定のフィールド(order_id、customer_name、order_total)を抽出します。
また、メタデータフィールド(トピック、オフセット、インジェスションタイムスタンプ)もキャプチャします。
値は、必要に応じてキャストおよび/または変換できます
メタデータフィールドの入力方法¶
コネクタは、Kafka記録プロパティとコネクタ構成に基づいてメタデータフィールドを自動的に入力します。これらの構成パラメーターを使用して、どのメタデータフィールドを含めるかを制御できます。
snowflake.metadata.topic(デフォルト: true) - トピック名を含むsnowflake.metadata.offset.and.partition(デフォルト: true) - オフセットとパーティションを含むsnowflake.metadata.createtime(デフォルト: true) - Kafka記録のタイムスタンプを含むsnowflake.metadata.all(デフォルト: true) - 利用可能なすべてのメタデータを含む
snowflake.metadata.all=true (デフォルト)の場合、すべてのメタデータフィールドが入力されます。個々のメタデータフラグを false に設定すると、 RECORD_METADATA 構造から特定のフィールドを除外します。
注釈
SnowflakeConnectorPushTime フィールドは常に利用可能で、コネクタが記録をインジェスションバッファーにプッシュした時刻を表します。これは、エンドツーエンドのインジェスションレイテンシを計算するのに便利です。
RECORD_METADATA 構造には、デフォルトで次の情報が含まれています。
フィールド |
データ型 |
説明 |
|---|---|---|
topic |
String |
レコードが由来するKafkaトピックの名前です。 |
partition |
String |
トピック内のパーティションの番号です。(これはSnowflakeマイクロパーティションではなく、Kafkaパーティションであることに注意してください。) |
offset |
number |
そのパーティションのオフセットです。 |
CreateTime / . LogAppendTime |
number |
This is the timestamp associated with the message in the Kafka topic. The value is milliseconds since midnight January 1, 1970, UTC. For more information, see: Kafka ProducerRecord documentation. |
SnowflakeConnectorPushTime |
number |
記録がインジェスト SDK バッファにプッシュされたときのタイムスタンプ。値は、1970年1月1日午前0時 UTC からのミリ秒数です。詳細については、 インジェスチョンレイテンシの推定 を参照してください。 |
key |
String |
If the message is a Kafka KeyedMessage, this is the key for that message.
In order for the connector to store the key in the RECORD_METADATA, the |
headers |
オブジェクト |
A header is a user-defined key-value pair associated with the record. Each record can have 0, 1, or multiple headers. |
RECORD_METADATA 列に記録されるメタデータの量は、オプションのKafka構成プロパティを使用して構成できます。
フィールド名と値は大文字と小文字が区別されます。
インジェスション前のKafka記録の変換方法¶
各行がSnowpipe Streamingに渡される前に、コネクタはKafka Connectの記録値を Map<文字列、オブジェクト> に変換します。このキーはターゲット列名と一致する必要があります(または、ユーザー定義のパイプ内で変換できます)。プリミティブ文字列、バイト配列、または数値は(たとえば HoistField SMT を使用して)ラップする必要があります。それにより、コネクタは構造化オブジェクトを受け取れるようになります。コンバーターは次のルールを適用します。
Null値はtombstoneとして扱われます。
behavior.on.null.values=IGNOREまたは、空の JSON オブジェクトとして取り込まれる場合はスキップされます。数値およびブールフィールドはそのまま渡されます。精度が38より大きい10進値は、Snowflakeの
NUMBERの制限内にとどまる文字列としてシリアル化されます。byte[]およびByteBufferペイロードはBase64でエンコードされた文字列であるため、VARIANTまたはVARCHAR列に保存します。配列は配列のままであり、ネストされたオブジェクトはネストされたマップのままです。デフォルトのパイプに依存してネストされたデータをそのままにする場合、
VARIANT列を宣言します。非文字列キーを持つマップは、
[key, value]ペアの配列として出力されます。これは、Snowflakeの列名はテキストである必要があるためです。関連するメタデータフラグが有効になっている場合は常に、記録のヘッダーとキーは
RECORD_METADATAにコピーされます。
メッセージ本文全体を単一の列として保持する必要がある場合は、 SMTs を使用して新しいトップレベルフィールドにラップします。 変換パターンについては レガシー RECORD_CONTENT 列 をご参照ください。
ユーザー定義パイプモードとデフォルトのパイプモード¶
コネクタは、データインジェスションを管理するために2つのモードをサポートしています。
ユーザー定義のパイプモード¶
このモードでは、データ変換と列マッピングをフルコントロールできます。
このモードを使用する場合:
JSON フィールド名とは異なるカスタム列名が必要です。
データ変換(型キャスト、マスキング、フィルタリング)を適用する必要があります
データが列にどのようにマッピングされるかをフルコントロールしたい場合
デフォルトのパイプモード¶
このモードでは、コネクタは {tableName}-STREAMING という名前のデフォルトのパイプを使用し、kafkaレコードフィールドを名前(大文字と小文字の区別なし)で一致するテーブル列にマップします。
このモードを使用する場合:
kafkaレコードのキー名は希望する列名と一致します
カスタムデータ変換は必要ありません
You want a simple configuration
デフォルトのパイプモードを使用したテーブル列へのkafka記録キーのマッピング
デフォルトパイプモードを使用すると、コネクタは {tableName}-STREAMING という名前のデフォルトパイプを使用し、大文字と小文字を区別しない一致を使用して、コンテンツの第1レベルキーをテーブル列に直接マッピングします。
デフォルトパイプモードの使用 - 例¶
例1:¶
次のkafka記録コンテンツペイロードを検討してください。
{
"city": "New York",
"age": 30,
"married": true,
"has cat": true,
"@&$#* includes special characters": true,
"skills": ["sitting", "standing", "eating"],
"family": {"son": "Jack", "daughter": "Anna"}
}
JSON キーに一致する列を持つテーブルを作成します(大文字と小文字を区別しない、特殊文字を含む)。
CREATE TABLE PERSON_DATA (
record_metadata VARIANT,
city VARCHAR,
age NUMBER,
married BOOLEAN,
"has cat" BOOLEAN,
"!@&$#* includes special characters" BOOLEAN,
skills VARIANT,
family VARIANT
);
一致する動作:
ユーザー定義パイプモードの使用 - 例¶
この例では、カスタムデータ変換でユーザー定義のパイプを構成して使用する方法を示します。
例1:¶
希望のスキーマでテーブルを作成します。
CREATE TABLE ORDERS (
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
order_date TIMESTAMP_NTZ,
source_topic VARCHAR
);
テーブルスキーマに合わせてKafkaの受信記録を変換するパイプを作成します。
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total::NUMBER,
$1:order_date::TIMESTAMP_NTZ,
$1:RECORD_METADATA.topic
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
パイプ名( ORDERS )は、テーブル名( ORDERS )と一致することに注意してください。パイプ定義は、 $1:field_name 構文を使用した JSON ペイロードからフィールドを抽出し、テーブル列にそれらをマップします。
注釈
ネストされた JSON フィールドと、 $1['field name'] または $1['has cat'] など括弧表記を使用した特殊文字を含むフィールドにアクセスできます。
トピックをテーブルマッピングに構成します。
snowflake.topic2table.map=kafka-orders-topic:ORDERS
この構成は、Kafkaトピック kafka-orders-topic を ORDERS という名前の既存のテーブルとパイプにマップします。
例2:¶
従来の名前を持たないコンテンツのキーにアクセスする必要がある場合は、次の構文を使用します。
単純なフィールド:
$1:field_nameスペースまたは特殊文字を含むフィールド:
$1['field name']または$1['has cat']Unicode文字を含むフィールド:
$1[' @&$#* has Łułósżź']ネストされたフィールド:
$1:parent.childまたは$1:parent['child field']
Kafkaからの JSON ペイロードを考慮します。
{
"city": "New York",
"age": 30,
"married": true,
"has cat": true,
" @&$#* has Łułósżź": true,
"skills": ["sitting", "standing", "eating"],
"family": {"son": "Jack", "daughter": "Anna"}
}
選択した列名で宛先テーブルを作成します。
CREATE TABLE PERSON_DATA (
city VARCHAR,
age NUMBER,
married BOOLEAN,
has_cat BOOLEAN,
weird_field_name BOOLEAN,
skills VARIANT,
family VARIANT
);
次に、マッピングを定義する同じ名前のパイプを作成します。
CREATE PIPE PERSON_DATA AS
COPY INTO PERSON_DATA
FROM (
SELECT
$1:city,
$1:age,
$1:married,
$1['has cat'] AS has_cat,
$1[' @&$#* has Łułósżź'] AS weird_field_name,
$1:skills,
$1:family
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
重要なポイント:
列名を制御します(例:
"has cat"からhas_catに名前を変更する)必要に応じてデータ型をキャストできます(例:
$1:age::NUMBER)必要に応じて、フィールドを含むことも除外することもできます
メタデータフィールドを追加できます(例:
$1:RECORD_METADATA.topic)VARIANT 列は、ネストされた JSON 構造を自動的に処理します。
例3:インタラクティブテーブルの使用¶
インタラクティブテーブルは、低レイテンシの高同時実行性クエリ向けに最適化されたSnowflakeテーブルの特別なタイプです。インタラクティブテーブルの詳細については、 インタラクティブテーブルドキュメント をご参照ください。
インタラクティブテーブルを作成します。
CREATE INTERACTIVE TABLE REALTIME_METRICS ( metric_name VARCHAR, metric_value NUMBER, source_topic VARCHAR, timestamp TIMESTAMP_NTZ ) AS (SELECT $1:M_NAME::VARCHAR, $1:M_VALUE::NUMBER, $1:RECORD_METADATA.topic::VARCHAR, $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
トピックをテーブルマッピングに構成します。
snowflake.topic2table.map=metrics-topic:REALTIME_METRICS
重要な考慮事項:
インタラクティブテーブルには特定の制限とクエリ制限があります。コネクタで使用する前に インタラクティブテーブルドキュメント をご確認ください。
インタラクティブテーブルの場合、必要な変換はテーブル定義で処理する必要があります。
インタラクティブテーブルを効率的にクエリするには、インタラクティブウェアハウスが必要です。
明示的なトピックからテーブルへのマッピング¶
snowflake.topic2table.map パラメーターを構成するときは、コネクタは明示的マッピングモードで動作します。このモードでは、次を実行できます。
複数のKafkaトピックを単一のSnowflakeテーブルにマッピングする
トピック名とは異なるカスタムテーブル名を使用する
複数のトピックに正規表現パターンを適用する
構成形式:
snowflake.topic2table.map パラメーターは、トピックからテーブルへのマッピングのコンマ区切りリストを以下の形式で受け入れます。
topic1:table1,topic2:table2,topic3:table3
構成例:
直接トピックマッピング
snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
正規表現パターンマッチング
snowflake.topic2table.map=.*_cat:CAT_TABLE,.*_dog:DOG_TABLE
この構成は、 _cat ( orange_cat 、 calico_cat など)で終わるすべてのトピックを CAT_TABLE テーブルに、 _dog で終わるすべてのトピックを DOG_TABLE テーブルにマップします。
1つのテーブルへの多数のトピック
snowflake.topic2table.map=topic1:shared_table,topic2:shared_table,topic3:other_table
この構成は topic1 および topic2 の両方を shared_table にマップする一方で、 topic3 は other_table にマップします。
重要
マッピング内の正規表現パターンは重複できません。各トピックは、最大で1つのパターンに一致する必要があります。
マッピングのテーブル名は、文字またはアンダースコアで始まる2文字以上の有効なSnowflake識別子である必要があります。
複数のトピックを1つのテーブルにマップできます。
レガシー RECORD_CONTENT 列¶
In prior versions of the connector (3.x and earlier), when the schematization feature was disabled, the connector created a destination table with two columns: RECORD_CONTENT and RECORD_METADATA. The RECORD_CONTENT column contained the entire Kafka message content in a column of type VARIANT. The RECORD_METADATA column continues to be supported but the RECORD_CONTENT column is no longer created by the connector. The same functionality can be achieved using SMT transformations (see examples later in this section). The RECORD_CONTENT key is also no longer available in PIPE transformations. For example, this PIPE definition will not work by default:
注釈
このパイプ定義は、追加の SMT 変換なしでは機能しません。
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:RECORD_CONTENT
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Kafkaメッセージのコンテンツ全体を単一の列に保存する必要がある場合や、 PIPE 変換でKafkaメッセージのコンテンツ全体の処理が必要な場合、Kafkaメッセージのコンテンツ全体を希望のカスタムフィールドにラップする、次の SMT 変換を使用できます。
transforms=wrapKafkaMessageContent
transforms.wrapKafkaMessageContent.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapKafkaMessageContent.field=your_top_level_field_name
この変換は、Kafkaメッセージのコンテンツ全体を your_top_level_field_name という名前のカスタムフィールドにラップします。その後は、 PIPE 変換の $1:your_top_level_field_name アクセサーを使用して、Kafkaメッセージのコンテンツ全体にアクセスできます。
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:your_top_level_field_name
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
または、デフォルトのパイプを使用してメタデータとコンテンツの両方を1つのテーブルに保存する場合は、カスタムパイプを作成しないでください。代わりに、 RECORD_CONTENT および your_top_level_field_name の2つの列を持つテーブルのみを作成します。
CREATE TABLE ORDERS (
record_metadata VARIANT,
your_top_level_field_name VARIANT
);
HoistField$Valueの変換の詳細を読むには、 Kafkaドキュメント を参照してください。
警告
Kafkaメッセージのコンテンツとメタデータ全体をテーブルに保存すると、インジェスションコスト、パイプラインの速度、レイテンシに悪影響を与える可能性があります。最高のパフォーマンスが必要な場合は、Kafka記録コンテンツの最上位からアクセスできる場合に必要なデータのみを保存することを検討するか、 SMT 変換を使用して深くネストされたフィールドから最上位フィールドにデータを抽出します。
ストリーミングチャネルエラーと配信不能キューの処理¶
The connector inspects the Snowpipe Streaming channel status before committing offsets in Kafka. If the connector detects that the rowsErrorCount property on channel has increased since the connector was started, it raises a fatal error (ERROR_5030) when errors.tolerance=none so that data issues don't go unnoticed. To allow ingestion to continue while triaging bad rows, set errors.tolerance=all
errors.tolerance=all
スキーマの進化¶
ENABLE_SCHEMA_EVOLUTION=TRUE のあるテーブルの場合、コネクタは受信Kafkaの記録に基づいて、スキーマを自動的に進化させます。 作成されたすべてのコネクタテーブルは、デフォルトで ENABLE_SCHEMA_EVOLUTION=TRUE になります。
スキーマの進化は、以下の操作に制限されています:
新しい列の追加。受信Kafka記録にテーブルに存在しない新しいフィールドが含まれている場合、コネクタはテーブルに新しい列を追加します。
挿入された記録でデータが欠落している列からNOT NULL制約を削除する
Apache Iceberg™ テーブルでのコネクタの使用¶
コネクタは、Snowflakeが管理する Apache Iceberg™ にデータを取り込むことができますが、以下の要件を満たしている必要があります。
Apache Iceberg™ テーブルに関連付けられた外部ボリュームに対するUSAGE権限を付与されている必要があります。
コネクタを実行する前に Apache Iceberg™ テーブルを作成する必要があります。
外部ボリュームの使用許可¶
Kafkaコネクタのロールに Apache Iceberg™ テーブルに関連付けられた外部ボリュームに対するUSAGE権限を付与するには、次のステートメントを実行します。
たとえば、Icebergテーブルが kafka_external_volume 外部ボリュームを使用し、コネクタがロール kafka_connector_role を使用する場合、次のステートメントを実行します:
USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
インジェスチョン用の Apache Iceberg™ テーブルを作成する¶
コネクタは自動的にIcebergテーブルを作成せず、スキーマの進化もサポートしません。コネクタを実行する前に、Icebergテーブルを手動で作成する必要があります。
Icebergテーブルを作成する場合、Icebergデータタイプ(VARIANTなど)または 互換性のあるSnowflakeデータタイプ を使用できます。
例えば、次のようなメッセージを考えてみましょう:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"options":
{
"can_walk": true,
"can_talk": false
},
"date_added": "2024-10-15"
}
例のメッセージ用にIcebergテーブルを作成するには、以下のステートメントの1つを使用します:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table ( id number(38,0), name varchar, body_temperature number(4,2), approved_coffee_types array(varchar), animals_possessed variant, options object(can_walk boolean, can_talk boolean), date_added date ) EXTERNAL_VOLUME = 'my_volume' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'my_location/my_iceberg_table' ICEBERG_VERSION = 3;CREATE OR REPLACE ICEBERG TABLE my_iceberg_table ( id INT, name string, body_temperature float, approved_coffee_types array(string), animals_possessed variant, date_added date, options object(can_walk boolean, can_talk boolean), ) EXTERNAL_VOLUME = 'my_volume' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'my_location/my_iceberg_table' ICEBERG_VERSION = 3;
注釈
dogs や cats のような入れ子構造内のフィールド名では、大文字と小文字が区別されます。
次のステップ¶
タスクの設定 。