チュートリアル: cURL および JWT を使用してSnowpipe Streaming REST API の利用を開始する¶
注釈
RESTAPI でSnowpipe Streaming SDK を始めて、パフォーマンスと使用開始のエクスペリエンスの向上を実感することをお勧めします。
このガイドでは、Snowpipe Streaming REST API および SnowSQL </developer-guide/sql-api/authenticating>` で生成された:doc:` JSON ウェブトークン(JWT)を使用してSnowflakeにデータをストリーミングする方法を説明します。
前提条件¶
始める前に、以下の対象が準備できていることをご確認ください。
Snowflakeのユーザーおよびオブジェクト:
キーペア認証用に構成されたSnowflakeユーザー。次の SQL コマンドを使用してパブリックキーを登録します。
Snowflakeのデータベース、スキーマ、およびストリーミングインジェスチョン用のターゲットテーブル。次の SQL コマンドを使用し、 MY_DATABASE、 MY_SCHEMA、 MY_TABLE などのプレースホルダーを必要な名前に置き換えることで、これらを作成できます。
ACCOUNT_IDENTIFIER:
ACCOUNT_IDENTIFIER には、組織内のアカウント名(たとえば myorg-account123)を使用する形式1を使用することをお勧めします。形式の詳細については、 アカウント識別子 をご参照ください。
インストール済みツール:
curl:HTTP リクエストを行うために使用します。jq:JSON 応答を解析するために使用します。SnowSQL:コマンドを実行するために使用するSnowflakeのコマンドラインクライアント。
生成済み JWT:
SnowSQL を使って JWT を生成します。
注意
JWT を安全に保存します。ログやスクリプトで公開しないでください。
手順¶
次のステップを実行して、Snowflakeにデータをストリーミングします。
ステップ1:環境変数を設定する¶
Snowflakeアカウントとストリーミング操作に必要な環境変数を設定します。PIPE 変数は、テーブルに関連付けられたデフォルトのストリーミングパイプをターゲットにすることに留意してください。
ステップ2:インジェストホストを検出する¶
重要
Snowflakeアカウント名にアンダースコアが含まれている場合(例: MY_ACCOUNT)、既知の問題により、インジェスチョンサービスを呼び出すときに内部エラーが発生する可能性があります。
スコープトークンを生成する前に、INGEST_HOST ですべてのアンダースコアをダッシュに置き換える必要があります。この変換済みの形式(ダッシュを使用)を、スコープトークン自体の生成など、後続のすべての RESTAPI 呼び出しに使用する必要があります。
たとえば、返されたホスト名が my_account.region.ingest.snowflakecomputing.com である場合、後続のすべての REST API 呼び出しのために my-account.region.ingest.snowflakecomputing.com に変更する必要があります。
インジェストホストは、ストリーミングデータのエンドポイントです。JWT を使用してインジェストホストを検出します。
インジェストホストでの操作を承認するために、スコープ付きトークンを取得します。
ステップ3:チャネルを開く¶
ストリーミングチャンネルを開き、データインジェスチョンを開始します。
ステップ4:データ行を追加する¶
開いているチャネルにデータの1行を追加します。
4.1 継続とオフセットトークンを抽出する¶
これらのトークンは、ストリーミングセッションの状態を維持するために重要です。
4.2 サンプル行を作成する¶
NDJSON 形式でサンプルデータ行を生成します。
4.3 行を追加する¶
サンプル行をストリーミングチャネルに送信します。
注釈
この例には、圧縮のサポートを示す Content-Encoding: zstd ヘッダーが含まれています。非圧縮データを含むこの簡単な例では、このヘッダーを省略できます。圧縮データを送信する場合は、zstd または gzip のいずれかを指定して、ペイロードの圧縮形式に一致させます。
重要
追加の操作ごとに、次の追加呼び出しのために
continuationTokenを更新する必要があります。行追加呼び出しからの応答には更新を行うために使用する必要があるnext_continuation_tokenフィールドが含まれています。追加操作が成功した場合、データがサービスによって受信されたことのみが確認され、データがテーブルに保存されたことは確認されません。次のバッチにクエリを実行するか移動する前に、次のステップを実行して永続性を確認します。
4.4 getChannelStatus を使用して、データ永続性とコミットされたオフセットを確認する¶
アプリケーションの信頼性を確保するために、この重要なステップを完了します。committedOffset が進むまで、データの永続性は保証されません。追加した行が正常に永続化されていることを確認するには、 getChannelStatus を使用します。
ストリーミングチャネルの現在のステータスを確認します。
検証チェック
応答で返される committedOffset が、追加した行のオフセット以上であることを確認する必要があります。committedOffset が進んだ後にのみ、データがテーブル内で安全に利用できることを確認できるようになります。
4.5 永続データのテーブルをクエリする¶
前のステップ(4.4) で committedOffset が進んだことを確認したら、クエリして、データがSnowflakeテーブルにインジェストされていることを確認できます。
Snowflakeで次の SQL クエリを実行します。
(オプション)ステップ5: クリーンアップする¶
仮ファイルを削除し、環境変数の設定を解除します。
トラブルシューティング¶
HTTP 401(未承認): JWT トークンは有効であり、期限切れになっていないことを確認します。必要に応じて、再生成します。
HTTP 404(見つかりません): データベース、スキーマ、パイプ、およびチャネル名が正しいスペルで記載され、Snowflakeアカウントに存在することを再確認します。
インジェストホストなし: コントロールプレーンのホスト URL は正しく、アクセス可能であることを確認します。