Snowpipe Streaming移行ガイド¶
このガイドでは、従来のSnowpipe Java SDK から高性能Snowpipe Streaming SDK への移行方法を説明します。 ここで取り上げるアーキテクチャの変更および API の更新は、Python SDK への移行にも適用されます。これは高性能アーキテクチャが両方の言語で利用可能であるためです。このドキュメントのコード例はJavaのものですが、移行のコア原則は言語間で一貫しています。
主なアーキテクチャの変更¶
次のテーブルは、高性能Snowpipe Streaming SDK における最も重要なアーキテクチャの変更をまとめたものです。SDKs の詳細な比較については、高性能のSnowpipe Streamingと従来の SDKs の比較 をご参照ください。
エリア |
従来(snowflake-ingest-java) |
高性能(snowpipe-streaming SDK) |
|---|---|---|
エントリーポイント |
データは直接テーブルにインジェストされます。 |
データは PIPE オブジェクトを介してインジェストされます。これは変換とスキーマの強制をサポートしています。 |
SDK / コア |
Java SDK のみ。 |
共有Rust Coreを使用した複数の言語(JavaおよびPython)の SDK。 |
API 名 |
|
|
エラー処理 |
クライアント側の検証が行われます。 |
より豊富なエラーフィードバックを含むサーバー側の検証が提供されます。 |
バックプレッシャー処理 |
スレッドをスリープさせ、ブロック/非応答状態にします。 |
エラーを返し、呼び出し元がバックオフ/再試行戦略を実装できるようにします。 |
クライアントからテーブルへのマッピング |
単一のクライアントオブジェクトは、任意のテーブルにチャネルを開くことができました。 |
単一のクライアントオブジェクトは、1つのパイプオブジェクトに排他的に関連付けられるようになりました。 |
請求 |
コンピューティングとクライアント数に基づく。 |
フラット、インジェストされた GB あたり。 |
スキーマ / 変換 |
クライアント側で管理されます。 |
PIPE 定義を通じてサーバー側で管理されます。 |
移行プロセス¶
アプリケーションを高性能 SDK に移行するには、次の高レベルのステップを完了します。
各ターゲットテーブルの PIPE を作成します。
CREATE PIPE my_pipe AS COPY INTO my_table FROM TABLE (DATA_SOURCE(TYPE => 'STREAMING')) MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE [CLUSTER_AT_INGEST_TIME = TRUE];
すべての従来のクライアントからのインジェスチョンを停止します。
従来のクライアントの各チャネルについて、最後にコミットされたオフセットを確認します。これらのオフセットを取得するには、従来の SDK の``getLatestCommittedOffsetTokens()`` メソッドを使用します。これらのオフセットがクライアント側の記録と一致していることを確認します。
アプリケーションコードを更新します。
プロジェクトの依存関係を高性能 SDK(JavaまたはPython)に切り替えます。
次の API および構成の変更 セクションで詳述されているように、API 呼び出しを更新します。
Snowflakeから最後にコミットされたオフセットを使用して、テーブル/PIPE ごとに1つのクライアントを初期化します。
新しいクライアントが構成され安定したら、インジェスチョンを再開します。
API および構成の変更¶
次の変更は、移行中に API 呼び出しと構成設定に対して行う必要があります。
クライアントの初期化¶
従来:
builder(name)高性能:
builder(name, db, schema, pipeName)
チャネル¶
従来:
openChannel(OpenChannelRequest)高性能:
openChannel(channelName, offsetToken)はチャネルとステータスの両方を返します
インジェスチョンメソッド¶
従来:
insertRow/insertRows(...)高性能:
appendRow/appendRows(...)
オフセットの追跡¶
従来の SDK の
getLatestCommittedOffsetTokens(channels)メソッドは可視性が限られており、エラーコンテキストがありません。高性能 SDK は引き続き
getLatestCommittedOffsetTokens(...)をサポートしますが、堅牢な監視のために、getChannelStatuses(...)を使用することを推奨します。このメソッドは次のタスクを実行します。オフセットが想定どおりに進行していることを確認する。
チャネルごとにエラーカウントと詳細なエラー情報を返す。
データパイプラインの積極的なモニタリングとトラブルシューティングを可能にする。
半構造化データの処理¶
高性能 SDK に移行する際に、アプリケーションがどのように ARRAY 列および VARIANT 列のデータを提供するかを確認して、データがリテラル文字列として保存されないようにします。
動作変更¶
シリアル化された文字列リテラル(例:「[1, 2, 3]」)をv2の ARRAY 列に渡すと、その文字列リテラルを含む単一要素配列が生成されます。従来のアーキテクチャの動作を維持するには、次のオプションのいずれかを選択します。
オプション1:ネイティブオブジェクトを渡す(推奨)¶
クライアントアプリケーションを更新して、appendRow を呼び出す前に、JSON 文字列を逆シリアル化してネイティブオブジェクトにします。
Java:配列に
java.util.Listを使用し、オブジェクトにjava.util.Mapを使用します。Python:ネイティブの
list型およびdict型を使用します。
利点:デフォルトパイプおよびスキーマの自動進化と互換性があります。
オプション2:パイプ側の変換¶
PARSE_JSON 関数を使用して変換ロジックが設定された Pipe オブジェクトを明示的に定義します。
例 SQL
CREATE PIPE my_pipe AS
COPY INTO my_table (my_array_col)
FROM (SELECT PARSE_JSON($1:my_array_col) FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
注釈
この方法には、デフォルトのパイプおよびスキーマの自動進化機能との互換性がありません。