Snowpipe StreamingでのKafka用Snowflakeコネクタの使用

必要に応じて、KafkaからのデータのロードチェーンでSnowpipeを Snowpipe Streaming に置き換えます。指定されたフラッシュバッファーのしきい値(時間、メモリ、またはメッセージ数)に達すると、仮のステージングされたファイルにデータを書き込むSnowpipeとは異なり、コネクタはSnowpipe Streaming API (「API」)を呼び出して、データの行をSnowflakeテーブルから書き込みます。このアーキテクチャにより、ロード遅延が短縮され、同様の量のデータをロードするためのコストが削減されます。

Snowpipe Streamingで使用するには、Kafkaコネクタのバージョン1.9.1(またはそれ以上)が必要です。Snowpipe Streamingで使用するKafkaコネクタには、Snowflake Ingest SDK が含まれており、Apache Kafkaトピックからターゲットテーブルに直接行をストリーミングできます。

Snowpipe Streaming with Kafka connector

注釈

現在、Snowpipe Streamingを使用するKafkaコネクタは、スキーマ検出またはスキーマ進化をサポートしていません。Snowpipeで使用されるものと同じ テーブルのスキーマ を使用します。

このトピックの内容:

必要最小バージョン

Kafkaコネクタバージョン1.9.1(またはそれ以上)は、Snowpipe Streamingをサポートしています。

Kafka構成プロパティ

接続設定をKafkaコネクタのプロパティファイルに保存します。詳細については、 Kafkaコネクタの構成 をご参照ください。

必須のプロパティ

Kafkaコネクタのプロパティファイルで接続設定を追加または編集します。詳細については、 Kafkaコネクタの構成 をご参照ください。

snowflake.ingestion.method

ストリーミングインジェストクライアントとしてKafkaコネクタを使用する場合にのみ必要です。 Kafkaトピックデータをロードするために、Snowpipe Streamingまたは標準Snowpipeのいずれを使用するかを指定します。サポートされている値は次のとおりです。

  • SNOWPIPE_STREAMING

  • SNOWPIPE (デフォルト)

トピックデータをキューに入れてロードするために、バックエンドサービスを選択する追加の設定は必要ありません。通常どおり、Kafkaコネクタのプロパティファイルで追加のプロパティを構成します。

snowflake.role.name

テーブルに行を挿入するときに使用するアクセス制御ロール。

バッファーおよびポーリングのプロパティ

buffer.flush.time

バッファーフラッシュ間の秒数。フラッシュごとに、バッファーされた記録の挿入操作が実行されます。Kafkaコネクタは、各フラッシュの後にSnowpipe Streaming API(「API」)を1回呼び出します。

buffer.flush.time プロパティでサポートされる最小値は 1 (秒単位)です。データの平均フローレートを高くするには、遅延を低減するためにデフォルト値を下げることをお勧めします。遅延よりもコストが重要な場合は、バッファーのフラッシュ時間を増やすことができます。メモリ不足の例外を回避するため、バッファーがいっぱいになる前にKafkaメモリバッファーをフラッシュするように注意してください。

1 - 上限なし。

デフォルト

10

buffer.count.records

Snowflakeにインジェストされる前に、Kafkaパーティションごとにメモリにバッファされる記録の数。

1 - 上限なし。

デフォルト

10000

buffer.size.bytes

データファイルとしてSnowflakeに取り込まれる前に、Kafkaパーティションごとにメモリーにバッファーされたレコードの累積サイズ(バイト単位)。

レコードは、データファイルに書き込まれるときに圧縮されます。その結果、バッファー内のレコードのサイズは、レコードから作成されたデータファイルのサイズよりも大きくなる可能性があります。

1 - 上限なし。

デフォルト

20000000 (20 MB)

Kafkaコネクタのプロパティに加えて、Kafkaコンシューマー max.poll.records のプロパティに注意してください。これは、1回のポーリングでKafkaからKafka Connectに返される記録の最大数を制御します。 500 のデフォルト値は増やすことができますが、メモリの制約に注意してください。このプロパティの詳細については、Kafkaパッケージのドキュメントをご参照ください。

エラー処理および DLQ プロパティ

errors.tolerance

Kafkaコネクタで発生したエラーの処理方法を指定します。

このプロパティは、次の値をサポートします。

NONE

最初のエラーが発生したときにデータのロードを停止します。

ALL

すべてのエラーを無視して、データのロードを続行します。

デフォルト

NONE

errors.log.enable

Kafka Connectログファイルにエラーメッセージを書き込むかどうかを指定します。

このプロパティは、次の値をサポートします。

TRUE

エラーメッセージを書き込みます。

FALSE

エラーメッセージを書き込みません。

デフォルト

FALSE

errors.deadletterqueue.topic.name

SnowflakeテーブルにインジェストできなかったメッセージをKafkaに配信するために、Kafkaの DLQ(配信不能キュー)トピック名を指定します。詳細については、 配信不能キュー (このトピック内)をご参照ください。

カスタムテキスト文字列

デフォルト

なし。

必ず1回のセマンティクス

必ず1回のセマンティクスにより、重複やデータ損失なしでKafkaメッセージの配信が保証されます。この配信保証は、Snowpipe Streamingを使用するKafkaコネクタに対してデフォルトで設定されています。

コンバーター

Snowpipe Streamingを使用するKafkaコネクタは、次の key.converter または value.converter 値をサポートしていません。

  • com.snowflake.kafka.connector.records.SnowflakeJsonConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry

カスタムSnowflakeコンバーターは、ファイルをテーブルステージに移動することにより、データのロードを妨げるエラーを処理します。このワークフローは、Snowpipe Streamingの 配信不能キュー と競合します。

配信不能キュー

Snowpipe Streamingを使用するKafkaコネクタは、壊れた記録または障害のために正常に処理できない記録の配信不能キュー(DLQ)をサポートしています。

モニタリングの詳細については、Apache Kafkaの ドキュメント をご参照ください。

請求および使用状況

Snowpipe Streamingの課金情報については、 Snowpipe Streamingの請求 をご参照ください。