Introdução à API REST do Snowpipe Streaming: Um tutorial de cURL e JWT

Nota

Recomendamos que você use o SDK snowpipe-streaming como a opção principal e padrão. A API REST não é otimizada para cenários de alta taxa de transferência.

Este guia mostra como transmitir dados para o Snowflake usando a API REST do Snowpipe Streaming e um token Web JSON (JWT) gerado com SnowSQL.

Pré-requisitos

Antes de começar, certifique-se de ter os seguintes itens:

Usuário e objetos do Snowflake:

Um usuário do Snowflake configurado para autenticação de par de chaves. Registre sua chave pública usando o seguinte comando SQL:

ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
Copy

Um banco de dados Snowflake, um esquema e um objeto PIPE para ingestão de streaming. Você pode criá-los usando os seguintes comandos SQL (substitua marcadores de posição como MY_DATABASE, MY_SCHEMA, MY_PIPE, MY_TABLE pelos nomes desejados):

-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;

-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
    data VARIANT,
    c1 NUMBER,
    c2 STRING
);

-- Create PIPE Object for Streaming Ingestion
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
Copy

Ferramentas instaladas:

  • curl: Para fazer solicitações HTTP.

  • jq: Para analisar respostas JSON.

  • SnowSQL: Para executar comandos, o cliente de linha de comando do Snowflake.

JWT gerado: gere seu JWT usando o SnowSQL:

snowsql --private-key-path rsa_key.p8 --generate-jwt \
  -a <ACCOUNT_LOCATOR> \
  -u MY_USER
Copy

Cuidado

Armazene seu JWT com segurança. Evite expô-lo em logs ou scripts.

Instruções passo a passo

Conclua as seguintes etapas para transmitir dados para o Snowflake.

Etapa 1: Definir variáveis ​​de ambiente

Configure as variáveis ​​de ambiente necessárias para sua conta Snowflake e a operação de streaming:

# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"

# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_LOCATOR>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_PIPE"
export CHANNEL="MY_CHANNEL"

# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
Copy

Etapa 2: Descobrir o host de ingestão

O host de ingestão é o ponto de extremidade para o streaming de dados. Descobrir o host de ingestão usando seu JWT:

export INGEST_HOST=$(curl -sS -X GET \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
  "https://${CONTROL_HOST}/v2/streaming/hostname")

echo "Ingest Host: $INGEST_HOST"
Copy

Obtenha um token com escopo para autorizar operações no host de ingestão:

export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")

echo "Scoped Token obtained for ingest host"
Copy

Etapa 3: Abrir o canal

Abrir um canal de streaming para iniciar a ingestão de dados:

curl -sS -X PUT \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
  -d '{}' | tee open_resp.json | jq .
Copy

Etapa 4: Anexar uma linha de dados

Anexar uma única linha de dados ao canal aberto.

4.1 Extrair tokens de continuação e deslocamento

Esses tokens são cruciais para manter o estado da sua sessão de streaming.

export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
Copy

4.2 Criar linha de amostra

Gerar uma linha de dados de amostra no formato NDJSON:

export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

cat <<EOF > rows.ndjson
{
  "id": 1,
  "c1": $RANDOM,
  "ts": "$NOW_TS"
}
EOF
Copy

4.3 Acrescentar linha

Enviar a linha de amostra para o canal de streaming:

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/x-ndjson" \
  "https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
  --data-binary @rows.ndjson | jq .
Copy

Importante

Após cada operação de acréscimo, você deve atualizar o continuationToken para a próxima chamada de acréscimo. A resposta da chamada de acréscimo de linhas contém um campo next_continuation_token que você deve usar para fazer suas atualizações.

4.4 Verificar a ingestão de dados

Confirme se os dados foram ingeridos na sua tabela do Snowflake.

Execute a seguinte consulta SQL no Snowflake:

SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
Copy

Etapa 5: Obter o status do canal

Verifique o status atual do seu canal de streaming:

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
  -d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
Copy

(Opcional) Etapa 6: Limpeza

Remover arquivos temporários e desconfigurar variáveis ​​de ambiente:

rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
Copy

Solução de problemas

  • HTTP 401 (Não autorizado): verifique se o seu token JWT é válido e não expirou. Se necessário, gere-o novamente.

  • HTTP 404 (Não encontrado): verifique novamente se os nomes do banco de dados, esquema, pipe e canal estão escritos corretamente e existem na sua conta Snowflake.

  • Sem host de ingestão: certifique-se de que o URL do host do plano de controle esteja correto e acessível.