Guia de migração do Snowpipe Streaming

Este guia descreve como migrar do SDK clássico do Snowpipe para o SDK do Snowpipe Streaming de alto desempenho. As alterações arquitetônicas e as atualizações da API discutidas aqui também se aplicam a migrações para o SDK Python, porque a arquitetura de alto desempenho está disponível em ambas as linguagens. Embora os exemplos de código neste documento estejam em Java, os principais princípios de migração permanecem consistentes em todas as linguagens.

Principais mudanças na arquitetura

A tabela a seguir resume as alterações arquitetônicas mais importantes na de alto desempenho do SDK do Snowpipe Streaming. Para uma comparação detalhada dos SDKs, consulte Comparação entre o SDK clássico e o SDK de alto desempenho.

Área

Clássico (snowflake-ingest-java)

Alto desempenho (SDK snowpipe-streaming)

Ponto de entrada

Os dados são ingeridos diretamente nas tabelas.

Os dados são ingeridos por meio de objetos PIPE, que oferecem suporte a transformações e aplicação de esquema.

SDK/núcleo

Somente SDK Java.

SDK em várias linguagens (Java e Python) com um núcleo Rust compartilhado.

Nomes de API

insertRow/insertRows, openChannel(request)

appendRow/appendRows, openChannel(channelName, offsetToken)

Tratamento de erros

É realizada a validação do lado do cliente.

É fornecida validação do lado do servidor com feedback de erros mais detalhado.

Tratamento de pressão de retorno

Coloca o thread em modo de espera, levando a um estado bloqueado/não responde.

Retorna um erro, permitindo que o autor da chamada implemente uma estratégia de espera/nova tentativa.

Mapeamento de cliente para tabela

Um único objeto de cliente podia abrir canais para qualquer tabela.

Um único objeto de cliente está agora exclusivamente vinculado a um objeto de canal.

Faturamento

Com base na computação e na contagem de clientes.

Fixo, por GB ingerido.

Esquema/transformações

Gerenciado no lado do cliente.

Gerenciado no lado do servidor por meio da definição de PIPE.

Processo de migração

Para migrar seu aplicativo para o SDK de alto desempenho, conclua as seguintes etapas de alto nível:

  1. Para cada tabela de destino, crie um PIPE.

    CREATE PIPE my_pipe
    AS COPY INTO my_table
      FROM TABLE (DATA_SOURCE(TYPE => 'STREAMING'))
      MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
      [CLUSTER_AT_INGEST_TIME = TRUE];
    
    Copy
  2. Pare a ingestão de todos os clientes clássicos.

  3. Para cada canal no cliente clássico, confirme os últimos offsets confirmados. Para recuperar esses offsets, use o método getLatestCommittedOffsetTokens() do SDK clássico. Verifique se esses offsets estão alinhados com seus registros do lado do cliente.

  4. Atualize seu código de aplicativo.

    • Mude as dependências de seu projeto para o SDK de alto desempenho (Java ou Python).

    • Atualize suas chamadas de API, conforme detalhado na seção Alterações na API e na configuração a seguir.

    • Inicialização de um cliente por tabela/PIPE usando o último offset confirmado do Snowflake.

  5. Depois que seu novo cliente estiver configurado e estável, retome a ingestão.

Alterações na API e na configuração

As seguintes alterações devem ser feitas em suas chamadas de API e nas definições de configuração durante a migração:

Inicialização do cliente

  • Clássico: builder(name)

  • Alto desempenho: builder(name, db, schema, pipeName)

Canais

  • Clássico: openChannel(OpenChannelRequest)

  • Alto desempenho: openChannel(channelName, offsetToken) retorna o canal e o status

Métodos de ingestão

  • Clássico: insertRow/insertRows(...)

  • Alto desempenho: appendRow/appendRows(...)

Rastreamento de offset

  • O método getLatestCommittedOffsetTokens(channels) do SDK clássico oferece visibilidade limitada e não possui contexto de erro.

  • O SDK de alto desempenho ainda oferece suporte a getLatestCommittedOffsetTokens(...), mas para um monitoramento robusto, recomendamos que você use getChannelStatuses(...). Esse método executa as seguintes tarefas:

    • Confirma que os offsets estão avançando conforme o esperado.

    • Retorna contagens de erros e informações detalhadas de erro por canal.

    • Permite o monitoramento proativo e a solução de problemas de seus pipelines de dados.