Introdução à API REST do Snowpipe Streaming: Um tutorial de cURL e JWT

Nota

Recomendamos que você comece com oSDK do Snowpipe Streaming sobre API REST para se beneficiar do melhor desempenho e da experiência de introdução.

Este guia mostra como transmitir dados para o Snowflake usando a API REST do Snowpipe Streaming e um token Web JSON (JWT) gerado com SnowSQL.

Pré-requisitos

Antes de começar, certifique-se de ter os seguintes itens:

Usuário e objetos do Snowflake:

Um usuário do Snowflake configurado para autenticação de par de chaves. Registre sua chave pública usando o seguinte comando SQL:

ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
Copy

Um banco de dados Snowflake, um esquema e um objeto PIPE para ingestão de streaming. Você pode criá-los usando os seguintes comandos SQL (substitua marcadores de posição como MY_DATABASE, MY_SCHEMA, MY_PIPE, MY_TABLE pelos nomes desejados):

-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;

-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
    data VARIANT,
    c1 NUMBER,
    c2 STRING
);

-- Create PIPE Object for Streaming Ingestion
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
Copy

ACCOUNT_IDENTIFIER:

Sugerimos o uso do Formato 1 para ACCOUNT_IDENTIFIER, que usa o nome da conta dentro de sua organização; por exemplo, myorg-account123. Para obter mais informações sobre o formato, consulte Identificadores de conta.

Ferramentas instaladas:

  • curl: Para fazer solicitações HTTP.

  • jq: Para analisar respostas JSON.

  • SnowSQL: Para executar comandos, o cliente de linha de comando do Snowflake.

JWT gerado:

gere seu JWT usando SnowSQL:

snowsql --private-key-path rsa_key.p8 --generate-jwt \
  -a <ACCOUNT_IDENTIFIER> \
  -u MY_USER
Copy

Cuidado

Armazene seu JWT com segurança. Evite expô-lo em logs ou scripts.

Instruções passo a passo

Conclua as seguintes etapas para transmitir dados para o Snowflake.

Etapa 1: Definir variáveis ​ de ambiente

Configure as variáveis ​ de ambiente necessárias para sua conta Snowflake e a operação de streaming:

# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"

# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_IDENTIFIER>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_PIPE"
export CHANNEL="MY_CHANNEL"

# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
Copy

Etapa 2: Descobrir o host de ingestão

Importante

Se o nome de sua conta Snowflake contiver sublinhados (por exemplo, MY_ACCOUNT), um problema conhecido pode causar um erro interno ao chamar o serviço de ingestão.

Você deve substituir todos os sublinhados por traços no INGEST_HOST antes de gerar o token com escopo. Este formato convertido (com traços) deve ser usado para todas as chamadas de API REST subsequentes, incluindo a geração do próprio token com escopo.

Por exemplo, se o nome do host retornado for my_account.region.ingest.snowflakecomputing.com, você deve alterá-lo para my-account.region.ingest.snowflakecomputing.com em todas chamadas de API REST subsequentes.

O host de ingestão é o ponto de extremidade para o streaming de dados. Descobrir o host de ingestão usando seu JWT:

export INGEST_HOST=$(curl -sS -X GET \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
  "https://${CONTROL_HOST}/v2/streaming/hostname")

echo "Ingest Host: $INGEST_HOST"
Copy

Obtenha um token com escopo para autorizar operações no host de ingestão:

export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")

echo "Scoped Token obtained for ingest host"
Copy

Etapa 3: Abrir o canal

Abrir um canal de streaming para iniciar a ingestão de dados:

curl -sS -X PUT \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
  -d '{}' | tee open_resp.json | jq .
Copy

Etapa 4: Anexar uma linha de dados

Anexar uma única linha de dados ao canal aberto.

4.1 Extrair tokens de continuação e deslocamento

Esses tokens são cruciais para manter o estado da sua sessão de streaming.

export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
Copy

4.2 Criar linha de amostra

Gerar uma linha de dados de amostra no formato NDJSON:

export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

cat <<EOF > rows.ndjson
{
  "id": 1,
  "c1": $RANDOM,
  "ts": "$NOW_TS"
}
EOF
Copy

4.3 Acrescentar linha

Enviar a linha de amostra para o canal de streaming:

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/x-ndjson" \
  "https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
  --data-binary @rows.ndjson | jq .
Copy

Importante

  • After each append operation, you must update the continuationToken for the next append call. The response from the append rows call contains a next_continuation_token field that you should use to make your updates.

  • The success of the append operation confirms only that the data was received by the service, not that it is persisted to the table. Take the next step to verify persistence before querying or moving to the next batch.

4.4 Verify data persistence and committed offset by using getChannelStatus

Complete this critical step to ensure application reliability. Data isn’t guaranteed to be persistent until the committedOffset has advanced. To confirm that the rows that you just appended are successfully persisted, use getChannelStatus.

Verifique o status atual do seu canal de streaming:

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
  -d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
Copy

Verification check

You must ensure that the committedOffset returned in the response is greater than or equal to the offset of the rows you just appended. Only after the committedOffset advances can you be certain that the data is safely available in the table.

4.5 Query the table for persisted data

After you confirm that the committedOffset has advanced in the previous step (4.4), you can query to confirm that the data is ingested into your Snowflake table.

Execute a seguinte consulta SQL no Snowflake:

SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE data:id::NUMBER = 1;
Copy

(Optional) Step 5: Clean up

Remover arquivos temporários e desconfigurar variáveis ​ de ambiente:

rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
Copy

Solução de problemas

  • HTTP 401 (Não autorizado): verifique se o seu token JWT é válido e não expirou. Se necessário, gere-o novamente.

  • HTTP 404 (Não encontrado): verifique novamente se os nomes do banco de dados, esquema, pipe e canal estão escritos corretamente e existem na sua conta Snowflake.

  • Sem host de ingestão: certifique-se de que o URL do host do plano de controle esteja correto e acessível.