Snowpipe Streaming RESTAPI の利用開始:cURL および JWT のチュートリアル¶
注釈
RESTAPI でSnowpipe Streaming SDK を始めて、パフォーマンスと使用開始のエクスペリエンスの向上を実感することをお勧めします。
このガイドでは、Snowpipe Streaming REST API および SnowSQL </developer-guide/sql-api/authenticating>` で生成された:doc:` JSON ウェブトークン(JWT)を使用してSnowflakeにデータをストリーミングする方法を説明します。
前提条件¶
始める前に、以下の対象が準備できていることをご確認ください。
Snowflakeのユーザーおよびオブジェクト:
キーペア認証用に構成されたSnowflakeユーザー。次の SQL コマンドを使用してパブリックキーを登録します。
ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
Snowflakeのデータベース、スキーマ、およびストリーミングインジェスチョン用の PIPE オブジェクト。次の SQL コマンド(MY_DATABASE、MY_SCHEMA、MY_PIPE、MY_TABLE のようなプレースホルダーを任意の名前に置き換えます)を使用して作成できます。
-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;
-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
data VARIANT,
c1 NUMBER,
c2 STRING
);
-- Create PIPE Object for Streaming Ingestion
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
ACCOUNT_IDENTIFIER:
ACCOUNT_IDENTIFIER については形式1の使用を推奨しています。これは、組織内のアカウント名を使用するものです(たとえば myorg-account123)。形式の詳細については、 アカウント識別子 をご参照ください。
インストール済みツール:
curl:HTTP リクエストを行うために使用します。jq:JSON 応答を解析するために使用します。SnowSQL:コマンドを実行するために使用するSnowflakeのコマンドラインクライアント。
生成済み JWT:
SnowSQL を使って JWT を生成します。
snowsql --private-key-path rsa_key.p8 --generate-jwt \
-a <ACCOUNT_IDENTIFIER> \
-u MY_USER
注意
JWT を安全に保存します。ログやスクリプトで公開しないでください。
手順¶
次のステップを実行して、Snowflakeにデータをストリーミングします。
ステップ1:環境変数を設定する¶
Snowflakeアカウントとストリーミング操作に必要な環境変数を設定します。
# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"
# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_IDENTIFIER>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_PIPE"
export CHANNEL="MY_CHANNEL"
# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
ステップ2:インジェストホストを検出する¶
重要
Snowflakeアカウント名にアンダースコアが含まれている場合(例: MY_ACCOUNT)、既知の問題により、インジェスチョンサービスを呼び出すときに内部エラーが発生する可能性があります。
スコープトークンを生成する前に、INGEST_HOST ですべてのアンダースコアをダッシュに置き換える必要があります。この変換済みの形式(ダッシュを使用)を、スコープトークン自体の生成など、後続のすべての RESTAPI 呼び出しに使用する必要があります。
たとえば、返されたホスト名が my_account.region.ingest.snowflakecomputing.com である場合、後続のすべての REST API 呼び出しのために my-account.region.ingest.snowflakecomputing.com に変更する必要があります。
インジェストホストは、ストリーミングデータのエンドポイントです。JWT を使用してインジェストホストを検出します。
export INGEST_HOST=$(curl -sS -X GET \
-H "Authorization: Bearer $JWT_TOKEN" \
-H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
"https://${CONTROL_HOST}/v2/streaming/hostname")
echo "Ingest Host: $INGEST_HOST"
インジェストホストでの操作を承認するために、スコープ付きトークンを取得します。
export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
-H 'Content-Type: application/x-www-form-urlencoded' \
-H "Authorization: Bearer $JWT_TOKEN" \
-d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")
echo "Scoped Token obtained for ingest host"
ステップ3:チャネルを開く¶
ストリーミングチャンネルを開き、データインジェスチョンを開始します。
curl -sS -X PUT \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
-d '{}' | tee open_resp.json | jq .
ステップ4:データ行を追加する¶
開いているチャネルにデータの1行を追加します。
4.1 継続とオフセットトークンを抽出する¶
これらのトークンは、ストリーミングセッションの状態を維持するために重要です。
export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
4.2 サンプル行を作成する¶
NDJSON 形式でサンプルデータ行を生成します。
export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
cat <<EOF > rows.ndjson
{
"id": 1,
"c1": $RANDOM,
"ts": "$NOW_TS"
}
EOF
4.3 行を追加する¶
サンプル行をストリーミングチャネルに送信します。
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/x-ndjson" \
"https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
--data-binary @rows.ndjson | jq .
重要
After each append operation, you must update the
continuationTokenfor the next append call. The response from the append rows call contains anext_continuation_tokenfield that you should use to make your updates.The success of the append operation confirms only that the data was received by the service, not that it is persisted to the table. Take the next step to verify persistence before querying or moving to the next batch.
4.4 Verify data persistence and committed offset by using getChannelStatus¶
Complete this critical step to ensure application reliability. Data isn't guaranteed to be persistent until the committedOffset has advanced. To confirm that the rows that you just appended are successfully persisted, use getChannelStatus.
ストリーミングチャネルの現在のステータスを確認します。
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
-d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
Verification check
You must ensure that the committedOffset returned in the response is greater than or equal to the offset of the rows you just appended. Only after the committedOffset advances can you be certain that the data is safely available in the table.
4.5 Query the table for persisted data¶
After you confirm that the committedOffset has advanced in the previous step (4.4), you can query to confirm that the data is ingested into your Snowflake table.
Snowflakeで次の SQL クエリを実行します。
SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE data:id::NUMBER = 1;
(Optional) Step 5: Clean up¶
仮ファイルを削除し、環境変数の設定を解除します。
rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
トラブルシューティング¶
HTTP 401(未承認): JWT トークンは有効であり、期限切れになっていないことを確認します。必要に応じて、再生成します。
HTTP 404(見つかりません): データベース、スキーマ、パイプ、およびチャネル名が正しいスペルで記載され、Snowflakeアカウントに存在することを再確認します。
インジェストホストなし: コントロールプレーンのホスト URL は正しく、アクセス可能であることを確認します。