Erste Schritte mit Snowpipe Streaming REST API: Ein cURL- und JWT-Tutorial

Bemerkung

Wir empfehlen, mit dem Snowpipe Streaming-SDK über die REST API zu beginnen, um von der verbesserten Leistung und dem einfacheren Einstieg zu profitieren.

Diese Anleitung zeigt Ihnen, wie Sie Daten mithilfe von Snowpipe Streaming REST API und einem JSON Web Token (JWT), das mit SnowSQL generiert wurde, in Snowflake streamen können.

Voraussetzungen

Bevor Sie beginnen, vergewissern Sie sich, dass Sie die folgenden Elemente haben:

Snowflake-Benutzer und -Objekte:

Ein Snowflake-Benutzer, der für die Authentifizierung mit Schlüsselpaaren konfiguriert ist. Registrieren Sie Ihren öffentlichen Schlüssel mithilfe des folgenden SQL-Befehls:

ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
Copy

A Snowflake database, schema, and a target table for streaming ingestion. You can create them by using the following SQL commands and replacing placeholders like MY_DATABASE, MY_SCHEMA, MY_TABLE with the names that you want:

-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;

-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
    id NUMBER,
    c1 NUMBER,
    ts STRING
);
Copy

ACCOUNT_IDENTIFIER:

We suggest that you use Format 1 for the ACCOUNT_IDENTIFIER, which uses the account name within your organization; for example, myorg-account123. For more information on the format, see Kontobezeichner.

Installed tools:

  • curl: Für die Erstellung von HTTP-Anforderungen

  • jq: Zum Parsen von JSON-Antworten.

  • SnowSQL: Zum Ausführen von Befehlen, Befehlszeilenclient von Snowflake.

Generated JWT:

Generate your JWT by using SnowSQL:

snowsql --private-key-path rsa_key.p8 --generate-jwt \
  -a <ACCOUNT_IDENTIFIER> \
  -u MY_USER
Copy

Vorsicht

Speichern Sie Ihre JWT sicher. Vermeiden Sie es, diese in Protokollen oder Skripten offenzulegen.

Schritt-für-Schritt-Anleitungen

Führen Sie die folgenden Schritte aus, um Daten in Snowflake zu streamen.

Schritt 1: Umgebungsvariablen festlegen

Set up the necessary environment variables for your Snowflake account and the streaming operation. Note that the PIPE variable targets the default streaming pipe associated with your table.

# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"

# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_IDENTIFIER>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_TABLE-STREAMING"
export CHANNEL="MY_CHANNEL"

# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
Copy

Schritt 2: Ingest-Host ermitteln

Wichtig

Wenn Ihr Snowflake-Kontoname Unterstriche enthält (z. B. MY_ACCOUNT), kann ein bekanntes Problem beim Aufruf des Datenaufnahmeservice zu einem internen Fehler führen.

Sie müssen in INGEST_HOST alle Unterstriche durch Bindestriche ersetzen, bevor das bereichsbezogene Token erzeugt wird. Dieses konvertierte Format (mit Bindestrichen) muss für alle nachfolgenden REST API-Aufrufe verwendet werden, einschließlich der Generierung des bereichsbezogenen Tokens selbst.

Wenn der zurückgegebene Hostname zum Beispiel my_account.region.ingest.snowflakecomputing.com`lautet, müssen Sie den Wert in :code:`my-account.region.ingest.snowflakecomputing.com für alle nachfolgenden REST API-Aufrufe ändern.

Der Ingest-Host ist der Endpunkt für das Streaming von Daten. Ermitteln Sie den Ingest-Host mithilfe Ihrer JWT:

export INGEST_HOST=$(curl -sS -X GET \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
  "https://${CONTROL_HOST}/v2/streaming/hostname")

echo "Ingest Host: $INGEST_HOST"
Copy

Besorgen Sie sich ein bereichsbezogenes Token, um Vorgänge auf dem Ingest-Host zu autorisieren:

export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")

echo "Scoped Token obtained for ingest host"
Copy

Schritt 3: Kanal öffnen

Öffnen Sie einen Streaming-Kanal, um mit der Datenerfassung zu beginnen:

curl -sS -X PUT \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
  -d '{}' | tee open_resp.json | jq .
Copy

Schritt 4: Datenzeile anhängen

Hängen Sie eine einzelne Datenzeile an den offenen Kanal an.

4.1 Fortsetzungs- und Offset-Tokens extrahieren

Diese Token sind entscheidend für die Aufrechterhaltung des Status Ihrer Streaming-Sitzung.

export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
Copy

4.2 Beispielzeile erstellen

Generieren Sie eine Beispieldatenzeile im NDJSON-Format:

export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

cat <<EOF > rows.ndjson
{
  "id": 1,
  "c1": $RANDOM,
  "ts": "$NOW_TS"
}
EOF
Copy

4.3 Zeile anhängen

Senden Sie die Beispielzeile an den Streaming-Kanal:

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/x-ndjson" \
  "https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
  --data-binary @rows.ndjson | jq .
Copy

Wichtig

  • After each append operation, you must update the continuationToken for the next append call. The response from the append rows call contains a next_continuation_token field that you should use to make your updates.

  • Der Erfolg der Anfügeoperation bestätigt lediglich, dass die Daten vom Service empfangen wurden, und nicht, dass sie dauerhaft in der Tabelle gespeichert sind. Führen Sie den nächsten Schritt aus, um die Persistenz zu überprüfen, bevor Sie abfragen oder zum nächsten Batch wechseln.

4.4 Überprüfen der Datenhäufigkeit und des bestätigten Offset mithilfe von getChannelStatus

Führen Sie diesen kritischen Schritt aus, um die Zuverlässigkeit der Anwendung sicherzustellen. Es ist nicht garantiert, dass die Daten persistent sind, bis committedOffset vorangeschritten ist. Um zu bestätigen, dass die Zeilen, die Sie gerade angehängt haben, erfolgreich beibehalten wurden, verwenden Sie getChannelStatus.

Prüfen Sie den aktuellen Status Ihres Streaming-Kanals:

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
  -d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
Copy

Verifizierungsprüfung

Sie müssen sicherstellen, dass der in der Antwort zurückgegebene committedOffset-Wert größer oder gleich dem Offset der gerade angehängten Zeilen ist. Erst nach dem Voranschreiten des committedOffset sind die Daten in der Tabelle sicher verfügbar.

4.5 Abfragen der Tabelle nach persistenten Daten

Nachdem Sie bestätigt haben, dass das committedOffset im vorherigen Schritt (4.4) vorangeschritten ist, können Sie abfragen, ob die Daten in Ihre Snowflake-Tabelle aufgenommen werden.

Führen Sie die folgende SQL-Abfrage in Snowflake aus:

SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
Copy

(Optional) Step 5: Clean up

Entfernen Sie temporäre Dateien und nicht festgelegte Umgebungsvariablen:

rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
Copy

Problembehandlung

  • HTTP 401 (Nicht autorisiert): Überprüfen Sie, ob Ihr JWT-Token gültig und nicht abgelaufen ist. Generieren Sie es bei Bedarf neu.

  • HTTP 404 (Nicht gefunden): Überprüfen Sie noch einmal, ob die Namen von Datenbank, Schema, Pipe und Kanal korrekt geschrieben sind und in Ihrem Snowflake-Konto vorhanden sind.

  • Kein Ingest-Host: Stellen Sie sicher, dass die Host-URL der Kontrollebene korrekt und zugänglich ist.