Snowpipe StreamingでのKafka用Snowflakeコネクタの使用¶
必要に応じて、KafkaからのデータのロードチェーンでSnowpipeを Snowpipe Streaming に置き換えます。指定されたフラッシュバッファーのしきい値(時間、メモリ、またはメッセージ数)に達すると、仮のステージングされたファイルにデータを書き込むSnowpipeとは異なり、コネクタはSnowpipe Streaming API (「API」)を呼び出して、データの行をSnowflakeテーブルから書き込みます。このアーキテクチャにより、ロード遅延が短縮され、同様の量のデータをロードするためのコストが削減されます。
Snowpipe Streamingで使用するには、Kafkaコネクタのバージョン1.9.1(またはそれ以上)が必要です。Snowpipe Streamingで使用するKafkaコネクタには、Snowflake Ingest SDK が含まれており、Apache Kafkaトピックからターゲットテーブルに直接行をストリーミングできます。
注釈
現在、Snowpipe Streamingを使用するKafkaコネクタは、スキーマ検出またはスキーマ進化をサポートしていません。Snowpipeで使用されるものと同じ テーブルのスキーマ を使用します。
このトピックの内容:
必要最小バージョン¶
Kafkaコネクタバージョン1.9.1(またはそれ以上)は、Snowpipe Streamingをサポートしています。
Kafka構成プロパティ¶
接続設定をKafkaコネクタのプロパティファイルに保存します。詳細については、 Kafkaコネクタの構成 をご参照ください。
必須のプロパティ¶
Kafkaコネクタのプロパティファイルで接続設定を追加または編集します。詳細については、 Kafkaコネクタの構成 をご参照ください。
snowflake.ingestion.method
ストリーミングインジェストクライアントとしてKafkaコネクタを使用する場合にのみ必要です。 Kafkaトピックデータをロードするために、Snowpipe Streamingまたは標準Snowpipeのいずれを使用するかを指定します。サポートされている値は次のとおりです。
SNOWPIPE_STREAMING
SNOWPIPE
(デフォルト)
トピックデータをキューに入れてロードするために、バックエンドサービスを選択する追加の設定は必要ありません。通常どおり、Kafkaコネクタのプロパティファイルで追加のプロパティを構成します。
snowflake.role.name
テーブルに行を挿入するときに使用するアクセス制御ロール。
バッファーおよびポーリングのプロパティ¶
buffer.flush.time
バッファーフラッシュ間の秒数。フラッシュごとに、バッファーされた記録の挿入操作が実行されます。Kafkaコネクタは、各フラッシュの後にSnowpipe Streaming API(「API」)を1回呼び出します。
buffer.flush.time
プロパティでサポートされる最小値は1
(秒単位)です。データの平均フローレートを高くするには、遅延を低減するためにデフォルト値を下げることをお勧めします。遅延よりもコストが重要な場合は、バッファーのフラッシュ時間を増やすことができます。メモリ不足の例外を回避するため、バッファーがいっぱいになる前にKafkaメモリバッファーをフラッシュするように注意してください。- 値
1
- 上限なし。- デフォルト
10
buffer.count.records
Snowflakeにインジェストされる前に、Kafkaパーティションごとにメモリにバッファされる記録の数。
- 値
1
- 上限なし。- デフォルト
10000
buffer.size.bytes
データファイルとしてSnowflakeに取り込まれる前に、Kafkaパーティションごとにメモリーにバッファーされたレコードの累積サイズ(バイト単位)。
レコードは、データファイルに書き込まれるときに圧縮されます。その結果、バッファー内のレコードのサイズは、レコードから作成されたデータファイルのサイズよりも大きくなる可能性があります。
- 値
1
- 上限なし。- デフォルト
20000000
(20 MB)
Kafkaコネクタのプロパティに加えて、Kafkaコンシューマー max.poll.records
のプロパティに注意してください。これは、1回のポーリングでKafkaからKafka Connectに返される記録の最大数を制御します。 500
のデフォルト値は増やすことができますが、メモリの制約に注意してください。このプロパティの詳細については、Kafkaパッケージのドキュメントをご参照ください。
エラー処理および DLQ プロパティ¶
errors.tolerance
Kafkaコネクタで発生したエラーの処理方法を指定します。
このプロパティは、次の値をサポートします。
NONE
最初のエラーが発生したときにデータのロードを停止します。
ALL
すべてのエラーを無視して、データのロードを続行します。
- デフォルト
NONE
errors.log.enable
Kafka Connectログファイルにエラーメッセージを書き込むかどうかを指定します。
このプロパティは、次の値をサポートします。
TRUE
エラーメッセージを書き込みます。
FALSE
エラーメッセージを書き込みません。
- デフォルト
FALSE
errors.deadletterqueue.topic.name
SnowflakeテーブルにインジェストできなかったメッセージをKafkaに配信するために、Kafkaの DLQ(配信不能キュー)トピック名を指定します。詳細については、 配信不能キュー (このトピック内)をご参照ください。
- 値
カスタムテキスト文字列
- デフォルト
なし。
必ず1回のセマンティクス¶
必ず1回のセマンティクスにより、重複やデータ損失なしでKafkaメッセージの配信が保証されます。この配信保証は、Snowpipe Streamingを使用するKafkaコネクタに対してデフォルトで設定されています。
Kafkaコネクタは、パーティションとチャネル間の1対1マッピングを採用し、2つの異なるオフセットを利用します。
コンシューマーオフセット: これは、コンシューマーによって消費された最新のオフセットを追跡し、Kafkaによって管理されます。
オフセットトークン: これは、Snowflakeでコミットされた最新のオフセットを追跡し、Snowflakeによって管理されます。
Kafkaコネクタは、次のベストプラクティスを実装することで、1回限りの配信を実現します。
チャネルの開始/再開:
特定のパーティションのチャネルを開始したり、再開したりする場合、Kafkaコネクタは、
getLatestCommittedOffsetToken
API を通じてSnowflakeから取得した、最新のコミットされたオフセットトークンを信頼できる情報源として使用し、それに応じてKafkaのコンシューマーオフセットをリセットします。コンシューマーオフセットがデータ保持期間内にない場合は、例外がスローされ、実行する適切なアクションを決定できます。
KafkaコネクタがKafkaのコンシューマーオフセットをリセットせず、それを信頼できる情報源として使用する唯一のシナリオは、Snowflakeからのオフセットトークンが NULL である場合です。この場合、コネクタはKafkaによって送信されたオフセットを受け入れ、その後オフセットトークンが更新されます。
記録の処理:
Kafkaの潜在的なバグによって発生する可能性のある非連続オフセットに対する追加の安全層を確保するために、Snowflakeは最新の処理されたオフセットを追跡するメモリ内変数を維持します。Snowflakeは、現在の行のオフセットが、最後に処理されたオフセットに1を加えた値に等しい場合にのみ行を受け入れます。これにより、インジェスチョンプロセスが継続的かつ正確であることを保証するための追加の保護層が得られます。
例外、障害、クラッシュ復旧への対処:
復旧プロセスの一環として、Snowflakeは、チャネルを再開し、最新のコミットされたオフセットトークンを使用してコンシューマーオフセットをリセットすることにより、前に概説したチャネルの開始/再開ロジックを一貫して遵守します。これにより、Kafkaは、コミットされた最新のオフセットトークンより1つ大きいオフセット値からデータを送信するよう通知され、データ損失を最小限またはまったく発生させずに、障害発生時点からインジェスチョンを再開できるようになります。
再試行メカニズムの実装:
潜在的な一時的問題に対処するために、Snowflakeには API 呼び出しに再試行メカニズムが組み込まれています。Snowflakeは、成功の可能性を高め、インジェスチョンプロセスに影響を与える断続的な障害のリスクを軽減するために、これらの API 呼び出しを複数回再試行します。
コンシューマーオフセットの提出:
Snowflakeは、一定の間隔で、最新のコミットされたオフセットトークンを使用してコンシューマーオフセットを提出し、インジェスチョンプロセスが確実にSnowflake内のデータの最新状態と継続的に一致するようにします。
コンバーター¶
Snowpipe Streamingを使用するKafkaコネクタは、次の key.converter
または value.converter
値をサポートしていません。
com.snowflake.kafka.connector.records.SnowflakeJsonConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry
カスタムSnowflakeコンバーターは、ファイルをテーブルステージに移動することにより、データのロードを妨げるエラーを処理します。このワークフローは、Snowpipe Streamingの 配信不能キュー と競合します。
配信不能キュー¶
Snowpipe Streamingを使用するKafkaコネクタは、壊れた記録または障害のために正常に処理できない記録の配信不能キュー(DLQ)をサポートしています。
モニタリングの詳細については、Apache Kafkaの ドキュメント をご参照ください。
請求および使用状況¶
Snowpipe Streamingの課金情報については、 Snowpipe Streamingコスト をご参照ください。