Premiers pas avec l’API REST Snowpipe Streaming : Un tutoriel cURL et JWT

Note

Nous vous recommandons de commencer par le SDK Snowpipe Streaming sur l’API REST pour bénéficier de l’amélioration des performances et de l’expérience de démarrage.

Ce guide vous montre comment diffuser des données dans Snowflake à l’aide de l’API REST Snowpipe Streaming et d’un jeton web JSON (JWT) généré avec SnowSQL.

Conditions préalables

Avant de commencer, assurez-vous de disposer des éléments suivants :

Utilisateur et objets Snowflake :

Utilisateur Snowflake configuré pour l’authentification par paire de clés. Enregistrez votre clé publique en utilisant la commande SQLsuivante :

ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
Copy

A Snowflake database, schema, and a target table for streaming ingestion. You can create them by using the following SQL commands and replacing placeholders like MY_DATABASE, MY_SCHEMA, MY_TABLE with the names that you want:

-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;

-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
    id NUMBER,
    c1 NUMBER,
    ts STRING
);
Copy

ACCOUNT_IDENTIFIER :

We suggest that you use Format 1 for the ACCOUNT_IDENTIFIER, which uses the account name within your organization; for example, myorg-account123. For more information on the format, see Identificateurs de compte.

Installed tools:

  • curl : Pour effectuer des requêtes HTTP.

  • jq : Pour effectuer des réponses JSON.

  • SnowSQL : Pour l’exécution des commandes, le client de ligne de commande de Snowflake.

Generated JWT:

Generate your JWT by using SnowSQL:

snowsql --private-key-path rsa_key.p8 --generate-jwt \
  -a <ACCOUNT_IDENTIFIER> \
  -u MY_USER
Copy

Prudence

Stockez votre JWT en toute sécurité. Évitez de l’exposer dans les journaux ou les scripts.

Instructions pas à pas

Effectuez les étapes suivantes pour diffuser des données dans Snowflake.

Étape 1 : Variables d’environnement

Set up the necessary environment variables for your Snowflake account and the streaming operation. Note that the PIPE variable targets the default streaming pipe associated with your table.

# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"

# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_IDENTIFIER>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_TABLE-STREAMING"
export CHANNEL="MY_CHANNEL"

# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
Copy

Étape 2 : Découvrir l’hôte d’ingestion

Important

Si le nom de votre compte Snowflake contient des tirets bas (par exemple, MY_ACCOUNT), un problème connu peut provoquer une erreur interne lors de l’appel du service d’ingestion.

Vous devez remplacer tous les tirets bas par des tirets dans l’INGEST_HOST avant de générer le jeton restreint. Ce format converti (avec des tirets) doit être utilisé pour tous les appels d’API REST suivants, y compris la génération du jeton restreint lui-même.

Par exemple, si le nom d’hôte renvoyé est my_account.region.ingest.snowflakecomputing.com, vous devez le remplacer par my-account.region.ingest.snowflakecomputing.com pour tous les appels d’API REST suivants.

L’hôte d’ingestion est le point de terminaison pour les données en continu. Découvrez l’hôte d’ingestion à l’aide de votre JWT :

export INGEST_HOST=$(curl -sS -X GET \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
  "https://${CONTROL_HOST}/v2/streaming/hostname")

echo "Ingest Host: $INGEST_HOST"
Copy

Obtenez un jeton à portée limitée pour autoriser les opérations sur l’hôte d’ingestion :

export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")

echo "Scoped Token obtained for ingest host"
Copy

Étape 3 : Ouvrir le canal

Ouvrez un canal de streaming pour commencer l’ingestion de données :

curl -sS -X PUT \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
  -d '{}' | tee open_resp.json | jq .
Copy

Étape 4 : Ajouter une ligne de données

Ajoutez une seule ligne de données au canal ouvert.

4.1 Extraire les jetons de continuation et de décalage

Ces jetons sont essentiels pour maintenir l’état de votre session de streaming.

export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
Copy

4.2 Créer un échantillon de ligne

Générez une ligne d’échantillon de données au format NDJSON :

export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

cat <<EOF > rows.ndjson
{
  "id": 1,
  "c1": $RANDOM,
  "ts": "$NOW_TS"
}
EOF
Copy

4.3 Ajouter une ligne

Envoyez l’échantillon de ligne au canal de streaming :

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/x-ndjson" \
  "https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
  --data-binary @rows.ndjson | jq .
Copy

Important

  • After each append operation, you must update the continuationToken for the next append call. The response from the append rows call contains a next_continuation_token field that you should use to make your updates.

  • La réussite de l’opération d’ajout confirme uniquement que les données ont été reçues par le service, et non qu’elles sont conservées dans la table. Passez à l’étape suivante pour vérifier la persistance avant de lancer la requête ou de passer au lot suivant.

4.4 Vérifier la persistance des données et le décalage engagé en utilisant getChannelStatus

Effectuez cette étape essentielle pour garantir la fiabilité de l’application. La persistance des données n’est pas garantie jusqu’à l’avancement du committedOffset. Pour confirmer que les lignes que vous venez d’ajouter sont conservées, utilisez getChannelStatus.

Vérifiez le statut actuel de votre canal de streaming :

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
  -d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
Copy

Contrôle de vérification

Vous devez vous assurer que la valeur committedOffset renvoyée dans la réponse est supérieure ou égale au décalage des lignes que vous venez d’ajouter. C’est uniquement après l’avancement du committedOffset que vous pouvez vous assurer que les données sont disponibles en toute sécurité dans la table.

4.5 Interroger la table pour les données persistantes

Après avoir confirmé l’avancement du committedOffset dans l’étape précédente (4.4), vous pouvez effectuer une requête pour confirmer que les données sont ingérées dans votre table Snowflake.

Exécutez la requête SQL suivante dans Snowflake :

SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
Copy

(Optional) Step 5: Clean up

Supprimez les fichiers temporaires et les variables d’environnement non définies :

rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
Copy

Résolution des problèmes

  • HTTP 401 (Non autorisé) : vérifiez que votre jeton JWTest valide et n’a pas expiré. Si nécessaire, régénérez-le.

  • HTTP 404 (Introuvable) : vérifiez soigneusement que les noms de la base de données, du schéma, du pipe et du canal sont correctement orthographiés et existent dans votre compte Snowflake.

  • Pas d’hôte d’ingestion : assurez-vous que l’URL de l’hôte de votre plan de contrôle est correcte et accessible.