Guia de migração do Snowpipe Streaming¶
Este guia descreve como migrar do SDK clássico do Snowpipe para o SDK do Snowpipe Streaming de alto desempenho. As alterações arquitetônicas e as atualizações da API discutidas aqui também se aplicam a migrações para o SDK Python, porque a arquitetura de alto desempenho está disponível em ambas as linguagens. Embora os exemplos de código neste documento estejam em Java, os principais princípios de migração permanecem consistentes em todas as linguagens.
Principais mudanças na arquitetura¶
A tabela a seguir resume as alterações arquitetônicas mais importantes na de alto desempenho do SDK do Snowpipe Streaming. Para uma comparação detalhada dos SDKs, consulte Comparação entre os SDKs de alto desempenho e clássicos do Snowpipe Streaming.
Área |
Clássico (snowflake-ingest-java) |
Alto desempenho (SDK snowpipe-streaming) |
|---|---|---|
Ponto de entrada |
Os dados são ingeridos diretamente nas tabelas. |
Os dados são ingeridos por meio de objetos PIPE, que oferecem suporte a transformações e aplicação de esquema. |
SDK/núcleo |
Somente SDK Java. |
SDK em várias linguagens (Java e Python) com um núcleo Rust compartilhado. |
Nomes de API |
|
|
Tratamento de erros |
É realizada a validação do lado do cliente. |
É fornecida validação do lado do servidor com feedback de erros mais detalhado. |
Tratamento de pressão de retorno |
Coloca o thread em modo de espera, levando a um estado bloqueado/não responde. |
Retorna um erro, permitindo que o autor da chamada implemente uma estratégia de espera/nova tentativa. |
Mapeamento de cliente para tabela |
Um único objeto de cliente podia abrir canais para qualquer tabela. |
Um único objeto de cliente está agora exclusivamente vinculado a um objeto de canal. |
Faturamento |
Com base na computação e na contagem de clientes. |
Fixo, por GB ingerido. |
Esquema/transformações |
Gerenciado no lado do cliente. |
Gerenciado no lado do servidor por meio da definição de PIPE. |
Processo de migração¶
Para migrar seu aplicativo para o SDK de alto desempenho, conclua as seguintes etapas de alto nível:
Para cada tabela de destino, crie um PIPE.
CREATE PIPE my_pipe AS COPY INTO my_table FROM TABLE (DATA_SOURCE(TYPE => 'STREAMING')) MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE [CLUSTER_AT_INGEST_TIME = TRUE];
Pare a ingestão de todos os clientes clássicos.
Para cada canal no cliente clássico, confirme os últimos offsets confirmados. Para recuperar esses offsets, use o método
getLatestCommittedOffsetTokens()do SDK clássico. Verifique se esses offsets estão alinhados com seus registros do lado do cliente.Atualize seu código de aplicativo.
Mude as dependências de seu projeto para o SDK de alto desempenho (Java ou Python).
Atualize suas chamadas de API, conforme detalhado na seção Alterações na API e na configuração a seguir.
Inicialização de um cliente por tabela/PIPE usando o último offset confirmado do Snowflake.
Depois que seu novo cliente estiver configurado e estável, retome a ingestão.
Alterações na API e na configuração¶
As seguintes alterações devem ser feitas em suas chamadas de API e nas definições de configuração durante a migração:
Inicialização do cliente¶
Clássico:
builder(name)Alto desempenho:
builder(name, db, schema, pipeName)
Canais¶
Clássico:
openChannel(OpenChannelRequest)Alto desempenho:
openChannel(channelName, offsetToken)retorna o canal e o status
Métodos de ingestão¶
Clássico:
insertRow/insertRows(...)Alto desempenho:
appendRow/appendRows(...)
Rastreamento de offset¶
O método
getLatestCommittedOffsetTokens(channels)do SDK clássico oferece visibilidade limitada e não possui contexto de erro.O SDK de alto desempenho ainda oferece suporte a
getLatestCommittedOffsetTokens(...), mas para um monitoramento robusto, recomendamos que você usegetChannelStatuses(...). Esse método executa as seguintes tarefas:Confirma que os offsets estão avançando conforme o esperado.
Retorna contagens de erros e informações detalhadas de erro por canal.
Permite o monitoramento proativo e a solução de problemas de seus pipelines de dados.
Tratando dados semiestruturados¶
Ao migrar para o SDK de alto desempenho, analise como seu aplicativo fornece dados para as colunas ARRAY e VARIANT para evitar que os dados sejam armazenados como cadeias de caracteres literais.
Mudança de comportamento¶
Passar um literal de cadeia de caracteres serializada, por exemplo, «[1, 2, 3]», para uma coluna ARRAY na v2 resulta em uma matriz de elemento único que contém esse literal de cadeia de caracteres. Para manter o comportamento da arquitetura clássica, selecione uma das seguintes opções:
Opção 1: passar objetos nativos (recomendado)¶
Atualize seu aplicativo cliente para desserializar as cadeias de caracteres JSON em objetos nativos antes de chamar appendRow.
Java: use
java.util.Listpara matrizes ejava.util.Mappara objetos.Python: use tipos
listedictnativos.
Beneficio: compatível com o canal padrão e a evolução de esquema automática.
Opção 2: transformação no lado do canal¶
Defina de maneira explícita o objeto Canal com a lógica de transformação usando a função PARSE_JSON.
SQL de exemplo
CREATE PIPE my_pipe AS
COPY INTO my_table (my_array_col)
FROM (SELECT PARSE_JSON($1:my_array_col) FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
Nota
Este método é incompatível com os recursos de canal padrão e de evolução de esquema automática.