Premiers pas avec l’API REST Snowpipe Streaming : Un tutoriel cURL et JWT¶
Note
Nous vous recommandons d’utiliser le SDK snowpipe-streaming
comme choix principal et par défaut. L API REST n’est pas optimisée pour les scénarios de débit élevé.
Ce guide vous montre comment diffuser des données dans Snowflake à l’aide de l’API REST Snowpipe Streaming et d’un jeton web JSON (JWT) généré avec SnowSQL.
Conditions préalables¶
Avant de commencer, assurez-vous de disposer des éléments suivants :
Utilisateur et objets Snowflake :
Utilisateur Snowflake configuré pour l’authentification par paire de clés. Enregistrez votre clé publique en utilisant la commande SQLsuivante :
ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
Une base de données, un schéma et un PIPE objet pour l’ingestion de flux. Vous pouvez les créer en utilisant les commandes SQL suivantes (remplacez les caractères de remplacement comme MY_DATABASE
, MY_SCHEMA
, MY_PIPE
, MY_TABLE
avec les noms souhaités) :
-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;
-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
data VARIANT,
c1 NUMBER,
c2 STRING
);
-- Create PIPE Object for Streaming Ingestion
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
Outils installés :
curl
: Pour effectuer des requêtes HTTP.jq
: Pour effectuer des réponses JSON.SnowSQL
: Pour l’exécution des commandes, le client de ligne de commande de Snowflake.
Généré un JWT : générez votre JWT à l’aide de SnowSQL :
snowsql --private-key-path rsa_key.p8 --generate-jwt \
-a <ACCOUNT_LOCATOR> \
-u MY_USER
Prudence
Stockez votre JWT en toute sécurité. Évitez de l’exposer dans les journaux ou les scripts.
Instructions pas à pas¶
Effectuez les étapes suivantes pour diffuser des données dans Snowflake.
Étape 1 : Variables d’environnement¶
Configurez les variables d’environnement nécessaires pour votre compte Snowflake et l’opération de streaming :
# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"
# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_LOCATOR>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_PIPE"
export CHANNEL="MY_CHANNEL"
# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
Étape 2 : Découvrir l’hôte d’ingestion¶
L’hôte d’ingestion est le point de terminaison pour les données en continu. Découvrez l’hôte d’ingestion à l’aide de votre JWT :
export INGEST_HOST=$(curl -sS -X GET \
-H "Authorization: Bearer $JWT_TOKEN" \
-H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
"https://${CONTROL_HOST}/v2/streaming/hostname")
echo "Ingest Host: $INGEST_HOST"
Obtenez un jeton à portée limitée pour autoriser les opérations sur l’hôte d’ingestion :
export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
-H 'Content-Type: application/x-www-form-urlencoded' \
-H "Authorization: Bearer $JWT_TOKEN" \
-d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")
echo "Scoped Token obtained for ingest host"
Étape 3 : Ouvrir le canal¶
Ouvrez un canal de streaming pour commencer l’ingestion de données :
curl -sS -X PUT \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
-d '{}' | tee open_resp.json | jq .
Étape 4 : Ajouter une ligne de données¶
Ajoutez une seule ligne de données au canal ouvert.
4.1 Extraire les jetons de continuation et de décalage¶
Ces jetons sont essentiels pour maintenir l’état de votre session de streaming.
export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
4.2 Créer un échantillon de ligne¶
Générez une ligne d’échantillon de données au format NDJSON :
export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
cat <<EOF > rows.ndjson
{
"id": 1,
"c1": $RANDOM,
"ts": "$NOW_TS"
}
EOF
4.3 Ajouter une ligne¶
Envoyez l’échantillon de ligne au canal de streaming :
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/x-ndjson" \
"https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
--data-binary @rows.ndjson | jq .
Important
Après chaque opération d’ajout, vous devez mettre à jour le continuationToken pour le prochain appel d’ajout. La réponse de l’appel append rows contient un champ next_continuation_token
que vous devez utiliser pour effectuer vos mises à jour.
4.4 Vérifier l’ingestion des données¶
Confirmez que les données ont été ingérées dans votre table Snowflake.
Exécutez la requête SQL suivante dans Snowflake :
SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
Étape 5 : Obtenir l’état du canal¶
Vérifiez le statut actuel de votre canal de streaming :
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
-d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
(Facultatif) Étape 6 : Nettoyage¶
Supprimez les fichiers temporaires et les variables d’environnement non définies :
rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
Résolution des problèmes¶
HTTP 401 (Non autorisé) : vérifiez que votre jeton JWTest valide et n’a pas expiré. Si nécessaire, régénérez-le.
HTTP 404 (Introuvable) : vérifiez soigneusement que les noms de la base de données, du schéma, du pipe et du canal sont correctement orthographiés et existent dans votre compte Snowflake.
Pas d’hôte d’ingestion : assurez-vous que l’URL de l’hôte de votre plan de contrôle est correcte et accessible.