Snowpipe Streaming RESTAPI の利用開始:cURL および JWT のチュートリアル¶
注釈
RESTAPI でSnowpipe Streaming SDK を始めて、パフォーマンスと使用開始のエクスペリエンスの向上を実感することをお勧めします。
このガイドでは、Snowpipe Streaming REST API および SnowSQL </developer-guide/sql-api/authenticating>` で生成された:doc:` JSON ウェブトークン(JWT)を使用してSnowflakeにデータをストリーミングする方法を説明します。
前提条件¶
始める前に、以下の対象が準備できていることをご確認ください。
Snowflakeのユーザーおよびオブジェクト:
キーペア認証用に構成されたSnowflakeユーザー。次の SQL コマンドを使用してパブリックキーを登録します。
ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
A Snowflake database, schema, and a target table for streaming ingestion. You can create them by using the following SQL commands and replacing placeholders like MY_DATABASE, MY_SCHEMA, MY_TABLE with the names that you want:
-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;
-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
id NUMBER,
c1 NUMBER,
ts STRING
);
ACCOUNT_IDENTIFIER:
We suggest that you use Format 1 for the ACCOUNT_IDENTIFIER, which uses the account name within your organization; for example, myorg-account123. For more information on the format, see アカウント識別子.
Installed tools:
curl:HTTP リクエストを行うために使用します。jq:JSON 応答を解析するために使用します。SnowSQL:コマンドを実行するために使用するSnowflakeのコマンドラインクライアント。
Generated JWT:
Generate your JWT by using SnowSQL:
snowsql --private-key-path rsa_key.p8 --generate-jwt \
-a <ACCOUNT_IDENTIFIER> \
-u MY_USER
注意
JWT を安全に保存します。ログやスクリプトで公開しないでください。
手順¶
次のステップを実行して、Snowflakeにデータをストリーミングします。
ステップ1:環境変数を設定する¶
Set up the necessary environment variables for your Snowflake account and the streaming operation. Note that the PIPE variable targets the default streaming pipe associated with your table.
# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"
# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_IDENTIFIER>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_TABLE-STREAMING"
export CHANNEL="MY_CHANNEL"
# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
ステップ2:インジェストホストを検出する¶
重要
Snowflakeアカウント名にアンダースコアが含まれている場合(例: MY_ACCOUNT)、既知の問題により、インジェスチョンサービスを呼び出すときに内部エラーが発生する可能性があります。
スコープトークンを生成する前に、INGEST_HOST ですべてのアンダースコアをダッシュに置き換える必要があります。この変換済みの形式(ダッシュを使用)を、スコープトークン自体の生成など、後続のすべての RESTAPI 呼び出しに使用する必要があります。
たとえば、返されたホスト名が my_account.region.ingest.snowflakecomputing.com である場合、後続のすべての REST API 呼び出しのために my-account.region.ingest.snowflakecomputing.com に変更する必要があります。
インジェストホストは、ストリーミングデータのエンドポイントです。JWT を使用してインジェストホストを検出します。
export INGEST_HOST=$(curl -sS -X GET \
-H "Authorization: Bearer $JWT_TOKEN" \
-H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
"https://${CONTROL_HOST}/v2/streaming/hostname")
echo "Ingest Host: $INGEST_HOST"
インジェストホストでの操作を承認するために、スコープ付きトークンを取得します。
export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
-H 'Content-Type: application/x-www-form-urlencoded' \
-H "Authorization: Bearer $JWT_TOKEN" \
-d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")
echo "Scoped Token obtained for ingest host"
ステップ3:チャネルを開く¶
ストリーミングチャンネルを開き、データインジェスチョンを開始します。
curl -sS -X PUT \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
-d '{}' | tee open_resp.json | jq .
ステップ4:データ行を追加する¶
開いているチャネルにデータの1行を追加します。
4.1 継続とオフセットトークンを抽出する¶
これらのトークンは、ストリーミングセッションの状態を維持するために重要です。
export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
4.2 サンプル行を作成する¶
NDJSON 形式でサンプルデータ行を生成します。
export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
cat <<EOF > rows.ndjson
{
"id": 1,
"c1": $RANDOM,
"ts": "$NOW_TS"
}
EOF
4.3 行を追加する¶
サンプル行をストリーミングチャネルに送信します。
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/x-ndjson" \
"https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
--data-binary @rows.ndjson | jq .
重要
After each append operation, you must update the
continuationTokenfor the next append call. The response from the append rows call contains anext_continuation_tokenfield that you should use to make your updates.追加操作が成功した場合、データがサービスによって受信されたことのみが確認され、データがテーブルに保存されたことは確認されません。次のバッチにクエリを実行するか移動する前に、次のステップを実行して永続性を確認します。
4.4 getChannelStatus を使用して、データ永続性とコミットされたオフセットを確認する¶
アプリケーションの信頼性を確保するために、この重要なステップを完了します。committedOffset が進むまで、データの永続性は保証されません。追加した行が正常に永続化されていることを確認するには、 getChannelStatus を使用します。
ストリーミングチャネルの現在のステータスを確認します。
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
-d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
検証チェック
応答で返される committedOffset が、追加した行のオフセット以上であることを確認する必要があります。committedOffset が進んだ後にのみ、データがテーブル内で安全に利用できることを確認できるようになります。
4.5 永続データのテーブルをクエリする¶
前のステップ(4.4) で committedOffset が進んだことを確認したら、クエリして、データがSnowflakeテーブルにインジェストされていることを確認できます。
Snowflakeで次の SQL クエリを実行します。
SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
(Optional) Step 5: Clean up¶
仮ファイルを削除し、環境変数の設定を解除します。
rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
トラブルシューティング¶
HTTP 401(未承認): JWT トークンは有効であり、期限切れになっていないことを確認します。必要に応じて、再生成します。
HTTP 404(見つかりません): データベース、スキーマ、パイプ、およびチャネル名が正しいスペルで記載され、Snowflakeアカウントに存在することを再確認します。
インジェストホストなし: コントロールプレーンのホスト URL は正しく、アクセス可能であることを確認します。