Snowpipe Streaming

Snowpipe Streaming API (「 API 」) を呼び出すと、Snowflake Ingest SDK と独自の管理アプリケーションコードを使用して、ストリーミングデータ行の低遅延ロードが促されます。ストリーミングインジェスト API は、ステージングされたファイルにデータを書き込む一括データロードやSnowpipeとは異なり、データ行をSnowflakeテーブルに書き込みます。このアーキテクチャにより、ロード遅延が短縮され、同様の量のデータをロードするためのコストが削減されるため、リアルタイムデータストリームを処理するための強力なツールになります。

このトピックでは、API を呼び出すカスタムのクライアントアプリケーションの概念について説明します。関連するKafka用Snowflakeコネクタ(「Kafkaコネクタ」)の手順については、 Snowpipe StreamingでのKafka用Snowflakeコネクタの使用 をご参照ください。

このトピックの内容:

Snowpipe Streaming API 対 Snowpipe

API はSnowpipeを置き換えるものではなく、Snowpipeを補完することを目的としています。データがファイルに書き込まれるのではなく、行 (例: Apache Kafkaトピック) を介してストリーミングされるストリーミングシナリオでは、Snowpipe Streaming API を使用します。API は、記録を生成または受信する既存のカスタムJavaアプリケーションを含むインジェストワークフローに適合します。 API により、Snowflakeテーブルにデータをロードするためのファイルを作成する必要がなくなり、データが利用可能になると、Snowflakeへのデータストリームの自動継続的なロードが可能になります。

Snowpipe Streaming

次のテーブルでは、Snowpipe StreamingとSnowpipeの違いについて説明します。

カテゴリ

Snowpipe Streaming

Snowpipe

ロードするデータの形式

ファイル。既存のデータパイプラインがBLOBストレージにファイルを生成する場合は、 API の代わりにSnowpipeを使用することをお勧めします。

サードパーティのソフトウェア要件

Snowflake Ingest SDK 用のカスタムJavaアプリケーションのコードラッパー。

なし

データの順序

各チャネル内の順序付き挿入

サポート対象外です。Snowpipeは、クラウドストレージのファイル作成タイムスタンプとは異なる順序でファイルからデータをロードできます。

ロード履歴

SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY ビュー (Account Usage) に記録されたロード履歴。

LOAD_HISTORY ビュー (Account Usage) と COPY_HISTORY 関数 (Information Schema) に記録されたロード履歴

パイプオブジェクト

パイプオブジェクトは必要ありません。 APIは、記録をターゲットテーブルに直接書き込みます。

ステージングされたファイルデータをキューに入れ、ターゲットテーブルにロードするパイプオブジェクトが必要です。

ソフトウェア要件

Java SDK

Snowpipe Streamingサービスは現在、Snowflake Ingest SDK 用 APIs のセットとして実装されています。SDK は、 Maven Central Repository からダウンロードできます。Snowflakeは、Snowflake Ingest SDK バージョン2.0.2以降の使用をお勧めします。

SDK はJavaバージョン8 (またはそれ以降) をサポートします。 Java暗号化拡張 (JCE) 無制限強度の管轄ポリシーファイル が必要です。

重要

SDK は、Snowflakeに対して REST API を呼び出します。接続を許可するには、ネットワークのファイアウォールの規則調整が必要になる場合があります。

カスタムのクライアントアプリケーション

API には、データ行をポンピングし、発生したエラーを処理できるカスタムJavaアプリケーションインターフェイスが必要です。アプリケーションが継続的に実行され、障害から回復できるようにする責任はご自身にあります。指定した行のバッチに対して、 API は ON_ERROR = CONTINUE | SKIP_BATCH | ABORT に同等のものをサポートします。

  • CONTINUE: 許容できるデータ行のロードを続け、すべてのエラーを返します。

  • SKIP_BATCH: バッチ全体の行で何らかのエラーが発生した場合、ロードをスキップしてすべてのエラーを返します。

  • ABORT (デフォルト設定): 行のバッチ全体を中止し、最初のエラーが発生したときに例外をスローします。

アプリケーションは、 insertRow (単一行)または insertRows (行のセット)メソッドからの応答を使用してエラーをキャプチャする必要があります。

チャネル

API は、1つ以上の チャネル を介して行をインジェストします。チャネルは、データをテーブルにロードするためのSnowflakeへの論理的な名前付きストリーミング接続を表します。単一のチャネルは、Snowflakeの1つのテーブルに必ずマップされます。ただし、同じテーブルを複数のチャネルがポイントすることはできます。クライアント SDK は複数のチャネルを複数のテーブルに開くことができますが、 SDK はアカウント間でチャネルを開くことはできません。行の順序とそれに対応するオフセットトークンはチャネル内で保持されますが、同じテーブルをポイントするチャネル間では保持されません。

チャネルは、クライアントが活発にデータを挿入していると長い間維持され、オフセットトークン情報が保持されるため再利用されるべきです。チャネル内のデータはデフォルトで1秒ごとに自動的にフラッシュされ、閉じる必要はありません。詳細については、 遅延 をご参照ください。

Snowpipe Streamingクライアントのチャネルテーブルマッピング

SHOW CHANNELS コマンドを実行すると、アクセス権限のあるチャンネルをリストできます。詳細については、 SHOW CHANNELS をご参照ください。

注釈

非アクティブなチャネルとそのオフセットトークンは、30日を過ぎると自動的に削除されます。

オフセットトークン

オフセットトークン は、チャネルごとにインジェスチョンの進行状況を追跡するために、クライアントが insertRow (単一行)または insertRows (行のセット)メソッドリクエストに含めることができる文字列です。トークンはチャネルの作成時に NULL に初期化され、提供されたオフセットトークンを持つ行が非同期プロセスを通じてSnowflakeにコミットされるときに更新されます。クライアントは定期的に getLatestCommittedOffsetToken メソッドリクエストを作成して、特定のチャネルに対してコミットされた最新のオフセットトークンを取得し、それを使用してインジェスチョンの進行状況を判断できます。このトークンは、Snowflakeが重複除外を実行するためには使用 しない ことに注意してください。ただし、クライアントはこのトークンにより、カスタムコードを使用して重複除外を実行することはできます。

クライアントがチャネルを再度開くと、最新の永続化されたオフセットトークンが返されます。クライアントは、トークンを使用してデータソース内の位置をリセットし、同じデータを2回送信することを回避できます。チャネルの再開イベントが発生すると、Snowflakeにバッファーされたデータはコミットを回避するために破棄されることに注意してください。

最新のコミットされたオフセットトークンを使用して、次の一般的な使用例を実行できます。

  • インジェスチョンの進行状況の追跡

  • 特定のオフセットがコミットされているかどうかを、最後にコミットされたオフセットトークンと比較することによって確認します。

  • ソースオフセットの提出と、すでにコミットされたデータのパージ

  • 重複排除の有効化と、データを1回だけ配信することの保証

たとえば、Kafkaコネクタはトピックから <パーティション>:<オフセット> などのオフセットトークンを読み取ることができます。または、パーティションがチャネル名にエンコードされている場合は、単に <オフセット> を読み取ることができます。次のシナリオを検討してください。

  1. Kafkaコネクタがオンラインになり、Kafkaトピック TPartition 1 に対応するチャネルが T:P1 というチャネル名で開きます。

  2. コネクタは、Kafkaパーティションからの記録の読み取りを開始します。

  3. コネクタは API を呼び出し、記録に関連付けられたオフセットをオフセットトークンとして使用して insertRows メソッドリクエストを作成します。

    たとえば、オフセットトークンは 10 で、Kafkaパーティションの10番目の記録を参照します。

  4. コネクタは定期的に getLatestCommittedOffsetToken メソッドリクエストを実行し、インジェストの進行状況を判断します。

Kafkaコネクタがクラッシュした場合は、次のプロシージャを完了して、Kafkaパーティションの正しいオフセットから記録の読み取りを再開できます。

  1. Kafkaコネクタがオンラインに戻り、以前と同じ名前を使用してチャネルを再度開きます。

  2. コネクタは API を呼び出し、 getLatestCommittedOffsetToken メソッドリクエストを作成して、パーティションの最新のコミット済みオフセットを取得します。

    たとえば、最新の永続オフセットトークンが 20 であるとします。

  3. コネクタは、Kafka読み取り APIs を使用して、オフセット+1 (この例では 21)に対応するカーソルをリセットします。

  4. コネクタは記録の読み取りを再開します。読み取りカーソルが正常に再配置されると、重複するデータは取得されません。

別の例では、アプリケーションはディレクトリからログを読み込み、Snowpipe Streaming Client SDK を使用してこれらのログをSnowflakeにエクスポートします。次を実行するログエクスポートアプリケーションをビルドできます。

  1. ログディレクトリ内のファイルをリストする。

    ログフレームワークが生成するログファイルは辞書順に並べることができ、新しいログファイルはこの順序の最後に配置されるとします。

  2. ログファイルを1行ずつ読み取り、 API を呼び出して、ログファイル名と行数またはバイト位置に対応するオフセットトークンを使用して insertRows メソッドリクエストを作成する。

    たとえば、オフセットトークンは messages_1.log:20 のようになります。ここで、 messages_1.log はログファイルの名前で、 20 は行番号です。

アプリケーションがクラッシュしたり、再起動が必要になったりすると、 API が呼び出され、最後にエクスポートされたログファイルと行に該当するオフセットトークンを取得する getLatestCommittedOffsetToken メソッドリクエストが作成されます。例を続けると、これは messages_1.log:20 になります。その後、アプリケーションは messages_1.log を開き、行 21 を検索して、同じログ行が2回インジェストされないようにします。

注釈

オフセットトークンの情報が失われる可能性があります。オフセットトークンはチャネルオブジェクトにリンクされており、チャネルを使用して新しいインジェスチョンが30日間実行されない場合、チャネルは自動的にクリアされます。オフセットトークンの損失を防ぐために、別のオフセットを維持し、必要に応じてチャネルのオフセットトークンをリセットすることを検討してください。

1回限りの配信のベストプラクティス

正確に1回限りの配信を実現することは困難な場合があり、カスタムコードでは次の原則に従うことが重要です。

  • 例外、障害、またはクラッシュから確実かつ適切に復旧するには、常にチャネルを再開し、最新のコミットされたオフセットトークンを使用してインジェスチョンを再起動する必要があります。

  • アプリケーションは独自のオフセットを維持できますが、Snowflakeによって提供される最新のコミットされたオフセットトークンを信頼できる情報源として使用し、それに応じて独自のオフセットをリセットすることが重要です。

  • 独自のオフセットを信頼できる情報源として扱う必要がある唯一の例は、Snowflakeからのオフセットトークンが NULL に設定またはリセットされる場合です。NULL オフセットトークンは通常、次のいずれかを意味します。

    • これは新しいチャネルであるため、オフセットトークンが設定されていない。

    • ターゲットテーブルがドロップされて再作成されたため、チャネルが新規とみなされます。

    • 30日間、チャネルを介したインジェスチョンアクティビティがなかったため、チャネルは自動的にクリーンアップされ、オフセットトークン情報は失われました。

  • 必要に応じて、最新のコミットされたオフセットトークンに基づいてすでにコミットされているソースデータを定期的にパージし、独自のオフセットを提出することができます。

Snowpipe Streamingを使用したKafkaコネクタが1回限りの配信を実現する方法の詳細については、 必ず1回のセマンティクス をご参照ください。

遅延

Snowpipe Streamingは、チャネル内のデータを1秒ごとに自動的にフラッシュします。データをフラッシュするためにチャネルを閉じる必要はありません。

Snowflake Ingest SDK バージョン 2.0.4以降では、オプション max_client_lag を使用して遅延を設定できます。デフォルトのオプションは1秒です。最大遅延時間は10分まで設定できます。詳細については、 MAX_CLIENT_LAG をご参照ください。

Snowpipe Streaming用のKafkaコネクタには独自のバッファがあることに注意してください。Kafkaバッファのフラッシュ時間に達した後、データはSnowpipe Streamingを通じてSnowflakeに1秒の遅延で送信されます。詳細については、 バッファのフラッシュ時間 をご参照ください。

最適化されたファイルへの移行

API は、チャネルからの行をクラウドストレージのBLOBに書き込み、ターゲットテーブルにコミットします。最初に、ターゲットテーブルに書き込まれたストリームデータは、仮の中間ファイル形式で格納されます。このステージでは、テーブルは「混合テーブル」と見なされます。これは、ネイティブファイルと中間ファイルが混ざった状態でパーティションされたデータが格納されているためです。自動化されたバックグラウンドプロセスにより、必要に応じて、アクティブな中間ファイルから、クエリおよび DML 操作用に最適化されたネイティブファイルにデータが移行されます。

複製

Snowpipe Streamingは、Snowpipe Streamingとそれに関連するチャネルオフセットによって入力されたSnowflakeテーブルの 複製とフェールオーバー を、異なる リージョン のソースアカウントとターゲットアカウント間、また クラウドプラットフォーム 間でサポートしています。

詳細については、 複製およびSnowpipe Streaming をご参照ください。

挿入のみの操作

API は現在、行の挿入に限られています。データを変更、削除、または結合するには、「生」記録を1つ以上のステージングされたテーブルに書き込みます。 継続的なデータパイプライン を使用してデータをマージ、結合、または変換し、変更されたデータを宛先のレポートテーブルに挿入します。

クラスおよびインターフェイス

クラスとインターフェイスのドキュメントについては、 Snowflake Ingest SDK API をご参照ください。

サポートされているJavaデータ型

次のテーブルは、Snowflake列へのインジェスチョンでサポートされているJavaデータ型をまとめたものです。

Snowflake列型

許可されたJavaデータ型

  • CHAR

  • VARCHAR

  • 文字列

  • プリミティブデータ型(int、boolean、charなど)

  • BigInteger、 BigDecimal

  • BINARY

  • byte[]

  • 文字列(16進数エンコード)

  • NUMBER

  • 数値型(BigInteger、 BigDecimal、byte、int、doubleなど)

  • 文字列

  • FLOAT

  • 数値型(BigInteger、 BigDecimal、byte、int、doubleなど)

  • 文字列

  • BOOLEAN

  • boolean

  • 数値型(BigInteger、 BigDecimal、byte、int、doubleなど)

  • 文字列

ブール変換の詳細 をご参照ください。

  • TIME

  • java.time.LocalTime

  • java.time.OffsetTime

  • 文字列

    • 整数格納時刻

    • HH24:MI:SS.FFTZH:TZM (例: 20:57:01.123456789+07:00

    • HH24:MI:SS.FF (例: 20:57:01.123456789

    • HH24:MI:SS (例: 20:57:01

    • HH24:MI (例: 20:57

  • DATE

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • 文字列

    • 整数格納日付

    • YYYY-MM-DD (例: 2013-04-28

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (例: 2013-04-28T20:57:01.123456789+07:00

    • YYYY-MM-DDTHH24:MI:SS.FF (例: 2013-04-28T20:57:01.123456

    • YYYY-MM-DDTHH24:MI:SS (例: 2013-04-28T20:57:01

    • YYYY-MM-DDTHH24:MI (例: 2013-04-28T20:57

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (例: 2013-04-28T20:57:01-07:00

    • YYYY-MM-DDTHH24:MITZH:TZM (例: 2013-04-28T20:57-07:00

  • TIMESTAMP_NTZ

  • TIMESTAMP_LTZ

  • TIMESTAMP_TZ

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • 文字列

    • 整数格納タイムスタンプ

    • YYYY-MM-DD (例: 2013-04-28

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (例: 2013-04-28T20:57:01.123456789+07:00

    • YYYY-MM-DDTHH24:MI:SS.FF (例: 2013-04-28T20:57:01.123456

    • YYYY-MM-DDTHH24:MI:SS (例: 2013-04-28T20:57:01

    • YYYY-MM-DDTHH24:MI (例: 2013-04-28T20:57

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (例: 2013-04-28T20:57:01-07:00

    • YYYY-MM-DDTHH24:MITZH:TZM (例: 2013-04-28T20:57-07:00

  • VARIANT

  • ARRAY

  • 文字列(有効な JSON にする必要あり)

  • プリミティブデータ型とその配列

  • BigInteger、 BigDecimal

  • java.time.LocalTime

  • java.time.OffsetTime

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.util.Map<文字列, T>。Tは有効な VARIANT 型

  • T[]。Tは有効な VARIANT 型

  • List<T>。Tは有効な VARIANT 型

  • OBJECT

  • 文字列(有効な JSON オブジェクトにする必要あり)

  • Map<文字列, T>。Tは有効なバリアント型

  • GEOGRAPHY

  • サポート対象外

  • GEOMETRY

  • サポート対象外

必要なアクセス権限

Snowpipe Streaming API を呼び出すには、次の権限を持つロールが必要です。

オブジェクト

権限

テーブル

OWNERSHIP または INSERT と EVOLVE SCHEMA の最小 (Snowpipe StreamingでKafkaコネクタのスキーマ進化を使用する場合にのみ必要)

データベース

USAGE

スキーマ

USAGE

制限事項

Snowpipe Streamingは、データ暗号化に256ビット AES キーの使用のみをサポートします。

以下のオブジェクトや型はサポートされて いません

  • GEOGRAPHY および GEOMETRY データ型

  • 列の照合順序設定

  • TRANSIENT または TEMPORARY テーブル

  • 以下のいずれかの列設定を持つテーブル:

    • AUTOINCREMENT または IDENTITY

    • NULL ではないデフォルトの列値