Snowpipe StreamingでのKafka用Snowflakeコネクタの使用

KafkaからのデータのロードチェーンでSnowpipeを Snowpipe Streaming に置き換えることができます。指定されたフラッシュバッファーのしきい値(時間、メモリ、またはメッセージ数)に達すると、仮のステージングされたファイルにデータを書き込むSnowpipeとは異なり、コネクタはSnowpipe Streaming API (「 API 」)を呼び出して、データの行をSnowflakeテーブルから書き込みます。このアーキテクチャにより、ロード遅延が短縮され、同様の量のデータをロードするためのコストが削減されます。

Snowpipe Streamingで使用するには、Kafkaコネクタのバージョン2.0.0 (またはそれ以降) が必要です。Snowpipe Streamingで使用するKafkaコネクタには、Snowflake Ingest SDK が含まれており、Apache Kafkaトピックからターゲットテーブルに直接行をストリーミングできます。

Kafkaコネクタを使用したSnowpipe Streaming

このトピックの内容:

必要最小バージョン

Snowpipe StreamingをサポートするKafkaコネクタの最小必要バージョンは2.0.0です。

Kafka構成プロパティ

接続設定をKafkaコネクタのプロパティファイルに保存します。詳細については、 Kafkaコネクタの構成 をご参照ください。

必須のプロパティ

Kafkaコネクタのプロパティファイルで接続設定を追加または編集します。詳細については、 Kafkaコネクタの構成 をご参照ください。

snowflake.ingestion.method

ストリーミングインジェストクライアントとしてKafkaコネクタを使用する場合にのみ必要です。 Kafkaトピックデータをロードするために、Snowpipe Streamingまたは標準Snowpipeのいずれを使用するかを指定します。サポートされている値は次のとおりです。

  • SNOWPIPE_STREAMING

  • SNOWPIPE (デフォルト)

トピックデータをキューに入れてロードするために、バックエンドサービスを選択する追加の設定は必要ありません。通常どおり、Kafkaコネクタのプロパティファイルで追加のプロパティを構成します。

snowflake.role.name

テーブルに行を挿入するときに使用するアクセス制御ロール。

クライアント最適化プロパティ

enable.streaming.client.optimization

ワンクライアント最適化を有効にするかどうかを指定します。このプロパティは、Kafkaコネクタのリリースバージョン2.1.2以降でサポートされています。デフォルトで有効になっています。

ワンクライアント最適化では、Kafkaコネクタごとに複数のトピックパーティションに対して1つのクライアントのみが作成されます。この機能により、より大きなファイルを作成することによって、クライアントの実行時間を短縮し、移行コストを削減することができます。

:
  • true

  • false

デフォルト:

true

高スループットのシナリオ (例: コネクタあたり50 MB/s) でこのプロパティを有効にすると、遅延やコストの上昇につながる可能性があることに注意してください。スループットの高いシナリオでは、このプロパティを無効にすることをお勧めします。

バッファーおよびポーリングのプロパティ

buffer.flush.time

バッファーフラッシュ間の秒数。フラッシュごとに、バッファーされた記録の挿入操作が実行されます。Kafkaコネクタは、各フラッシュの後にSnowpipe Streaming API を1回呼び出します。

buffer.flush.time プロパティでサポートされる最小値は 1 (秒単位)です。データの平均フローレートを高くするには、遅延を低減するためにデフォルト値を下げることをお勧めします。遅延よりもコストが重要な場合は、バッファーのフラッシュ時間を増やすことができます。メモリ不足の例外を回避するため、バッファーがいっぱいになる前にKafkaメモリバッファーをフラッシュするように注意してください。

:
  • 最小: 1

  • 最大: 上限なし

デフォルト:

10

Snowpipe Streamingは自動的に1秒ごとにデータをフラッシュしますが、これはKafkaコネクタのバッファフラッシュ時間とは異なることに注意してください。Kafkaバッファのフラッシュ時間に達した後、データはSnowpipe Streamingを通じてSnowflakeに1秒の遅延で送信されます。詳細については、 Snowpipe Streamingの遅延 をご参照ください。

buffer.count.records

Snowflakeにインジェストされる前に、Kafkaパーティションごとにメモリにバッファされる記録の数。

:
  • 最小: 1

  • 最大: 上限なし

デフォルト:

10000

buffer.size.bytes

データファイルとしてSnowflakeに取り込まれる前に、Kafkaパーティションごとにメモリーにバッファーされた記録の累積サイズ(バイト単位)。

記録は、データファイルに書き込まれるときに圧縮されます。その結果、バッファー内の記録のサイズは、記録から作成されたデータファイルのサイズよりも大きくなる可能性があります。

:
  • 最小: 1

  • 最大: 上限なし

デフォルト:

20000000 (20 MB)

Kafkaコネクタのプロパティに加えて、Kafkaコンシューマー max.poll.records のプロパティに注意してください。これは、1回のポーリングでKafkaからKafka Connectに返される記録の最大数を制御します。 500 のデフォルト値は増やすことができますが、メモリの制約に注意してください。このプロパティの詳細については、Kafkaパッケージのドキュメントをご参照ください。

エラー処理および DLQ プロパティ

errors.tolerance

Kafkaコネクタで発生したエラーの処理方法を指定します。

このプロパティは、次の値をサポートします。

:
  • NONE: 最初のエラーが発生したときにデータのロードを停止します。

  • ALL: すべてのエラーを無視して、データのロードを続行します。

デフォルト:

NONE

errors.log.enable

Kafka Connectログファイルにエラーメッセージを書き込むかどうかを指定します。

このプロパティは、次の値をサポートします。

:
  • TRUE: エラーメッセージを書き込みます。

  • FALSE: エラーメッセージを書き込みません。

デフォルト:

FALSE

errors.deadletterqueue.topic.name

SnowflakeテーブルにインジェストできなかったメッセージをKafkaに配信するために、Kafkaの DLQ (配信不能キュー) トピック名を指定します。詳細については、 配信不能キュー (このトピック内)をご参照ください。

:

カスタムテキスト文字列

デフォルト:

なし

必ず1回のセマンティクス

必ず1回のセマンティクスにより、重複やデータ損失なしでKafkaメッセージの配信が保証されます。この配信保証は、Snowpipe Streamingを使用するKafkaコネクタに対してデフォルトで設定されています。

Kafkaコネクタは、パーティションとチャネル間の1対1マッピングを採用し、2つの異なるオフセットを使用します。

  • コンシューマーオフセット: これは、コンシューマーによって消費された最新のオフセットを追跡し、Kafkaによって管理されます。

  • オフセットトークン: これは、Snowflakeでコミットされた最新のオフセットを追跡し、Snowflakeによって管理されます。

Kafkaコネクタは、常にオフセットの欠落を処理するわけではないことに注意してください。Snowflakeは、すべての記録が順次増加するオフセットを持つことを期待しています。オフセットの欠落は、特定のユースケースにおいてKafkaコネクタを破壊します。 NULL の記録ではなく、tombstoneの記録を使用することをお勧めします。

Kafkaコネクタは、次のベストプラクティスを実装することで、1回限りの配信を実現します。

チャネルの開始/再開:

  • 特定のパーティションのチャネルを開始したり、再開したりする場合、Kafkaコネクタは、 getLatestCommittedOffsetToken API を通じてSnowflakeから取得した、最新のコミットされたオフセットトークンを信頼できる情報源として使用し、それに応じてKafkaのコンシューマーオフセットをリセットします。

  • コンシューマーオフセットがデータ保持期間内にない場合は、例外がスローされ、実行する適切なアクションを決定できます。

  • KafkaコネクタがKafkaのコンシューマーオフセットをリセットせず、それを信頼できる情報源として使用する唯一のシナリオは、Snowflakeからのオフセットトークンが NULL である場合です。この場合、コネクタはKafkaによって送信されたオフセットを受け入れ、その後オフセットトークンが更新されます。

記録の処理:

  • Kafkaの潜在的なバグによって発生する可能性のある非連続オフセットに対する追加の安全層を確保するために、Snowflakeは最新の処理されたオフセットを追跡するメモリ内変数を維持します。Snowflakeは、現在の行のオフセットが、最後に処理されたオフセットに1を加えた値に等しい場合にのみ行を受け入れます。これにより、インジェスチョンプロセスが継続的かつ正確であることを保証するための追加の保護層が得られます。

例外、障害、クラッシュ復旧への対処:

  • 復旧プロセスの一環として、Snowflakeは、チャネルを再開し、最新のコミットされたオフセットトークンを使用してコンシューマーオフセットをリセットすることにより、前に概説したチャネルの開始/再開ロジックを一貫して遵守します。これにより、Snowflakeは、コミットされた最新のオフセットトークンより1つ大きいオフセット値からデータを送信するようKafkaに通知し、データ損失をまったく発生させずに、障害発生時点からインジェスチョンを再開できるようになります。

再試行メカニズムの実装:

  • 潜在的な一時的問題に対処するために、Snowflakeには API 呼び出しに再試行メカニズムが組み込まれています。Snowflakeは、成功の可能性を高め、インジェスチョンプロセスに影響を与える断続的な障害のリスクを軽減するために、これらの API 呼び出しを複数回再試行します。

コンシューマーオフセットの提出:

  • Snowflakeは、一定の間隔で、最新のコミットされたオフセットトークンを使用してコンシューマーオフセットを提出し、インジェスチョンプロセスが確実にSnowflake内のデータの最新状態と継続的に一致するようにします。

コンバーター

Snowpipe Streamingを使用するKafkaコネクタは、次の key.converter または value.converter 値をサポートしていません。

  • com.snowflake.kafka.connector.records.SnowflakeJsonConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry

カスタムSnowflakeコンバーターは、ファイルをテーブルステージに移動することにより、データのロードを妨げるエラーを処理します。このワークフローは、Snowpipe Streamingの 配信不能キュー と競合します。

配信不能キュー

Snowpipe Streamingを使用するKafkaコネクタは、壊れた記録または障害のために正常に処理できない記録の配信不能キュー(DLQ)をサポートしています。

モニタリングの詳細については、Apache Kafkaの ドキュメント をご参照ください。

スキーマ検出およびスキーマ進化

Snowpipe Streamingを使用したKafkaコネクタは、スキーマ検出および進化をサポートしています。Snowflakeのテーブルの構造を自動的に定義して進化させ、Kafkaコネクタによりロードされた新しいSnowpipe Streamingデータの構造をサポートすることができます。Snowpipe Streamingを使用したKafkaコネクタのスキーマ検出と進化を有効にするには、以下のKafkaプロパティを構成します。

  • snowflake.ingestion.method

  • snowflake.enable.schematization

  • schema.registry.url

詳細については、 Snowpipe Streamingを使用したKafkaコネクタのスキーマ検出および進化 をご参照ください。

請求および使用状況

Snowpipe Streamingの課金情報については、 Snowpipe Streamingコスト をご参照ください。