Snowpipe Streaming

Snowpipe Streaming API(「API」)を呼び出すと、Snowflake Ingest SDK と独自のマネージドアプリケーションコードを使用して、ストリーミングデータ行の低遅延ロードが促されます。ストリーミングインジェスト API は、ステージングされたファイルにデータを書き込む一括データロードやSnowpipeとは異なり、データ行をSnowflakeテーブルに書き込みます。このアーキテクチャにより、ロード遅延が短縮され、同様の量のデータをロードするためのコストが削減されるため、リアルタイムデータストリームを処理するための強力なツールになります。

このトピックでは、API を呼び出すカスタムのクライアントアプリケーションの概念について説明します。関連するKafka用Snowflakeコネクタ(「Kafkaコネクタ」)の手順については、 Snowpipe StreamingでのKafka用Snowflakeコネクタの使用 をご参照ください。

このトピックの内容:

Snowpipe Streaming API とSnowpipeの対比

API はSnowpipeを置き換えるものではなく、Snowpipeを補完することを目的としています。データがファイルに書き込まれるのではなく、行(例: Apache Kafkaトピック)を介してストリーミングされるストリーミングシナリオでは、Snowpipe Streaming API を使用します。API は、記録を生成または受信する既存のカスタムJavaアプリケーションを含むインジェストワークフローに適合します。API により、Snowflakeテーブルにデータをロードするためのファイルを作成する必要がなくなり、データが利用可能になると、Snowflakeへのデータストリームの自動継続的なロードが可能になります。

Snowpipe Streaming

次のテーブルでは、Snowpipe StreamingとSnowpipeの違いについて説明します。

カテゴリ

Snowpipe Streaming

Snowpipe

ロードするデータの形式

行。

ファイル。既存のデータパイプラインがBLOBストレージにファイルを生成する場合は、 API ではなくSnowpipeを使用することをお勧めします。

サードパーティのソフトウェア要件

Snowflake Ingest SDK 用のカスタムJavaアプリケーションのコードラッパー。

なし。

データの順序

各チャネル内の順序付き挿入。

サポート対象外です。Snowpipeは、クラウドストレージのファイル作成タイムスタンプとは異なる順序でファイルからデータをロードできます。

ロード履歴

SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY ビュー (Account Usage)に記録されたロード履歴。

LOAD_HISTORY ビュー (Account Usage)と COPY_HISTORY 関数 (Information Schema)に記録されたロード履歴。

パイプオブジェクト

パイプオブジェクトは必要ありません。API は、記録をターゲットテーブルに直接書き込みます。

ステージングされたファイルデータをキューに入れ、ターゲットテーブルにロードするパイプオブジェクトが必要です。

ソフトウェア要件

Java SDK

Snowpipe Streamingサービスは現在、Snowflake Ingest SDK 用 APIs のセットとして実装されています。SDK は、Maven Central Repository(https://mvnrepository.com/artifact/net.snowflake/snowflake-ingest-sdk)からダウンロードできます。SDK はJavaバージョン8(またはそれ以降)をサポートし、 Java暗号化拡張(JCE)無制限強度の管轄ポリシーファイル が必要です。

重要

SDK は、Snowflakeに対して REST API を呼び出します。接続を許可するには、ネットワークのファイアウォールの規則調整が必要になる場合があります。

カスタムのクライアントアプリケーション

API には、データ行をポンピングし、発生したエラーを処理できるカスタムJavaアプリケーションインターフェイスが必要です。アプリケーションが継続的に実行され、障害から回復できるようにする責任はご自身にあります。特定の行セットについて、 API は ON_ERROR = CONTINUE | ABORT と同等のものをサポートします。 ABORT は、最初のエラーが見つかった後にバッチ全体を中止します。これがデフォルト設定です。 CONTINUE は、エラーが見つかった場合にデータのロードを続行します。

アプリケーションは、 insertRow (単一行)または insertRows (行のセット)メソッドからの応答を使用してエラーをキャプチャする必要があります。

チャネル

API は、1つ以上の チャネル を介して行をインジェストします。チャネルは、データをテーブルにロードするためのSnowflakeへの論理的な名前付きストリーミング接続を表します。単一のチャネルは、Snowflakeの1つのテーブルに必ずマップされます。ただし、同じテーブルを複数のチャネルがポイントすることはできます。クライアント SDK は複数のチャネルを複数のテーブルに開くことができますが、 SDK はアカウント間でチャネルを開くことはできません。行の順序とそれに対応するオフセットトークンはチャネル内で保持されますが、同じテーブルをポイントするチャネル間では保持されません。

Snowpipe streaming client channel table mapping

注釈

非アクティブなチャネルとそのオフセットトークンは、14日後に自動的に削除されます。

オフセットトークン

オフセットトークン は、チャネルごとにインジェスチョンの進行状況を追跡するために、クライアントが insertRow (単一行)または insertRows (行のセット)メソッドリクエストに含めることができる文字列です。クライアントは定期的に getLatestCommittedOffsetToken メソッドリクエストを作成して、特定のチャネルに対してコミットされた最新のオフセットトークンを取得し、それを使用してインジェスチョンの進行状況を判断できます。このトークンは、Snowflakeが重複除外を実行するためには使用 しない ことに注意してください。ただし、クライアントはこのトークンにより、カスタムコードを使用して重複除外を実行することはできます。

クライアントがチャネルを再度開くと、最新の永続化されたオフセットトークンが返されます。クライアントは、トークンを使用してデータソース内の位置をリセットし、同じデータを2回送信することを回避できます。チャネルの再開イベントが発生すると、Snowflakeにバッファーされたデータはコミットを回避するために破棄されることに注意してください。

たとえば、Kafkaコネクタはトピックから <パーティション>:<オフセット> などのオフセットトークンを読み取ることができます。または、パーティションがチャネル名にエンコードされている場合は、単に <オフセット> を読み取ることができます。次のシナリオを検討してください。

  1. Kafkaコネクタがオンラインになり、Kafkaトピック TPartition 1 に対応するチャネルが T:P1 というチャネル名で開きます。

  2. コネクタは、Kafkaパーティションからの記録の読み取りを開始します。コネクタは API を呼び出し、記録に関連付けられたオフセットをオフセットトークンとして使用して insertRows メソッドリクエストを作成します。たとえば、オフセットトークンは 10 で、Kafkaパーティションの10番目の記録を参照します。

  3. コネクタは定期的に getLatestCommittedOffsetToken メソッドリクエストを実行し、インジェストの進行状況を判断します。

Kafkaコネクタがクラッシュした場合は、次のフローを完了して、Kafkaパーティションの正しいオフセットから記録の読み取りを再開できます。

  1. Kafkaコネクタがオンラインに戻り、以前と同じ名前を使用してチャネルを再度開きます。

  2. コネクタは API を呼び出し、 getLatestCommittedOffsetToken メソッドリクエストを作成して、パーティションの最新のコミット済みオフセットを取得します。たとえば、最新の永続オフセットトークンが 20 であるとします。

  3. コネクタは、Kafka読み取り APIs を使用して、オフセット+1(この例では 21)に対応するカーソルをリセットします。

  4. コネクタは記録の読み取りを再開します。読み取りカーソルが正常に再配置されると、重複するデータは取得されません。

別の例として、ディレクトリからログを読み取り、Snowpipe Streaming Client SDK を使用してそれらのログをSnowflakeにエクスポートするアプリケーションがあるとします。次を実行するログエクスポートアプリケーションをビルドできます。

  1. ログディレクトリ内のファイルをリストする。ログフレームワークが生成するログファイルは辞書順に並べることができ、新しいログファイルはこの順序の最後に配置されるとします。

  2. ログファイルを1行ずつ読み取り、 API を呼び出して、ログファイル名と行数またはバイト位置に対応するオフセットトークンを使用して insertRows メソッドリクエストを作成する。たとえば、オフセットトークンは messages_1.log:20 のようになります。ここで、 messages_1.log はログファイルの名前で、 20 は行番号です。

アプリケーションがクラッシュしたり、再起動が必要になったりすると、 API が呼び出され、最後にエクスポートされたログファイルと行に対応するオフセットトークンを取得する getLatestCommittedOffsetToken メソッドリクエストが作成されます。例を続けると、これは messages_1.log:20 になります。その後、アプリケーションは messages_1.log を開き、行 21 を検索して、同じログ行が2回インジェストされないようにします。

最適化されたファイルへの移行

API は、チャネルからの行をクラウドストレージのBLOBに書き込み、ターゲットテーブルにコミットします。最初に、ターゲットテーブルに書き込まれたストリームデータは、仮の中間ファイル形式で格納されます。この段階では、テーブルは「混合テーブル」と見なされます。これは、ネイティブファイルと中間ファイルが混ざった状態でパーティションされたデータが格納されているためです。自動化されたバックグラウンドプロセスにより、必要に応じて、アクティブな中間ファイルから、クエリおよび DML 操作用に最適化されたネイティブファイルにデータが移行されます。

挿入のみの操作

API は現在、行の挿入に限られています。データを変更、削除、または結合するには、「生」記録を1つ以上のステージングされたテーブルに書き込みます。 継続的なデータパイプライン を使用してデータをマージ、結合、または変換し、変更されたデータを宛先のレポートテーブルに挿入します。

クラスおよびインターフェイス

クラスとインターフェイスのドキュメントについては、 Snowflake Ingest SDK API をご参照ください。

サポートされているJavaデータ型

次のテーブルは、Snowflake列へのインジェスチョンでサポートされているJavaデータ型をまとめたものです。

Snowflake列型

許可されたJavaデータ型

  • CHAR

  • VARCHAR

  • 文字列

  • プリミティブデータ型(int、boolean、charなど)

  • BigInteger、 BigDecimal

  • BINARY

  • byte[]

  • 文字列(16進数エンコード)

  • NUMBER

  • 数値型(BigInteger、 BigDecimal、byte、int、doubleなど)

  • 文字列

  • FLOAT

  • 数値型(BigInteger、 BigDecimal、byte、int、doubleなど)

  • 文字列

  • BOOLEAN

  • boolean

  • 数値型(BigInteger、 BigDecimal、byte、int、doubleなど)

  • 文字列

ブール変換の詳細 をご参照ください。

  • TIME

  • java.time.LocalTime

  • java.time.OffsetTime

  • 文字列

    • 整数格納時刻

    • HH24:MI:SS.FFTZH:TZM (例: 20:57:01.123456789+07:00

    • HH24:MI:SS.FF (例: 20:57:01.123456789

    • HH24:MI:SS (例: 20:57:01

    • HH24:MI (例: 20:57

  • DATE

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • 文字列

    • 整数格納日付

    • YYYY-MM-DD (例: 2013-04-28

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (例: 2013-04-28T20:57:01.123456789+07:00

    • YYYY-MM-DDTHH24:MI:SS.FF (例: 2013-04-28T20:57:01.123456

    • YYYY-MM-DDTHH24:MI:SS (例: 2013-04-28T20:57:01

    • YYYY-MM-DDTHH24:MI (例: 2013-04-28T20:57

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (例: 2013-04-28T20:57:01-07:00

    • YYYY-MM-DDTHH24:MITZH:TZM (例: 2013-04-28T20:57-07:00

  • TIMESTAMP_NTZ

  • TIMESTAMP_LTZ

  • TIMESTAMP_TZ

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • 文字列

    • 整数格納タイムスタンプ

    • YYYY-MM-DD (例: 2013-04-28

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (例: 2013-04-28T20:57:01.123456789+07:00

    • YYYY-MM-DDTHH24:MI:SS.FF (例: 2013-04-28T20:57:01.123456

    • YYYY-MM-DDTHH24:MI:SS (例: 2013-04-28T20:57:01

    • YYYY-MM-DDTHH24:MI (例: 2013-04-28T20:57

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (例: 2013-04-28T20:57:01-07:00

    • YYYY-MM-DDTHH24:MITZH:TZM (例: 2013-04-28T20:57-07:00

  • VARIANT

  • ARRAY

  • 文字列(有効な JSON にする必要あり)

  • プリミティブデータ型とその配列

  • BigInteger、 BigDecimal

  • java.time.LocalTime

  • java.time.OffsetTime

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.util.Map<文字列, T>。Tは有効な VARIANT 型

  • T[]。Tは有効な VARIANT 型

  • List<T>。Tは有効な VARIANT 型

  • OBJECT

  • 文字列(有効な JSON オブジェクトにする必要あり)

  • Map<文字列, T>。Tは有効なバリアント型

  • GEOGRAPHY

  • サポート対象外

  • GEOMETRY

  • サポート対象外

制限事項

  • 次の列設定の いずれか を持つテーブルはサポートされていません。

    • AUTOINCREMENT または IDENTITY

    • NULL ではないデフォルトの列値。

  • GEOGRAPHY および GEOMETRY データ型はサポートされていません。

  • 列の照合順序設定はサポートされていません。

  • Snowpipe Streamingは、データ暗号化に256ビット AES キーの使用のみをサポートします。

  • TRANSIENT または TEMPORARY テーブルはサポートされていません。