Snowpipe Streaming RESTAPI の利用開始:cURL および JWT のチュートリアル

注釈

プライマリおよびデフォルトの選択肢として :code:`snowpipe-streaming`SDK の使用をお勧めします。RESTAPI は高スループットのシナリオに最適化されていません。

このガイドでは、Snowpipe Streaming REST API および SnowSQL </developer-guide/sql-api/authenticating>` で生成された:doc:` JSON ウェブトークン(JWT)を使用してSnowflakeにデータをストリーミングする方法を説明します。

前提条件

始める前に、以下の対象が準備できていることをご確認ください。

Snowflakeのユーザーおよびオブジェクト:

キーペア認証用に構成されたSnowflakeユーザー。次の SQL コマンドを使用してパブリックキーを登録します。

ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
Copy

Snowflakeのデータベース、スキーマ、およびストリーミングインジェスチョン用の PIPE オブジェクト。次の SQL コマンド(MY_DATABASEMY_SCHEMAMY_PIPEMY_TABLE のようなプレースホルダーを任意の名前に置き換えます)を使用して作成できます。

-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;

-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
    data VARIANT,
    c1 NUMBER,
    c2 STRING
);

-- Create PIPE Object for Streaming Ingestion
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
Copy

インストールされたツール:

  • curl:HTTP リクエストを行うために使用します。

  • jq:JSON 応答を解析するために使用します。

  • SnowSQL:コマンドを実行するために使用するSnowflakeのコマンドラインクライアント。

生成された JWT: SnowSQL を使用して JWT を生成します。

snowsql --private-key-path rsa_key.p8 --generate-jwt \
  -a <ACCOUNT_LOCATOR> \
  -u MY_USER
Copy

注意

JWT を安全に保存します。ログやスクリプトで公開しないでください。

手順

次のステップを実行して、Snowflakeにデータをストリーミングします。

ステップ1:環境変数を設定する

Snowflakeアカウントとストリーミング操作に必要な環境変数を設定します。

# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"

# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_LOCATOR>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_PIPE"
export CHANNEL="MY_CHANNEL"

# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
Copy

ステップ2:インジェストホストを検出する

インジェストホストは、ストリーミングデータのエンドポイントです。JWT を使用してインジェストホストを検出します。

export INGEST_HOST=$(curl -sS -X GET \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
  "https://${CONTROL_HOST}/v2/streaming/hostname")

echo "Ingest Host: $INGEST_HOST"
Copy

インジェストホストでの操作を承認するために、スコープ付きトークンを取得します。

export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")

echo "Scoped Token obtained for ingest host"
Copy

ステップ3:チャネルを開く

ストリーミングチャンネルを開き、データインジェスチョンを開始します。

curl -sS -X PUT \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
  -d '{}' | tee open_resp.json | jq .
Copy

ステップ4:データ行を追加する

開いているチャネルにデータの1行を追加します。

4.1 継続とオフセットトークンを抽出する

これらのトークンは、ストリーミングセッションの状態を維持するために重要です。

export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
Copy

4.2 サンプル行を作成する

NDJSON 形式でサンプルデータ行を生成します。

export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

cat <<EOF > rows.ndjson
{
  "id": 1,
  "c1": $RANDOM,
  "ts": "$NOW_TS"
}
EOF
Copy

4.3 行を追加する

サンプル行をストリーミングチャネルに送信します。

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/x-ndjson" \
  "https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
  --data-binary @rows.ndjson | jq .
Copy

重要

追加の操作ごとに、次の追加呼び出しのために継続トークンを更新する必要があります。行追加呼び出しからの応答には更新を行うために使用する必要がある next_continuation_token フィールドが含まれています。

4.4 データの取り込みを確認する

データがSnowflakeテーブルに取り込まれたことを確認します。

Snowflakeで次の SQL クエリを実行します。

SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
Copy

ステップ5:チャネルステータスを取得する

ストリーミングチャネルの現在のステータスを確認します。

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
  -d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
Copy

(任意)ステップ6:クリーンアップする

仮ファイルを削除し、環境変数の設定を解除します。

rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
Copy

トラブルシューティング

  • HTTP 401(未承認): JWT トークンは有効であり、期限切れになっていないことを確認します。必要に応じて、再生成します。

  • HTTP 404(見つかりません): データベース、スキーマ、パイプ、およびチャネル名が正しいスペルで記載され、Snowflakeアカウントに存在することを再確認します。

  • インジェストホストなし: コントロールプレーンのホスト URL は正しく、アクセス可能であることを確認します。