Erste Schritte mit Snowpipe Streaming REST API: Ein cURL- und JWT-Tutorial¶
Bemerkung
Wir empfehlen Ihnen, das Feature snowpipe-streaming
SDK als primäre und standardmäßige Wahl zu verwenden. Das Feature REST API ist nicht für Szenarios mit hohem Durchsatz optimiert.
Diese Anleitung zeigt Ihnen, wie Sie Daten mithilfe von Snowpipe Streaming REST API und einem JSON Web Token (JWT), das mit SnowSQL generiert wurde, in Snowflake streamen können.
Voraussetzungen¶
Bevor Sie beginnen, vergewissern Sie sich, dass Sie die folgenden Elemente haben:
Snowflake-Benutzer und -Objekte:
Ein Snowflake-Benutzer, der für die Authentifizierung mit Schlüsselpaaren konfiguriert ist. Registrieren Sie Ihren öffentlichen Schlüssel mithilfe des folgenden SQL-Befehls:
ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
Eine Snowflake-Datenbank, ein Snowflake-Schema und ein PIPE-Objekt für die Streaming-Datenaufnahme. Sie können sie mit den folgenden SQL-Befehlen erstellen (ersetzen Sie Platzhalter wie MY_DATABASE
, MY_SCHEMA
, MY_PIPE
, MY_TABLE
mit den gewünschten Namen):
-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;
-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
data VARIANT,
c1 NUMBER,
c2 STRING
);
-- Create PIPE Object for Streaming Ingestion
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
Installierte Tools:
curl
: Für die Erstellung von HTTP-Anforderungenjq
: Zum Parsen von JSON-Antworten.SnowSQL
: Zum Ausführen von Befehlen, Befehlszeilenclient von Snowflake.
Generierte JWT: Generieren Sie Ihre JWT durch Verwendung von SnowSQL:
snowsql --private-key-path rsa_key.p8 --generate-jwt \
-a <ACCOUNT_LOCATOR> \
-u MY_USER
Vorsicht
Speichern Sie Ihre JWT sicher. Vermeiden Sie es, diese in Protokollen oder Skripten offenzulegen.
Schritt-für-Schritt-Anleitungen¶
Führen Sie die folgenden Schritte aus, um Daten in Snowflake zu streamen.
Schritt 1: Umgebungsvariablen festlegen¶
Legen Sie die erforderlichen Umgebungsvariablen für Ihr Snowflake-Konto und den Streaming-Vorgang fest:
# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"
# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_LOCATOR>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_PIPE"
export CHANNEL="MY_CHANNEL"
# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
Schritt 2: Ingest-Host ermitteln¶
Der Ingest-Host ist der Endpunkt für das Streaming von Daten. Ermitteln Sie den Ingest-Host mithilfe Ihrer JWT:
export INGEST_HOST=$(curl -sS -X GET \
-H "Authorization: Bearer $JWT_TOKEN" \
-H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
"https://${CONTROL_HOST}/v2/streaming/hostname")
echo "Ingest Host: $INGEST_HOST"
Besorgen Sie sich ein bereichsbezogenes Token, um Vorgänge auf dem Ingest-Host zu autorisieren:
export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
-H 'Content-Type: application/x-www-form-urlencoded' \
-H "Authorization: Bearer $JWT_TOKEN" \
-d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")
echo "Scoped Token obtained for ingest host"
Schritt 3: Kanal öffnen¶
Öffnen Sie einen Streaming-Kanal, um mit der Datenerfassung zu beginnen:
curl -sS -X PUT \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
-d '{}' | tee open_resp.json | jq .
Schritt 4: Datenzeile anhängen¶
Hängen Sie eine einzelne Datenzeile an den offenen Kanal an.
4.1 Fortsetzungs- und Offset-Tokens extrahieren¶
Diese Token sind entscheidend für die Aufrechterhaltung des Status Ihrer Streaming-Sitzung.
export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
4.2 Beispielzeile erstellen¶
Generieren Sie eine Beispieldatenzeile im NDJSON-Format:
export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
cat <<EOF > rows.ndjson
{
"id": 1,
"c1": $RANDOM,
"ts": "$NOW_TS"
}
EOF
4.3 Zeile anhängen¶
Senden Sie die Beispielzeile an den Streaming-Kanal:
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/x-ndjson" \
"https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
--data-binary @rows.ndjson | jq .
Wichtig
Nach jedem Anfügevorgang müssen Sie das Fortsetzungstoken für den nächsten Anfügeaufruf aktualisieren. Die Antwort auf den Aufruf zum Anhängen von Zeilen enthält ein next_continuation_token
-Feld, das Sie für Ihre Aktualisierungen verwenden sollten.
4.4 Datenerfassung überprüfen¶
Stellen Sie sicher, dass die Daten in Ihre Snowflake-Tabelle aufgenommen wurden.
Führen Sie die folgende SQL-Abfrage in Snowflake aus:
SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
Schritt 5: Kanalstatus abrufen¶
Prüfen Sie den aktuellen Status Ihres Streaming-Kanals:
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
-d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
(Optional) Schritt 6: Bereinigen¶
Entfernen Sie temporäre Dateien und nicht festgelegte Umgebungsvariablen:
rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
Problembehandlung¶
HTTP 401 (Nicht autorisiert): Überprüfen Sie, ob Ihr JWT-Token gültig und nicht abgelaufen ist. Generieren Sie es bei Bedarf neu.
HTTP 404 (Nicht gefunden): Überprüfen Sie noch einmal, ob die Namen von Datenbank, Schema, Pipe und Kanal korrekt geschrieben sind und in Ihrem Snowflake-Konto vorhanden sind.
Kein Ingest-Host: Stellen Sie sicher, dass die Host-URL der Kontrollebene korrekt und zugänglich ist.