Get Started with Snowpipe Streaming REST API: A cURL and JWT tutorial
This guide shows you how to stream data into Snowflake using the Snowpipe Streaming REST API and a JSON Web Token (JWT) generated with SnowSQL.
Prerequisites
Before you begin, ensure you have the following items:
Snowflake User and Objects:
A Snowflake user that is configured for key-pair authentication. Register your public key by using the following SQL command:
ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
A Snowflake database, schema, target table, and PIPE object for streaming ingestion. You can create them by using the following SQL commands (replace placeholders such as MY_DATABASE, MY_SCHEMA, MY_PIPE, and MY_TABLE with your desired names):
-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;
-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
data VARIANT,
c1 NUMBER,
c2 STRING
);
-- Create PIPE Object for Streaming Ingestion
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
Installed Tools:
curl: For making HTTP requests.
jq: For parsing JSON responses.
SnowSQL: Snowflake’s command-line client, for running commands and generating the JWT.
Generated JWT: Generate your JWT by using SnowSQL:
snowsql --private-key-path rsa_key.p8 --generate-jwt \
-a <ACCOUNT_LOCATOR> \
-u MY_USER
Caution
Store your JWT securely. Avoid exposing it in logs or scripts.
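The prerequisites assume your user already has a registered key pair. If you still need one, OpenSSL is a common way to create it. The following sketch assumes OpenSSL is installed and that an unencrypted key is acceptable for a tutorial. It writes the private key to rsa_key.p8 (the file the SnowSQL command above reads) and the public key to rsa_key.pub. PUBLIC_KEY_BODY is a variable name chosen for this example. It holds the public key without its BEGIN/END lines or line breaks, ready to paste into the ALTER USER statement.

```shell
# Generate a 2048-bit RSA private key and store it as unencrypted PKCS#8.
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
chmod 600 rsa_key.p8   # keep the private key readable only by you

# Derive the matching public key in PEM format.
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

# Drop the BEGIN/END lines and join the remaining lines, producing the value
# for ALTER USER MY_USER SET RSA_PUBLIC_KEY='...'.
PUBLIC_KEY_BODY=$(grep -v -- '-----' rsa_key.pub | tr -d '\n')
echo "$PUBLIC_KEY_BODY"
```

Outside a test account, consider an encrypted key instead (omit -nocrypt and supply a passphrase).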
Step-by-step instructions
Complete the following steps to stream data into Snowflake.
Step 1: Set environment variables
Set up the necessary environment variables for your Snowflake account and the streaming operation:
# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"
# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_LOCATOR>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_PIPE"
export CHANNEL="MY_CHANNEL"
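# Build the control plane host from your account URL. If your account URL includes region or cloud segments (for example, ab12345.us-east-2.aws.snowflakecomputing.com), include them in ACCOUNT: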
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
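Before making any requests, it can help to confirm that every placeholder in Step 1 was replaced. The hypothetical check_env function below (bash, because it uses indirect expansion) reports any listed variable that is empty or still contains a placeholder such as <ACCOUNT_LOCATOR> or PASTE_YOUR_JWT_TOKEN_HERE. It prints only variable names, never values, so the JWT stays out of the terminal.

```shell
# Hypothetical helper: report any listed variable that is empty or still
# contains a placeholder such as <ACCOUNT_LOCATOR> or PASTE_YOUR_JWT_TOKEN_HERE.
# Only variable names are printed, so the JWT itself is never echoed.
check_env() {
  local name value status=0
  for name in "$@"; do
    value=${!name:-}   # indirect expansion: the value of the variable named $name
    case $value in
      "" | *"<"* | PASTE_*)
        echo "Set $name before continuing." >&2
        status=1
        ;;
    esac
  done
  return "$status"
}

check_env JWT_TOKEN ACCOUNT DB SCHEMA PIPE CHANNEL CONTROL_HOST ||
  echo "Fix the variables above, then re-run Step 1." >&2
```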
Step 2: Discover ingest host
The ingest host is the endpoint for streaming data. Discover the ingest host by using your JWT:
export INGEST_HOST=$(curl -sS -X GET \
-H "Authorization: Bearer $JWT_TOKEN" \
-H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
"https://${CONTROL_HOST}/v2/streaming/hostname")
echo "Ingest Host: $INGEST_HOST"
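Because the curl command above doesn't use --fail, an HTTP error still exits successfully and its response body is stored in INGEST_HOST. The sketch below, built around a hypothetical looks_like_hostname function, flags a value that is empty or contains characters a hostname can't, which corresponds to the No Ingest Host case in Troubleshooting. It assumes the endpoint returns a bare hostname, as the scoped-token request that follows relies on.

```shell
# Hypothetical helper: succeed only if the argument looks like a bare hostname
# (letters, digits, dots, hyphens, underscores) rather than an empty string or
# an error body returned by the hostname endpoint.
looks_like_hostname() {
  local re='^[A-Za-z0-9._-]+$'
  [[ -n $1 && $1 =~ $re ]]
}

if ! looks_like_hostname "${INGEST_HOST:-}"; then
  echo "Unexpected ingest host response: '${INGEST_HOST:-}'" >&2
fi
```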
Obtain a scoped token to authorize operations on the ingest host:
export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
-H 'Content-Type: application/x-www-form-urlencoded' \
-H "Authorization: Bearer $JWT_TOKEN" \
-d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")
echo "Scoped Token obtained for ingest host"
Step 3: Open the channel
Open a streaming channel to begin data ingestion:
curl -sS -X PUT \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
-d '{}' | tee open_resp.json | jq .
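To confirm the channel opened before appending, you can check open_resp.json for the next_continuation_token field that Step 4 reads. The hypothetical channel_opened function below relies on jq -e, which exits with a non-zero status when the result is null or false.

```shell
# Hypothetical helper: succeed only if an open-channel response file contains
# a next_continuation_token. jq -e exits non-zero when the result is null or false.
channel_opened() {
  jq -e '.next_continuation_token' "$1" >/dev/null 2>&1
}

if channel_opened open_resp.json; then
  echo "Channel ${CHANNEL:-} is open."
else
  echo "Opening the channel failed; inspect open_resp.json." >&2
fi
```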
Step 4: Append a row of data
Append a single row of data to the open channel.
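4.1 Extract continuation and offset tokens
The continuation token must accompany every append request, and the offset token tracks how far the channel has committed. Extract both from the open-channel response, then compute the offset for the next append: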
export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
# A newly opened channel may not have a committed offset yet; default to 0 in that case.
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token // "0"' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
4.2 Create sample row
Generate a sample data row in NDJSON format, which requires each JSON object to be on a single line:
export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
cat <<EOF > rows.ndjson
{"id": 1, "c1": $RANDOM, "ts": "$NOW_TS"}
EOF
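To send more than one row per request, you can generate the NDJSON with jq instead of a here-document. The hypothetical make_rows function below writes one compact JSON object per line with the same id, c1, and ts fields as the sample row. It assumes bash and jq, and the example call overwrites rows.ndjson.

```shell
# Hypothetical helper: print COUNT rows as NDJSON, one compact JSON object per
# line, with the same fields as the sample row (id, c1, ts).
# jq -n starts from null input; -c prints each object on a single line.
make_rows() {
  local count=$1 i ts
  ts=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
  for ((i = 1; i <= count; i++)); do
    jq -nc --argjson id "$i" --argjson c1 "$RANDOM" --arg ts "$ts" \
      '{id: $id, c1: $c1, ts: $ts}'
  done
}

make_rows 3 > rows.ndjson
```

The append command in 4.3 posts the entire contents of rows.ndjson in one request, so it should work unchanged with the generated file.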
4.3 Append row
Send the sample row to the streaming channel:
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/x-ndjson" \
"https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
--data-binary @rows.ndjson | jq .
Important
After each append operation, you must update the continuationToken for the next append call. Use the next_continuation_token field from the append response as the new value.
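The following sketch wraps the append call from 4.3 in a hypothetical append_rows function that applies this rule: after a successful append, it replaces CONT_TOKEN with next_continuation_token from the response; on failure, it prints the response and leaves CONT_TOKEN unchanged. It assumes the variables from Steps 1 through 4.1 are set and that a successful response always includes next_continuation_token.

```shell
# Hypothetical helper: append the rows in FILE with offset token OFFSET, then
# replace CONT_TOKEN with the next_continuation_token from the response.
# Assumes INGEST_HOST, SCOPED_TOKEN, DB, SCHEMA, PIPE, CHANNEL, and CONT_TOKEN
# are already set as in the earlier steps.
append_rows() {
  local file=$1 offset=$2 resp next
  resp=$(curl -sS -X POST \
    -H "Authorization: Bearer $SCOPED_TOKEN" \
    -H "Content-Type: application/x-ndjson" \
    "https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$offset" \
    --data-binary @"$file")
  next=$(printf '%s' "$resp" | jq -r '.next_continuation_token // empty' 2>/dev/null)
  if [ -z "$next" ]; then
    echo "Append failed: $resp" >&2
    return 1   # keep the previous CONT_TOKEN so the caller can inspect and retry
  fi
  CONT_TOKEN=$next
}
```

For example, run append_rows rows.ndjson "$NEW_OFFSET" in place of the command in 4.3, then for each further batch run NEW_OFFSET=$((NEW_OFFSET + 1)) before calling it again.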
4.4 Verify the data ingestion
Confirm that the data was ingested into your Snowflake table.
Run the following SQL query in Snowflake:
SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE data:id::NUMBER = 1;
Step 5: Get channel status
Check the current status of your streaming channel:
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
-d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
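Channel status is also a way to confirm that an append was committed. The hypothetical wait_for_commit function below repeats the Step 5 request every 2 seconds until the channel's last_committed_offset_token equals an expected value, or gives up after a set number of attempts (10 by default). It assumes each entry in channel_statuses carries the same last_committed_offset_token field as the channel_status object in the open-channel response; check a real response before relying on it.

```shell
# Hypothetical helper: poll the bulk channel status endpoint until the
# channel's last committed offset token equals EXPECTED, or give up after
# ATTEMPTS tries (default 10), sleeping 2 seconds between tries.
wait_for_commit() {
  local expected=$1 attempts=${2:-10} i committed=""
  for ((i = 1; i <= attempts; i++)); do
    committed=$(curl -sS -X POST \
      -H "Authorization: Bearer $SCOPED_TOKEN" \
      -H "Content-Type: application/json" \
      "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
      -d "{\"channel_names\": [\"$CHANNEL\"]}" |
      jq -r --arg ch "$CHANNEL" '.channel_statuses[$ch].last_committed_offset_token // empty')
    if [ "$committed" = "$expected" ]; then
      echo "Offset $expected is committed on channel $CHANNEL."
      return 0
    fi
    if (( i < attempts )); then
      sleep 2   # give the service time to commit before polling again
    fi
  done
  echo "Offset $expected not committed after $attempts attempts (last seen: ${committed:-none})." >&2
  return 1
}
```

For example, wait_for_commit "$NEW_OFFSET" waits for the row appended in 4.3.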
(Optional) Step 6: Clean up
Remove temporary files and unset environment variables:
rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
Troubleshooting
HTTP 401 (Unauthorized): Verify that your JWT and the scoped token obtained from it are valid and not expired. If needed, regenerate the JWT and request a new scoped token.
HTTP 404 (Not Found): Double-check that the database, schema, pipe, and channel names are spelled correctly and exist in your Snowflake account.
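No Ingest Host: Ensure your control plane host URL is correct and accessible, and that your JWT was accepted. If INGEST_HOST contains an error message instead of a hostname, the discovery request failed.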
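For 401 errors, it can help to check when your JWT expires. A JWT's payload is its second dot-separated segment, base64url-encoded without padding, and its exp claim holds the expiry time in seconds since the Unix epoch. The hypothetical jwt_payload and jwt_seconds_left functions below decode the payload locally without verifying the signature. They assume bash, jq, and a base64 that accepts -d for decoding (GNU coreutils does).

```shell
# Print the decoded payload (claims) of a JWT as JSON. The payload is the
# second dot-separated segment, base64url-encoded without padding, so map the
# URL-safe alphabet back to standard base64 and restore "=" padding first.
# The signature is NOT verified; this is for inspection only.
jwt_payload() {
  local seg
  seg=$(printf '%s' "$1" | cut -d. -f2 | tr '_-' '/+')
  while (( ${#seg} % 4 )); do
    seg="${seg}="
  done
  printf '%s' "$seg" | base64 -d
}

# Print the number of seconds until the token's exp claim (negative once expired).
jwt_seconds_left() {
  local exp
  exp=$(jwt_payload "$1" | jq -r '.exp')
  echo $(( exp - $(date +%s) ))
}
```

For example, jwt_seconds_left "$JWT_TOKEN" prints a negative number once the token has expired.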