Introdução à API REST do Snowpipe Streaming: Um tutorial de cURL e JWT¶
Nota
Recomendamos que você use o SDK snowpipe-streaming
como a opção principal e padrão. A API REST não é otimizada para cenários de alta taxa de transferência.
Este guia mostra como transmitir dados para o Snowflake usando a API REST do Snowpipe Streaming e um token Web JSON (JWT) gerado com SnowSQL.
Pré-requisitos¶
Antes de começar, certifique-se de ter os seguintes itens:
Usuário e objetos do Snowflake:
Um usuário do Snowflake configurado para autenticação de par de chaves. Registre sua chave pública usando o seguinte comando SQL:
ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
Um banco de dados Snowflake, um esquema e um objeto PIPE para ingestão de streaming. Você pode criá-los usando os seguintes comandos SQL (substitua marcadores de posição como MY_DATABASE
, MY_SCHEMA
, MY_PIPE
, MY_TABLE
pelos nomes desejados):
-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;
-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
data VARIANT,
c1 NUMBER,
c2 STRING
);
-- Create PIPE Object for Streaming Ingestion
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
Ferramentas instaladas:
curl
: Para fazer solicitações HTTP.jq
: Para analisar respostas JSON.SnowSQL
: Para executar comandos, o cliente de linha de comando do Snowflake.
JWT gerado: gere seu JWT usando o SnowSQL:
snowsql --private-key-path rsa_key.p8 --generate-jwt \
-a <ACCOUNT_LOCATOR> \
-u MY_USER
Cuidado
Armazene seu JWT com segurança. Evite expô-lo em logs ou scripts.
Instruções passo a passo¶
Conclua as seguintes etapas para transmitir dados para o Snowflake.
Etapa 1: Definir variáveis de ambiente¶
Configure as variáveis de ambiente necessárias para sua conta Snowflake e a operação de streaming:
# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"
# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_LOCATOR>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_PIPE"
export CHANNEL="MY_CHANNEL"
# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
Etapa 2: Descobrir o host de ingestão¶
O host de ingestão é o ponto de extremidade para o streaming de dados. Descobrir o host de ingestão usando seu JWT:
export INGEST_HOST=$(curl -sS -X GET \
-H "Authorization: Bearer $JWT_TOKEN" \
-H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
"https://${CONTROL_HOST}/v2/streaming/hostname")
echo "Ingest Host: $INGEST_HOST"
Obtenha um token com escopo para autorizar operações no host de ingestão:
export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
-H 'Content-Type: application/x-www-form-urlencoded' \
-H "Authorization: Bearer $JWT_TOKEN" \
-d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")
echo "Scoped Token obtained for ingest host"
Etapa 3: Abrir o canal¶
Abrir um canal de streaming para iniciar a ingestão de dados:
curl -sS -X PUT \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
-d '{}' | tee open_resp.json | jq .
Etapa 4: Anexar uma linha de dados¶
Anexar uma única linha de dados ao canal aberto.
4.1 Extrair tokens de continuação e deslocamento¶
Esses tokens são cruciais para manter o estado da sua sessão de streaming.
export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
4.2 Criar linha de amostra¶
Gerar uma linha de dados de amostra no formato NDJSON:
export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
cat <<EOF > rows.ndjson
{
"id": 1,
"c1": $RANDOM,
"ts": "$NOW_TS"
}
EOF
4.3 Acrescentar linha¶
Enviar a linha de amostra para o canal de streaming:
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/x-ndjson" \
"https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
--data-binary @rows.ndjson | jq .
Importante
Após cada operação de acréscimo, você deve atualizar o continuationToken para a próxima chamada de acréscimo. A resposta da chamada de acréscimo de linhas contém um campo next_continuation_token
que você deve usar para fazer suas atualizações.
4.4 Verificar a ingestão de dados¶
Confirme se os dados foram ingeridos na sua tabela do Snowflake.
Execute a seguinte consulta SQL no Snowflake:
SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
Etapa 5: Obter o status do canal¶
Verifique o status atual do seu canal de streaming:
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
-d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
(Opcional) Etapa 6: Limpeza¶
Remover arquivos temporários e desconfigurar variáveis de ambiente:
rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
Solução de problemas¶
HTTP 401 (Não autorizado): verifique se o seu token JWT é válido e não expirou. Se necessário, gere-o novamente.
HTTP 404 (Não encontrado): verifique novamente se os nomes do banco de dados, esquema, pipe e canal estão escritos corretamente e existem na sua conta Snowflake.
Sem host de ingestão: certifique-se de que o URL do host do plano de controle esteja correto e acessível.