Premiers pas avec l’API REST Snowpipe Streaming : Un tutoriel cURL et JWT¶
Note
Nous vous recommandons de commencer par le SDK Snowpipe Streaming sur l’API REST pour bénéficier de l’amélioration des performances et de l’expérience de démarrage.
Ce guide vous montre comment diffuser des données dans Snowflake à l’aide de l’API REST Snowpipe Streaming et d’un jeton web JSON (JWT) généré avec SnowSQL.
Conditions préalables¶
Avant de commencer, assurez-vous de disposer des éléments suivants :
Utilisateur et objets Snowflake :
Utilisateur Snowflake configuré pour l’authentification par paire de clés. Enregistrez votre clé publique en utilisant la commande SQLsuivante :
ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
A Snowflake database, schema, and a target table for streaming ingestion. You can create them by using the following SQL commands and replacing placeholders like MY_DATABASE, MY_SCHEMA, MY_TABLE with the names that you want:
-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;
-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
id NUMBER,
c1 NUMBER,
ts STRING
);
ACCOUNT_IDENTIFIER :
We suggest that you use Format 1 for the ACCOUNT_IDENTIFIER, which uses the account name within your organization; for example, myorg-account123. For more information on the format, see Identificateurs de compte.
Installed tools:
curl: Pour effectuer des requêtes HTTP.jq: Pour effectuer des réponses JSON.SnowSQL: Pour l’exécution des commandes, le client de ligne de commande de Snowflake.
Generated JWT:
Generate your JWT by using SnowSQL:
snowsql --private-key-path rsa_key.p8 --generate-jwt \
-a <ACCOUNT_IDENTIFIER> \
-u MY_USER
Prudence
Stockez votre JWT en toute sécurité. Évitez de l’exposer dans les journaux ou les scripts.
Instructions pas à pas¶
Effectuez les étapes suivantes pour diffuser des données dans Snowflake.
Étape 1 : Variables d’environnement¶
Set up the necessary environment variables for your Snowflake account and the streaming operation. Note that the PIPE variable targets the default streaming pipe associated with your table.
# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"
# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_IDENTIFIER>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_TABLE-STREAMING"
export CHANNEL="MY_CHANNEL"
# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
Étape 2 : Découvrir l’hôte d’ingestion¶
Important
Si le nom de votre compte Snowflake contient des tirets bas (par exemple, MY_ACCOUNT), un problème connu peut provoquer une erreur interne lors de l’appel du service d’ingestion.
Vous devez remplacer tous les tirets bas par des tirets dans l’INGEST_HOST avant de générer le jeton restreint. Ce format converti (avec des tirets) doit être utilisé pour tous les appels d’API REST suivants, y compris la génération du jeton restreint lui-même.
Par exemple, si le nom d’hôte renvoyé est my_account.region.ingest.snowflakecomputing.com, vous devez le remplacer par my-account.region.ingest.snowflakecomputing.com pour tous les appels d’API REST suivants.
L’hôte d’ingestion est le point de terminaison pour les données en continu. Découvrez l’hôte d’ingestion à l’aide de votre JWT :
export INGEST_HOST=$(curl -sS -X GET \
-H "Authorization: Bearer $JWT_TOKEN" \
-H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
"https://${CONTROL_HOST}/v2/streaming/hostname")
echo "Ingest Host: $INGEST_HOST"
Obtenez un jeton à portée limitée pour autoriser les opérations sur l’hôte d’ingestion :
export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
-H 'Content-Type: application/x-www-form-urlencoded' \
-H "Authorization: Bearer $JWT_TOKEN" \
-d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")
echo "Scoped Token obtained for ingest host"
Étape 3 : Ouvrir le canal¶
Ouvrez un canal de streaming pour commencer l’ingestion de données :
curl -sS -X PUT \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
-d '{}' | tee open_resp.json | jq .
Étape 4 : Ajouter une ligne de données¶
Ajoutez une seule ligne de données au canal ouvert.
4.1 Extraire les jetons de continuation et de décalage¶
Ces jetons sont essentiels pour maintenir l’état de votre session de streaming.
export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
4.2 Créer un échantillon de ligne¶
Générez une ligne d’échantillon de données au format NDJSON :
export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
cat <<EOF > rows.ndjson
{
"id": 1,
"c1": $RANDOM,
"ts": "$NOW_TS"
}
EOF
4.3 Ajouter une ligne¶
Envoyez l’échantillon de ligne au canal de streaming :
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/x-ndjson" \
"https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
--data-binary @rows.ndjson | jq .
Important
After each append operation, you must update the
continuationTokenfor the next append call. The response from the append rows call contains anext_continuation_tokenfield that you should use to make your updates.The success of the append operation confirms only that the data was received by the service, not that it is persisted to the table. Take the next step to verify persistence before querying or moving to the next batch.
4.4 Verify data persistence and committed offset by using getChannelStatus¶
Complete this critical step to ensure application reliability. Data isn’t guaranteed to be persistent until the committedOffset has advanced. To confirm that the rows that you just appended are successfully persisted, use getChannelStatus.
Vérifiez le statut actuel de votre canal de streaming :
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
-d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
Verification check
You must ensure that the committedOffset returned in the response is greater than or equal to the offset of the rows you just appended. Only after the committedOffset advances can you be certain that the data is safely available in the table.
4.5 Query the table for persisted data¶
After you confirm that the committedOffset has advanced in the previous step (4.4), you can query to confirm that the data is ingested into your Snowflake table.
Exécutez la requête SQL suivante dans Snowflake :
SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
(Optional) Step 5: Clean up¶
Supprimez les fichiers temporaires et les variables d’environnement non définies :
rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
Résolution des problèmes¶
HTTP 401 (Non autorisé) : vérifiez que votre jeton JWTest valide et n’a pas expiré. Si nécessaire, régénérez-le.
HTTP 404 (Introuvable) : vérifiez soigneusement que les noms de la base de données, du schéma, du pipe et du canal sont correctement orthographiés et existent dans votre compte Snowflake.
Pas d’hôte d’ingestion : assurez-vous que l’URL de l’hôte de votre plan de contrôle est correcte et accessible.