Snowpipe StreamingでのKafka用Snowflakeコネクタの使用¶
必要に応じて、KafkaからのデータのロードチェーンでSnowpipeを Snowpipe Streaming に置き換えます。指定されたフラッシュバッファーのしきい値(時間、メモリ、またはメッセージ数)に達すると、仮のステージングされたファイルにデータを書き込むSnowpipeとは異なり、コネクタはSnowpipe Streaming API (「API」)を呼び出して、データの行をSnowflakeテーブルから書き込みます。このアーキテクチャにより、ロード遅延が短縮され、同様の量のデータをロードするためのコストが削減されます。
Snowpipe Streamingで使用するには、Kafkaコネクタのバージョン1.9.1(またはそれ以上)が必要です。Snowpipe Streamingで使用するKafkaコネクタには、Snowflake Ingest SDK が含まれており、Apache Kafkaトピックからターゲットテーブルに直接行をストリーミングできます。
注釈
現在、Snowpipe Streamingを使用するKafkaコネクタは、スキーマ検出またはスキーマ進化をサポートしていません。Snowpipeで使用されるものと同じ テーブルのスキーマ を使用します。
このトピックの内容:
必要最小バージョン¶
Kafkaコネクタバージョン1.9.1(またはそれ以上)は、Snowpipe Streamingをサポートしています。
Kafka構成プロパティ¶
接続設定をKafkaコネクタのプロパティファイルに保存します。詳細については、 Kafkaコネクタの構成 をご参照ください。
必須のプロパティ¶
Kafkaコネクタのプロパティファイルで接続設定を追加または編集します。詳細については、 Kafkaコネクタの構成 をご参照ください。
snowflake.ingestion.method
ストリーミングインジェストクライアントとしてKafkaコネクタを使用する場合にのみ必要です。 Kafkaトピックデータをロードするために、Snowpipe Streamingまたは標準Snowpipeのいずれを使用するかを指定します。サポートされている値は次のとおりです。
SNOWPIPE_STREAMING
SNOWPIPE
(デフォルト)
トピックデータをキューに入れてロードするために、バックエンドサービスを選択する追加の設定は必要ありません。通常どおり、Kafkaコネクタのプロパティファイルで追加のプロパティを構成します。
snowflake.role.name
テーブルに行を挿入するときに使用するアクセス制御ロール。
バッファーおよびポーリングのプロパティ¶
buffer.flush.time
バッファーフラッシュ間の秒数。フラッシュごとに、バッファーされた記録の挿入操作が実行されます。Kafkaコネクタは、各フラッシュの後にSnowpipe Streaming API(「API」)を1回呼び出します。
buffer.flush.time
プロパティでサポートされる最小値は1
(秒単位)です。データの平均フローレートを高くするには、遅延を低減するためにデフォルト値を下げることをお勧めします。遅延よりもコストが重要な場合は、バッファーのフラッシュ時間を増やすことができます。メモリ不足の例外を回避するため、バッファーがいっぱいになる前にKafkaメモリバッファーをフラッシュするように注意してください。- 値
1
- 上限なし。- デフォルト
10
buffer.count.records
Snowflakeにインジェストされる前に、Kafkaパーティションごとにメモリにバッファされる記録の数。
- 値
1
- 上限なし。- デフォルト
10000
buffer.size.bytes
データファイルとしてSnowflakeに取り込まれる前に、Kafkaパーティションごとにメモリーにバッファーされたレコードの累積サイズ(バイト単位)。
レコードは、データファイルに書き込まれるときに圧縮されます。その結果、バッファー内のレコードのサイズは、レコードから作成されたデータファイルのサイズよりも大きくなる可能性があります。
- 値
1
- 上限なし。- デフォルト
20000000
(20 MB)
Kafkaコネクタのプロパティに加えて、Kafkaコンシューマー max.poll.records
のプロパティに注意してください。これは、1回のポーリングでKafkaからKafka Connectに返される記録の最大数を制御します。 500
のデフォルト値は増やすことができますが、メモリの制約に注意してください。このプロパティの詳細については、Kafkaパッケージのドキュメントをご参照ください。
エラー処理および DLQ プロパティ¶
errors.tolerance
Kafkaコネクタで発生したエラーの処理方法を指定します。
このプロパティは、次の値をサポートします。
NONE
最初のエラーが発生したときにデータのロードを停止します。
ALL
すべてのエラーを無視して、データのロードを続行します。
- デフォルト
NONE
errors.log.enable
Kafka Connectログファイルにエラーメッセージを書き込むかどうかを指定します。
このプロパティは、次の値をサポートします。
TRUE
エラーメッセージを書き込みます。
FALSE
エラーメッセージを書き込みません。
- デフォルト
FALSE
errors.deadletterqueue.topic.name
SnowflakeテーブルにインジェストできなかったメッセージをKafkaに配信するために、Kafkaの DLQ(配信不能キュー)トピック名を指定します。詳細については、 配信不能キュー (このトピック内)をご参照ください。
- 値
カスタムテキスト文字列
- デフォルト
なし。
必ず1回のセマンティクス¶
必ず1回のセマンティクスにより、重複やデータ損失なしでKafkaメッセージの配信が保証されます。この配信保証は、Snowpipe Streamingを使用するKafkaコネクタに対してデフォルトで設定されています。
コンバーター¶
Snowpipe Streamingを使用するKafkaコネクタは、次の key.converter
または value.converter
値をサポートしていません。
com.snowflake.kafka.connector.records.SnowflakeJsonConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry
カスタムSnowflakeコンバーターは、ファイルをテーブルステージに移動することにより、データのロードを妨げるエラーを処理します。このワークフローは、Snowpipe Streamingの 配信不能キュー と競合します。
配信不能キュー¶
Snowpipe Streamingを使用するKafkaコネクタは、壊れた記録または障害のために正常に処理できない記録の配信不能キュー(DLQ)をサポートしています。
モニタリングの詳細については、Apache Kafkaの ドキュメント をご参照ください。
請求および使用状況¶
Snowpipe Streamingの課金情報については、 Snowpipe Streamingの請求 をご参照ください。