Premiers pas avec l’API REST Snowpipe Streaming : Un tutoriel cURL et JWT¶
Note
Nous vous recommandons de commencer par le SDK Snowpipe Streaming sur l’API REST pour bénéficier de l’amélioration des performances et de l’expérience de démarrage.
Ce guide vous montre comment diffuser des données dans Snowflake à l’aide de l’API REST Snowpipe Streaming et d’un jeton web JSON (JWT) généré avec SnowSQL.
Conditions préalables¶
Avant de commencer, assurez-vous de disposer des éléments suivants :
Utilisateur et objets Snowflake :
Utilisateur Snowflake configuré pour l’authentification par paire de clés. Enregistrez votre clé publique en utilisant la commande SQLsuivante :
ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
Une base de données, un schéma et une table cible pour l’ingestion de flux. Vous pouvez les créer en utilisant les commandes SQL suivantes et des caractères de remplacement comme MY_DATABASE, MY_SCHEMA, MY_TABLE avec les noms que vous voulez :
-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;
-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
id NUMBER,
c1 NUMBER,
ts STRING
);
ACCOUNT_IDENTIFIER :
Nous conseillons d’utiliser le format 1 pour l’ACCOUNT_IDENTIFIER, qui utilise le nom d’utilisateur au sein de votre organisation ; par exemple myorg-account123. Pour plus d’informations sur le format, voir Identificateurs de compte.
Outils installés :
curl: Pour effectuer des requêtes HTTP.jq: Pour effectuer des réponses JSON.SnowSQL: Pour l’exécution des commandes, le client de ligne de commande de Snowflake.
JWT généré :
Générer votre JWT à l’aide de SnowSQL :
snowsql --private-key-path rsa_key.p8 --generate-jwt \
-a <ACCOUNT_IDENTIFIER> \
-u MY_USER
Prudence
Stockez votre JWT en toute sécurité. Évitez de l’exposer dans les journaux ou les scripts.
Instructions pas à pas¶
Effectuez les étapes suivantes pour diffuser des données dans Snowflake.
Étape 1 : Variables d’environnement¶
Configurez les variables d’environnement nécessaires pour votre compte Snowflake et l’opération de streaming. Notez que la variable PIPE cible le canal de streaming par défaut associé à votre table.
# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"
# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_IDENTIFIER>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_TABLE-STREAMING"
export CHANNEL="MY_CHANNEL"
# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
Étape 2 : Découvrir l’hôte d’ingestion¶
Important
Si le nom de votre compte Snowflake contient des tirets bas (par exemple, MY_ACCOUNT), un problème connu peut provoquer une erreur interne lors de l’appel du service d’ingestion.
Vous devez remplacer tous les tirets bas par des tirets dans l’INGEST_HOST avant de générer le jeton restreint. Ce format converti (avec des tirets) doit être utilisé pour tous les appels d’API REST suivants, y compris la génération du jeton restreint lui-même.
Par exemple, si le nom d’hôte renvoyé est my_account.region.ingest.snowflakecomputing.com, vous devez le remplacer par my-account.region.ingest.snowflakecomputing.com pour tous les appels d’API REST suivants.
L’hôte d’ingestion est le point de terminaison pour les données en continu. Découvrez l’hôte d’ingestion à l’aide de votre JWT :
export INGEST_HOST=$(curl -sS -X GET \
-H "Authorization: Bearer $JWT_TOKEN" \
-H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
"https://${CONTROL_HOST}/v2/streaming/hostname")
echo "Ingest Host: $INGEST_HOST"
Obtenez un jeton à portée limitée pour autoriser les opérations sur l’hôte d’ingestion :
export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
-H 'Content-Type: application/x-www-form-urlencoded' \
-H "Authorization: Bearer $JWT_TOKEN" \
-d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")
echo "Scoped Token obtained for ingest host"
Étape 3 : Ouvrir le canal¶
Ouvrez un canal de streaming pour commencer l’ingestion de données :
curl -sS -X PUT \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
-d '{}' | tee open_resp.json | jq .
Étape 4 : Ajouter une ligne de données¶
Ajoutez une seule ligne de données au canal ouvert.
4.1 Extraire les jetons de continuation et de décalage¶
Ces jetons sont essentiels pour maintenir l’état de votre session de streaming.
export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
4.2 Créer un échantillon de ligne¶
Générez une ligne d’échantillon de données au format NDJSON :
export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
cat <<EOF > rows.ndjson
{
"id": 1,
"c1": $RANDOM,
"ts": "$NOW_TS"
}
EOF
4.3 Ajouter une ligne¶
Envoyez l’échantillon de ligne au canal de streaming :
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/x-ndjson" \
"https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
--data-binary @rows.ndjson | jq .
Important
Après chaque opération d’ajout, vous devez mettre à jour le
continuationTokenpour le prochain appel d’ajout. La réponse de l’appel append rows contient un champnext_continuation_tokenque vous devez utiliser pour effectuer vos mises à jour.La réussite de l’opération d’ajout confirme uniquement que les données ont été reçues par le service, et non qu’elles sont conservées dans la table. Passez à l’étape suivante pour vérifier la persistance avant de lancer la requête ou de passer au lot suivant.
4.4 Vérifier la persistance des données et le décalage engagé en utilisant getChannelStatus¶
Effectuez cette étape essentielle pour garantir la fiabilité de l’application. La persistance des données n’est pas garantie jusqu’à l’avancement du committedOffset. Pour confirmer que les lignes que vous venez d’ajouter sont conservées, utilisez getChannelStatus.
Vérifiez le statut actuel de votre canal de streaming :
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
-d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
Contrôle de vérification
Vous devez vous assurer que la valeur committedOffset renvoyée dans la réponse est supérieure ou égale au décalage des lignes que vous venez d’ajouter. C’est uniquement après l’avancement du committedOffset que vous pouvez vous assurer que les données sont disponibles en toute sécurité dans la table.
4.5 Interroger la table pour les données persistantes¶
Après avoir confirmé l’avancement du committedOffset dans l’étape précédente (4.4), vous pouvez effectuer une requête pour confirmer que les données sont ingérées dans votre table Snowflake.
Exécutez la requête SQL suivante dans Snowflake :
SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
(Facultatif) Étape 5 : Nettoyage¶
Supprimez les fichiers temporaires et les variables d’environnement non définies :
rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
Résolution des problèmes¶
HTTP 401 (Non autorisé) : vérifiez que votre jeton JWTest valide et n’a pas expiré. Si nécessaire, régénérez-le.
HTTP 404 (Introuvable) : vérifiez soigneusement que les noms de la base de données, du schéma, du pipe et du canal sont correctement orthographiés et existent dans votre compte Snowflake.
Pas d’hôte d’ingestion : assurez-vous que l’URL de l’hôte de votre plan de contrôle est correcte et accessible.