Práticas recomendadas para Snowpipe Streaming com arquitetura de alto desempenho¶
Este guia descreve as principais práticas recomendadas para projetar e implementar pipelines de ingestão de dados robustos usando o Snowpipe Streaming com arquitetura de alto desempenho. Ao seguir essas práticas recomendadas, você garante que seus pipelines sejam fortes, confiáveis e tenham um tratamento de erros eficiente.
Gerenciamento estratégico de canais¶
Aplique as seguintes estratégias de gerenciamento de canais para desempenho e estabilidade a longo prazo:
Usar canais de longa duração: para minimizar a sobrecarga, abra um canal uma vez e mantenha-o ativo durante a tarefa de ingestão. Evite abrir e fechar canais repetidamente.
Usar nomes de canais determinísticos: aplique uma convenção de nomenclatura consistente e previsível (por exemplo,
source-env-region-client-id) para simplificar a solução de problemas e facilitar processos de recuperação automatizados.Escalonar com vários canais: para aumentar o rendimento, abra vários canais. Esses canais podem apontar para um único canal de destino ou para vários canais, dependendo dos limites de serviço e dos seus requisitos de taxa de transferência.
Monitorar status do canal: use regularmente o método
getChannelStatuspara monitorar a integridade de seus canais de ingestão.Rastreie
last_committed_offset_tokenpara verificar se os dados estão sendo ingeridos com sucesso e se o pipeline está fazendo progresso.Monitore
row_error_countpara detectar antecipadamente registros ruins ou outros problemas de ingestão.
Validação do esquema de forma consistente¶
Certifique-se de que os dados recebidos estejam em conformidade com o esquema de tabela esperado para evitar falhas de ingestão e manter a integridade dos dados:
Validação no lado do cliente: implemente a validação de esquema no lado do cliente para fornecer feedback imediato e reduzir erros no lado do servidor. Embora a validação completa linha por linha ofereça segurança máxima, um método com melhor desempenho pode envolver validação seletiva; por exemplo, limites em um lote ou amostragem de linhas.
Validação no lado do servidor: a arquitetura de alto desempenho pode descarregar a validação de esquema para o servidor. Os erros e suas contagens são relatados por meio de
getChannelStatus, se ocorrerem incompatibilidades de esquema durante a ingestão no canal e na tabela de destino.
Adição de colunas de metadados do lado do cliente¶
Para permitir uma detecção e recuperação de erros robustas, deve-se carregar metadados de ingestão como parte da carga útil da linha. Isso requer planejamento da forma de seus dados e a definição de PIPE com antecedência.
Adicione as seguintes colunas à carga útil da linha antes da ingestão:
CHANNEL_ID; por exemplo, um INTEGER compactoSTREAM_OFFSET; um BIGINT que aumenta monotonicamente por canal, como um deslocamento de partição do Kafka
Juntas, essas colunas identificam de forma única os registros por canal e permitem que você rastreie a origem dos dados.
Opcionalmente, adicione uma coluna PIPE_ID se vários canais fizerem a ingestão na mesma tabela de destino. Com essa coluna, é possível rastrear as linhas de volta ao pipeline de ingestão. Você pode armazenar nomes de canal descritivos em uma tabela de pesquisa separada, mapeamento os para inteiros compactados para reduzir os custos de armazenamento.
Detecção e recuperação de erros usando offsets de metadados¶
Combine o monitoramento de canal com suas colunas de metadados para detectar e se recuperar de problemas:
Monitorar status: verifique regularmente o
getChannelStatus. Umrow_error_countcrescente é um forte indicador de possível problema.Detectar registros ausentes: se forem detectados erros, use uma consulta SQL para identificar registros ausentes ou fora de ordem, verificando se há lacunas em sua sequência
STREAM_OFFSET.
SELECT
PIPE_ID,
CHANNEL_ID,
STREAM_OFFSET,
LAG(STREAM_OFFSET) OVER (
PARTITION BY PIPE_ID, CHANNEL_ID
ORDER BY STREAM_OFFSET
) AS previous_offset,
(LAG(STREAM_OFFSET) OVER (
PARTITION BY PIPE_ID, CHANNEL_ID
ORDER BY STREAM_OFFSET
) + 1) AS expected_next
FROM my_table
QUALIFY STREAM_OFFSET != previous_offset + 1;
Usar compactação para solicitações de API REST¶
Quando você usar a API REST Snowpipe Streaming, use a compactação para enviar mais dados por solicitação e reduzir a sobrecarga da rede.
Embora a API REST tenha um limite físico de 4 MB por solicitação, esse limite se aplica ao tamanho da transferência observado. Ao usar a compactação, você pode ajustar um volume maior de dados não compactados em cada solicitação, permitindo uma taxa de transferência maior e reduzindo o número de chamadas de API necessárias.
O Snowflake recomenda usar o ZSTD como algoritmo de compactação de alto desempenho, embora o Gzip também seja compatível.
Otimização do desempenho e do custo de ingestão com MATCH_BY_COLUMN_NAME¶
Configure seu canal para mapear as colunas necessárias de seus dados de origem em vez de ingerir todos os dados em uma única coluna VARIANT. Para isso, use MATCH_BY_COLUMN_NAME = CASE_SENSITIVE ou aplique transformações na definição do canal. Essa prática recomendada não apenas otimiza seus custos de ingestão, mas também melhora o desempenho geral do pipeline de dados de streaming.
Essa prática recomendada tem as seguintes vantagens importantes:
Ao usar
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE, você será cobrado apenas pelos valores de dados que são ingeridos na sua tabela de destino. Em comparação, a ingestão de dados em uma única coluna VARIANT cobra por todos os bytes JSON, tanto as chaves quanto os valores. Para dados com detalhados ou muitas chaves JSON, isso pode levar a um aumento significativo e desnecessário nos seus custos de ingestão.O mecanismo de processamento do Snowflake é mais eficiente em termos de computação. Em vez de analisar todo o objeto JSON em uma VARIANT, e depois extrair as colunas necessárias, esse método extrai diretamente os valores necessários.
Usar tipos de dados nativos para dados semiestruturados¶
Para desempenho e integridade de dados ideais, forneça os dados semiestruturados usando objetos de linguagem nativos em vez de cadeias de caracteres serializadas.
Desempenho: com objetos nativos, o SDK pode lidar com dados de forma mais eficiente sem exigir etapas adicionais de análise no servidor Snowflake.
Segurança do tipo: a arquitetura de alto desempenho trata os literais de cadeia de caracteres como texto literal. Ao usar objetos nativos, você garante que seus dados sejam armazenados como JSON estruturado em vez de valores de cadeia de caracteres com escape.
Exemplo em Java:
// Preferred: SDK converts the List to a structured ARRAY
row.put("tags", Arrays.asList("electronics", "sale"));
Exemplo em Python:
# Preferred: SDK converts the dict to a structured VARIANT
row["payload"] = {"event_id": 101, "status": "active"}
Obter métricas do Prometheus¶
Para obter métricas de desempenho do cliente de alto desempenho do Snowpipe Streaming, você deve habilitar o servidor de métricas Prometheus integrado e configurar o serviço Prometheus para coletar dados do ponto de extremidade.
Ative o servidor de métricas definindo a variável de ambiente SS_ENABLE_METRICS para true antes de executar seu aplicativo.
Colete os dados do ponto de extremidade de métricas no host que está executando o processo de ingestão do Snowpipe Streaming. O caminho padrão é /metrics no host e porta definidos por SS_METRICS_IP e SS_METRICS_PORT.
Exemplo: Verificação do ponto de extremidade de métricas (processo local/máquina de desenvolvimento)¶
# Enable Prometheus metrics
export SS_ENABLE_METRICS=true
# Run your application (the server starts on 127.0.0.1:50000 by default)
# Curl the endpoint to verify the metrics are exposed
curl http://127.0.0.1:50000/metrics
Exemplo: Configuração da coleta de dados do Prometheus¶
Aponte seu serviço Prometheus para o host que executa o cliente Snowpipe Streaming.
scrape_configs:
- job_name: snowpipe_streaming_hp
metrics_path: /metrics # default is /metrics
static_configs:
- targets: ['127.0.0.1:50000']
Projeto com foco na resiliência¶
Delimitar a ingestão em blocos try-catch¶
Não presuma que insertRows sempre vai ser bem-sucedido. Garanta que o ciclo de ingestão possa capturar SFException e interpretar os códigos de status HTTP, especificamente o 409 para invalidações e o 429 para limitação.
Implementar a espera exponencial¶
Para erros repetíveis (429, 500, 503), não tente de novo imediatamente. Use uma estratégia de espera exponencial, aumentando o tempo de espera entre cada nova tentativa, para permitir que o sistema se recupere.
Verificar o progresso com tokens de deslocamento¶
Chame getLatestCommittedOffsetToken periodicamente para rastrear quais dados persistiram com sucesso. Se ocorrer um erro 409, use esse token para identificar o ponto exato do qual reproduzir os dados após a reabertura do canal.
Monitorar status do canal¶
verifique getChannelStatus() regularmente. Se o código de status for diferente de SUCCESS, acione sua lógica de tratamento de erros para redefinir a conexão do canal ou do cliente.