Snowflake High Performance connector for Kafka

このトピックでは、|KAFKAOFHP|の基本概念、ユースケース、利点、主な機能、および制限について説明します。

注釈

|KAFKAOFHP|は、Kafkaトピックからデータを読み取り、そのデータをSnowflakeテーブルにロードする :emph:` シンクコネクタ` です。Kafka Connectとそのフレームワークの詳細については、 Apache KafkaおよびKafka Connectフレームワーク をご参照ください。

利点

|KAFKAOFHP|は、ほぼリアルタイムの分析情報を必要とする現代のデータ集約型組織向けに設計された、Snowflakeの 高性能Snowpipe Streamingアーキテクチャ を活用します。この次世代アーキテクチャは、Snowflakeへのリアルタイムインジェスチョンのスループット、効率、柔軟性を大幅に向上させます。

高性能アーキテクチャには、いくつかの重要な利点があります。

  • 優れたスループットとレイテンシ:テーブルあたり最大10GB/秒の取り込み速度をサポートするように設計されており、エンドツーエンドの取り込みからクエリまでのレイテンシは5~10秒以内で、ほぼリアルタイムの分析が可能になります。

  • 請求の簡略化:透明性が高くスループットに基づいて請求を行うため、コストの予測と理解がしやすくなります。

  • パフォーマンスの強化:以前の実装と比較して、クライアント側のパフォーマンス向上とリソース使用量の削減を実現するRustベースのクライアントコアを使用します。

  • 同時進行の変換:PIPEオブジェクト内のCOPYコマンド構文を使用して、取り込み中にデータのクレンジングと再形成をサポートし、データがターゲットテーブルに到達する前に変換できるようにします。

  • サーバー側のスキーマ検証:PIPEオブジェクトを介してスキーマ検証をクライアント側からサーバー側に移動するため、データの品質を確保しクライアントの複雑さを軽減します。

  • 事前クラスタリング機能:ターゲットテーブルにクラスタリングキーが定義されている場合は、インジェスチョン中にデータをクラスタリングできるため、取り込み後のメンテナンスを必要とせずにクエリパフォーマンスが向上します。

コネクタは、インジェスチョンを管理するための中心コンポーネントとしてSnowflake PIPE オブジェクトを使用します。PIPEオブジェクトは、すべてのストリーミングデータのエントリポイントおよび定義レイヤーとして機能し、データがターゲットテーブルにコミットされる前にどのように処理、変換、検証されるかを定義します。コネクタがテーブルとパイプで動作する方法の詳細については、 コネクタがテーブルとパイプで動作する方法 をご参照ください。

コネクタのバージョンの選択

KafkaコネクタはKafka Connectクラスターで実行され、Kafkaトピックからデータを読み取り、Snowflakeテーブルに書き込みます。

Snowflakeは、次の2つのバージョンのコネクタを提供します。どちらのバージョンのコネクタも、KafkaからSnowflakeにデータをストリーミングするコア機能は同じです。

  • コネクタのConfluentバージョン

    Kafka向け高性能Snowflakeコネクタは、Confluent Cloudではまだ利用できません。Confluent Cloudを使用している場合は、カスタムプラグインコネクタとしてコネクタを手動でインストールする必要があります。

    Confluentバージョンは、Confluent HubまたはConfluent Control Centerを介して簡単にインストールできるようにパッケージ化されており、Confluentプラットフォーム環境の最適化が含まれています。

    Confluent Platform、Confluent Kafka Dockerイメージ、またはConfluent Cloudを使用している場合は、このバージョンを選択してください。

    Confluentバージョンのコネクタを入手してインストールするには、Snowflakeサポートにお問い合わせください。

    Kafka Connectの詳細については、https://docs.confluent.io/current/connect/をご参照ください。

  • open source software (OSS) Apache Kafka package https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/ - コネクタのOSS Apache Kafkaバージョン。

    Apacheバージョンは標準のJARファイルとして配布されており、Apache Kafka Connectクラスターに手動でインストールする必要があります。Apache Kafkaを実行している場合は、このバージョンを選択してください。

    Apache Kafkaの詳細については、https://kafka.apache.org/をご参照ください。

制限事項

|KAFKAOFHP|には、以下の制限があります。

テーブル作成:

宛先テーブルは、コネクタを開始する前に手動で作成する必要があります。コネクタはテーブルを自動作成しません。

バージョン3.x以前からの移行

バージョン3.x以前のパイプラインから新しいコネクタに手動で移行することができます。既存のパイプラインが、新しいコネクタでまだ利用できない機能に依存していないことを確認してください。

構成の安定性

構成パラメーター名は、Private Previewフェーズ中に変更される場合があります。使用する構成パラメーターは、Public Previewの前に名前変更や再構築が行われる場合があります。Snowflakeは、パラメーター名の変更の際に移行ガイダンスを提供します。

Kafkaコネクタの制限

バージョン3.x以前の既存パイプラインの移行

コネクタは、バージョン3.x以前の既存のパイプラインの移行をサポートしていません。既存のパイプラインを新しいコネクタに手動で移行する必要があります。

単一メッセージ変換(SMTs):

コミュニテ コンバーターを使用する場合、ほとんどの単一メッセージ変換(SMTs)がサポートされますが、現在サポートされていない regex.router は例外です。

SMTsの詳細については、 Confluent CloudまたはConfluent Platform向けKafka Connect単一メッセージ変換の参照情報 をご参照ください。

サポートされているKafkaバージョン

重要

コネクタには、一部サポートされていないバージョンがあります。サポートされているバージョンと、プレリリースおよびリリース候補については、以下のテーブルをご参照ください。

リリースシリーズ

ステータス

メモ

4.x.x

プライベートプレビュー

早期アクセス。現在、3.xおよび2.xバージョンからの移行はサポートされていません。

3.x.x

正式にサポート

最新バージョンで、強くお勧めします。

2.x.x

正式にサポート

アップグレードをお勧めします。

1.x.x

サポート対象外

このリリースシリーズは使用しないでください。

サポートされていない機能

以下の機能はサポートされていません。

スキーマの進化

スキーマの進化はサポートされていません。スキーマの変更は手動で管理する必要があります。詳細については、 スキーマの進化 をご参照ください。

Icebergテーブル

Icebergテーブルへのインジェスチョンはサポートされていません。

テーブルの自動作成

コネクタはテーブルを自動作成しません。コネクタを開始する前に、テーブルを手動で作成する必要があります。

破損した記録は、コネクタが配信不能キュー(DLQ)に送信しません。

errors.tolerance=all および errors.deadletterqueue.topic.name を設定すると、変換できないレコードのみがKafka ConnectレベルのエラーハンドラーによってDLQに送信されます。記録がコネクタに渡されても、Snowflakeへのインジェストに失敗した場合、DLQに送信されません。これは既存のSnowpipe Streaming高パフォーマンスの制限です。コネクタは、どの記録がSnowflakeにインジェストされなかったかを検出できません。特定の量の記録がインジェストされなかったことのみを検出できます。このため errors.tolerance=all パラメーターを使用すると、コネクタは 最大で1回 の配信のみを保証します。

インジェストに失敗した破損した記録は、手動で再試行する必要があります

errors.tolerance=none を設定した場合、コネクタはチャネルステータスで rows_error_count が0より大きいことを検出するとすぐにタスクを失敗させます。破損した記録を再試行するには、ユーザーはチャネル履歴を確認して記録を見つける必要があります。破損した記録とインジェスチョンエラーのトラブルシューティングの詳細については、 エラー処理 をご参照ください。メタデータオフセットを使用したエラーの検出と復旧 で説明されているギャップ検出テクニックを使用することもできます。この手法を使用するために必要なKafkaオフセット情報は、RECORD_METADATA 列で確認できます。

シークレットの外部化

Snowflakeは、秘密キーなどの秘密を外部化し、暗号化された形式、または AWS Key Management Service(KMS)、Microsoft Azure Key Vault、 HashiCorp Vaultなどのキー管理サービスで保存することを強くお勧めします。これは、Kafka Connectクラスターで ConfigProvider 実装を使用して実現できます。

詳細については、この サービス のConfluent説明をご参照ください。

テストとプロトタイピングのためのキャッシュに関する考慮事項

コネクタはテーブルとパイプの存在チェックをキャッシュし、パーティションの再バランス調整中のパフォーマンスを向上させます。ただし、テストおよびプロトタイピング中は、このキャッシュ動作により、コネクタが手動で作成されたテーブルまたはパイプをすぐに検出しない可能性があります。

問題: コネクタの実行中にテーブルやパイプを手動で作成した場合、コネクタはデフォルトで最大5分間、キャッシュされた存在チェック結果(オブジェクトが存在しないことを示すこともあります)を使用し続ける可能性があります。これにより、テスト中に予期しないエラーや動作が発生する可能性があります。

テストの推奨: テストおよびプロトタイピング中にキャッシュ関連の問題を回避するには、両方のキャッシュ有効期限パラメーターを最小値の 1 ミリ秒に設定するか、キャッシュを無効にします。

snowflake.cache.table.exists.expire.ms=1
snowflake.cache.pipe.exists.expire.ms=1
Copy

この構成により、パーティションの再バランスごとにコネクタが最新の存在チェックを実行でき、手動で作成されたテーブルやパイプの効果をすぐに確認できるようになります。

重要

これらの最小限のキャッシュ設定は、 テストとプロトタイプ作成のみ に推奨されます。実稼働環境では、デフォルトのキャッシュ有効期限値(5分以上)を使用して、Snowflakeへのメタデータクエリを最小限に抑え、特に多数のパーティションを処理する場合に再バランスのパフォーマンスを向上させます。

Private Previewバージョンの破壊的変更

破壊的変更のリストについては、Private Previewバージョンのリリースノートをご参照ください。

次のステップ

|KAFKAOFHP|をセットアップする手順については、 Snowflake High Performance connector for Kafka のタスクの設定 のトピックをご確認ください。 . コネクタがテーブルおよびパイプでどのように機能するかの詳細については、 コネクタの動作 のトピックをご参照ください。