Canais e entrega exatamente uma vez¶
Este tópico explica como o Snowpipe Streaming ingere dados por canais com garantias de ordenação e como os tokens de deslocamento permitem a entrega exatamente uma vez.
Princípios básicos da ingestão de streaming¶
O Snowpipe Streaming foi desenvolvido com base em vários princípios fundamentais de ingestão de streaming:
Ingestão contínua: os dados fluem para o Snowflake à medida que são produzidos, em vez de serem coletados em lotes e carregados periodicamente. Os aplicativos enviam as linhas continuamente por meio de conexões de longa duração, e o Snowflake confirma os dados automaticamente.
Entrega exatamente uma vez: cada registro é ingerido exatamente uma vez, mesmo quando há falhas no cliente ou interrupções na rede. O Snowpipe Streaming consegue isso por meio do rastreamento de tokens de deslocamento, que permite que os clientes retomem da última posição confirmada sem duplicar os dados.
Ingestão ordenada: as linhas são confirmadas na ordem em que são enviadas dentro de um canal. Isso preserva a sequência de eventos do sistema de origem, o que é fundamental para dados de séries temporais, pipelines CDC e trilhas de auditoria.
Baixa latência: os dados ficam disponíveis para consulta em pelo menos 5 segundos após a ingestão. Isso permite análises quase em tempo real sem os atrasos do carregamento em lote tradicional.
Sem servidor: o Snowflake gerencia todos os recursos de computação para ingestão. Os recursos são dimensionados automaticamente com base na taxa de transferência, sem infraestrutura para o cliente provisionar ou gerenciar.
Como é o fluxo dos dados¶
Um aplicativo cliente se conecta ao Snowflake usando um SDK do Snowpipe Streaming (Java ou Python) ou a API REST. O cliente abre um ou mais canais de um pipe e, em seguida, envia as linhas por esses canais. O Snowflake armazena em buffer e confirma os dados na tabela de destino, tornando-os disponíveis para consulta em segundos.
O fluxo completo:
O aplicativo cliente envia as linhas usando o SDK (
appendRows) ou a API REST (ponto de extremidadeAppend Rows).O canal recebe as linhas em ordem e associa cada lote a um token de deslocamento para rastreamento do progresso.
O pipe processa os dados do lado do servidor: valida o esquema, aplica as possíveis transformações configuradas ou o pré-clustering e, em seguida, confirma na tabela de destino.
A tabela de destino recebe os dados confirmados, que se tornam imediatamente consultáveis.
Canais¶
Canal é uma conexão lógica nomeada de streaming com o Snowflake para carregar dados em uma tabela. Os canais oferecem duas garantias:
Ingestão ordenada: a ordenação das linhas e seus respectivos tokens de deslocamento são preservados em um canal.
Entrega exatamente uma vez: os tokens de deslocamento permitem que os clientes rastreiem o progresso confirmado e reproduzam a partir da última posição confirmada na recuperação.
A ordem é preservada em um canal, mas não entre os canais que apontam para a mesma tabela.
Os canais são abertos de um pipe. O SDK cliente pode abrir vários canais para vários pipes; no entanto, o SDK não pode abrir canais de várias contas. Os canais devem ter longa duração quando um cliente insere dados ativamente, e devem ser reutilizados após as reinicializações do processo do cliente, pois as informações de token de deslocamento são mantidas.
Você pode descartar os canais permanentemente usando a API DropChannelRequest quando o canal e os metadados de deslocamento associados não são mais necessários. Você pode descartar um canal de duas maneiras:
Descartar um canal no fechamento. Os dados dentro do canal são automaticamente liberados antes que o canal seja descartado.
Descartar um canal às cegas. Não recomendamos essa abordagem porque ela descarta todos os dados pendentes.
Você pode executar o comando SHOW CHANNELS para listar os canais para os quais você tem privilégios de acesso. Para obter mais informações, consulte SHOW CHANNELS.
Nota
Os canais inativos juntamente com tokens de offset são excluídos automaticamente após 30 dias de inatividade.
Tokens deslocamento e entrega exatamente uma vez¶
Dica
Como funciona a opção exatamente uma vez no Snowpipe Streaming: Seu aplicativo envia as linhas com um token deslocamento (por exemplo, um deslocamento de partição Kafka). O Snowflake persiste o token quando os dados são confirmados. Na recuperação, seu aplicativo chama getLatestCommittedOffsetToken para descobrir onde parou e, em seguida, é reproduzido a partir dessa posição. Não há ingestão de dados duplicados e nem perda de dados.
Um token de deslocamento é uma cadeia de caracteres que um cliente inclui nas solicitações de envio de linhas para rastrear o progresso da ingestão por canal. Os métodos específicos usados são appendRow ou appendRows para SDK e o ponto de extremidade Append Rows para a API REST.
O token é inicializado como NULL na criação do canal e é atualizado quando as linhas com um token de deslocamento fornecido são confirmadas no Snowflake. Os clientes podem chamar getLatestCommittedOffsetToken periodicamente para obter o último token de deslocamento confirmado para um canal e usá-lo para raciocinar sobre o progresso da ingestão.
Quando um cliente reabre um canal, o último token offset persistente é devolvido. O cliente pode redefinir sua posição na fonte de dados ao usar o token para evitar o envio dos mesmos dados duas vezes. Quando ocorre um evento de reabertura de canal, quaisquer dados não confirmados armazenados em buffer no Snowflake são descartados para evitar sua confirmação.
Você pode usar o token de deslocamento confirmado mais recente para realizar o seguinte:
Rastrear o progresso da ingestão
Verificar se um deslocamento específico foi confirmado comparando-o com o último token de deslocamento confirmado
Avançar o deslocamento de origem e purgar os dados que já foram confirmados
Habilitar a desduplicação e garantir a entrega exatamente uma vez dos dados
Exemplo: recuperar-se de uma falha no conector Kafka
O conector Kafka lê um token de deslocamento de um tópico como <partition>:<offset>. Considere o seguinte cenário:
O conector Kafka fica online e abre um canal correspondente a
Partition 1no tópico KafkaTcom o nome do canalT:P1.O conector começa a ler registros da partição Kafka.
O conector chama o API, fazendo uma solicitação de método
appendRowscom o offset associado ao registro como o token offset.Por exemplo, o token offset poderia ser
10, referindo-se ao décimo registro na partição Kafka.O conector faz periodicamente solicitações de método
getLatestCommittedOffsetTokenpara determinar o progresso de ingestão.
Se o conector Kafka falhar, o procedimento a seguir retomará a leitura dos registros do deslocamento correto:
O conector Kafka volta a ficar online e reabre o canal usando o mesmo nome do anterior.
O conector chama
getLatestCommittedOffsetTokenpara obter o último deslocamento confirmado da partição.Por exemplo, suponha que o token offset persistente mais recente seja
20.O conector usa APIs de leitura Kafka para reiniciar um cursor correspondente ao offset mais 1 (
21neste exemplo).O conector retoma os registros de leitura. Nenhum dado duplicado é recuperado depois que o cursor de leitura é reposicionado com sucesso.
Exemplo: ingestão de arquivo de log com recuperação de falha
Um aplicativo lê os logs em um diretório e usa o SDK do Snowpipe Streaming para exportá-los para o Snowflake. O aplicativo faz o seguinte:
Lista os arquivos no diretório de logs.
Suponha que a estrutura de registro gere arquivos de log que podem ser ordenados lexicograficamente e que novos arquivos de registro são posicionados no fim desta ordenação.
Lê um arquivo de log linha por linha e chama o método API, fazendo solicitações de método
appendRowscom um token offset correspondente ao nome do arquivo de log e à contagem de linha ou posição do byte.Por exemplo, um token offset poderia ser
messages_1.log:20, ondemessages_1.logé o nome do arquivo de log e20é o número da linha.
Se o aplicativo falha ou precisa ser reiniciado, ele chama getLatestCommittedOffsetToken para recuperar o token de deslocamento que corresponde ao último arquivo de log e linha exportados. Continuando com o exemplo, isto poderia ser messages_1.log:20. Depois disso, o aplicativo abre messages_1.log e procura a linha 21 para impedir que a mesma linha de log seja ingerida duas vezes.
Nota
As informações do token offset podem ser perdidas. O token de offset é vinculado a um objeto de canal, e um canal é automaticamente limpo se nenhuma nova ingestão for realizada usando o canal por um período de 30 dias. Para evitar a perda do token offset, considere manter um offset separado e redefinir o token offset do canal, se necessário.
Funções offsetToken e continuationToken¶
offsetToken e continuationToken são usados para garantir exatamente uma entrega de dados, mas têm finalidades diferentes e são gerenciados por subsistemas distintos. A principal diferença é quem controla o valor do token e o escopo de uso.
continuationToken(usado apenas por usuários diretos da API REST):Esse token é gerenciado pelo Snowflake e é essencial para manter o estado de uma única sessão de streaming contínua. Quando um cliente envia dados usando a API
Append Rows, o Snowflake retorna umcontinuationToken. O cliente deve passar esse token novamente em sua próxima solicitação AppendRows para garantir que os dados sejam recebidos pelo Snowflake na ordem correta e sem lacunas. O Snowflake usa o token para detectar e evitar dados duplicados ou ausentes no caso de uma nova tentativa do SDK.offsetToken:Esse token é um identificador definido pelo usuário que permite uma entrega exata a partir de uma fonte externa. O Snowflake armazena esse valor, mas não o utiliza nas próprias operações internas e nem para impedir uma nova ingestão. É responsabilidade do sistema externo, como um conector Kafka, ler o offsetToken do Snowflake e usá-lo para rastrear o próprio progresso de ingestão, evitando o envio de dados duplicados caso o fluxo externo precise ser repetido.