Snowpipe Streaming¶
Chamar o API Snowpipe Streaming (“API”) desencadeia cargas de linhas de dados de baixa latência usando o Snowflake Ingest SDK e seu próprio código de aplicativo gerenciado. O API de ingestão de streaming escreve linhas de dados nas tabelas Snowflake, ao contrário de cargas de dados em massa ou Snowpipe, que escrevem dados de arquivos preparados. Esta arquitetura resulta em latências de carga mais baixas, com custos correspondentes mais baixos para carregar volumes de dados similares, o que a torna uma ferramenta poderosa para lidar com fluxos de dados em tempo real.
Este tópico descreve os conceitos para aplicativos personalizados do cliente que chamam o API. Para instruções relacionadas ao Conector Snowflake para Kafka (“Conector Kafka”), consulte Uso do conector Snowflake para Kafka com Snowpipe Streaming.
Neste tópico:
API Snowpipe Streaming vs Snowpipe¶
O API destina-se a complementar o Snowpipe, não substituí-lo. Use o Snowpipe Streaming API em cenários de streaming onde os dados são transmitidos por linhas (por exemplo, tópicos Apache Kafka) em vez de escritos em arquivos. O API se encaixa em um fluxo de trabalho de ingestão que inclui um aplicativo Java personalizado existente que produz ou recebe registros. O API elimina a necessidade de criar arquivos para carregar dados nas tabelas do Snowflake, e permite o carregamento automático e contínuo dos fluxos de dados no Snowflake à medida que os dados se tornam disponíveis.
A tabela a seguir descreve as diferenças entre Snowpipe Streaming e Snowpipe:
Categoria |
Snowpipe Streaming |
Snowpipe |
---|---|---|
Forma dos dados a serem carregados |
Linhas. |
Arquivos. Se seu pipeline de dados existente gerar arquivos em armazenamento de blobs, recomendamos o uso do Snowpipe em vez do API. |
Requisitos de software de terceiros |
Wrapper de código do aplicativo Java personalizado para Snowflake Ingest SDK. |
Nenhum. |
Ordenação de dados |
Inserções ordenadas dentro de cada canal. |
Sem suporte. O Snowpipe pode carregar dados de arquivos em uma ordem diferente dos carimbos de data/hora de criação de arquivos no armazenamento em nuvem. |
Histórico de carregamento |
Histórico de carregamento registrado na exibição SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY (Account Usage). |
Histórico de carregamento registrado na exibição LOAD_HISTORY (Account Usage) e função COPY_HISTORY (Information Schema). |
Objeto de canal |
Não requer um objeto de canal. O API escreve registros diretamente nas tabelas de destino. |
Exige um objeto de canal que enfileira e carrega dados do arquivo preparado nas tabelas de destino. |
Requisitos de software¶
SDK de Java¶
O serviço Snowpipe Streaming é atualmente implementado como um conjunto de APIs para o Snowflake Ingest SDK. O SDK está disponível para download no repositório central do Maven.
O SDK oferece suporte à versão Java 8 (ou superior) e requer Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files.
Importante
O SDK faz chamadas REST API para o Snowflake. Talvez seja necessário ajustar as regras de firewall de sua rede para permitir a conectividade.
Aplicativo de cliente personalizado¶
O API requer uma interface de aplicativo Java personalizada capaz de bombear linhas de dados e tratar os erros encontrados. Você é responsável por garantir que o aplicativo funcione continuamente e possa se recuperar de falhas. Para um determinado conjunto de linhas, o API oferece suporte ao equivalente a ON_ERROR = CONTINUE | ABORT
. ABORT
aborta o lote inteiro após o primeiro erro ser encontrado e é a configuração padrão, e CONTINUE
continua a carregar os dados se forem encontrados erros.
O aplicativo deve capturar os erros usando a resposta dos métodos insertRow
(linha única) ou insertRows
(conjunto de linhas).
Canais¶
O API ingere linhas através de um ou mais canais. Um canal representa uma conexão lógica nomeada de streaming para o Snowflake para carregar dados em uma tabela. Um único canal mapeia exatamente uma tabela no Snowflake; no entanto, vários canais podem apontar para a mesma tabela. O cliente SDK tem a capacidade de abrir vários canais para várias tabelas, porém o SDK não pode abrir canais através das contas. A ordenação das linhas e seus respectivos tokens offset são preservados dentro de um canal, mas não através de canais que apontam para a mesma tabela.
Você pode executar o comando SHOW CHANNELS para listar os canais para os quais você tem privilégios de acesso. Para obter mais informações, consulte SHOW CHANNELS.
Nota
Os canais inativos juntamente com tokens offset são excluídos automaticamente após 30 dias.
Tokens offset¶
Um token offset é uma cadeia de caracteres que um cliente pode incluir em solicitações de método insertRow
(linha única) ou insertRows
(conjunto de linhas) para acompanhar o progresso da ingestão por canal. O token é inicializado como NULL na criação do canal e é atualizado quando as linhas com um token de offset fornecido são confirmadas no Snowflake por meio de um processo assíncrono. Os clientes podem periodicamente fazer solicitações de método getLatestCommittedOffsetToken
para obter o último token offset comprometido para um determinado canal e usar isso para cogitar o progresso da ingestão. Note que este token não é usado pelo Snowflake para realizar a desduplicação; no entanto, os clientes podem usar este token para realizar a desduplicação usando seu código personalizado.
Quando um cliente reabre um canal, o último token offset persistente é devolvido. O cliente pode redefinir sua posição na fonte de dados usando o token para evitar o envio dos mesmos dados duas vezes. Observe que quando um evento de reabertura de canal ocorre, qualquer dado armazenado em Snowflake é descartado para evitar que seja comprometido.
Você pode usar o token de offset confirmado mais recente para realizar os seguintes casos de uso comuns:
Rastreamento do progresso da ingestão.
Verificar se um offset específico foi confirmado ou não, comparando-o com o último token de offset confirmado.
Avançar o offset de origem e limpar os dados que já foram confirmados.
Possibilitando a desduplicação e garantindo a entrega exata de dados uma única vez.
Por exemplo, o conector Kafka poderia ler um token offset do tópico como <partição>:<offset>
ou simplesmente <offset>
se a partição estiver codificada no nome do canal. Considere o seguinte cenário:
O conector Kafka fica online e abre um canal correspondente a
Partition 1
no tópico KafkaT
com o nome do canalT:P1
.O conector começa a ler registros da partição Kafka. O conector chama o API, fazendo uma solicitação de método
insertRows
com o offset associado ao registro como o token offset. Por exemplo, o token offset poderia ser10
, referindo-se ao décimo registro na partição Kafka.O conector faz periodicamente solicitações de método
getLatestCommittedOffsetToken
para determinar o progresso de ingestão.
Se o conector Kafka falhar, então o seguinte fluxo poderia ser completado para retomar a leitura dos registros do offset correto para a partição Kafka:
O conector Kafka volta a ficar online e reabre o canal usando o mesmo nome do anterior.
O conector chama o API, fazendo uma solicitação de método
getLatestCommittedOffsetToken
para obter o último offset comprometido para a partição. Por exemplo, suponha que o token offset persistente mais recente seja20
.O conector usa APIs de leitura Kafka para reiniciar um cursor correspondente ao offset mais 1:
21
neste exemplo.O conector retoma os registros de leitura. Nenhum dado duplicado é recuperado depois que o cursor de leitura é reposicionado com sucesso.
Como outro exemplo, suponha que você tenha um aplicativo que leia logs de um diretório e exporte esses logs para o Snowflake usando o Snowpipe Streaming Client SDK. Você poderia criar um aplicativo de exportação de logs que fizesse o seguinte:
Liste arquivos no diretório de logs. Suponha que a estrutura de registro gere arquivos de log que podem ser ordenados lexicograficamente e que novos arquivos de registro são posicionados no fim desta ordenação.
Lê um arquivo de log linha por linha e chama o método API, fazendo solicitações de método
insertRows
com um token offset correspondente ao nome do arquivo de log e à contagem de linha ou posição do byte. Por exemplo, um token offset poderia sermessages_1.log:20
, ondemessages_1.log
é o nome do arquivo de log e20
é o número da linha.
Se o aplicativo travar ou precisar ser reiniciado, ele então chamaria o API, fazendo uma solicitação de método getLatestCommittedOffsetToken
para recuperar um token offset correspondente ao último arquivo e linha de log exportado. Continuando com o exemplo, isto poderia ser messages_1.log:20
. O aplicativo abriria então messages_1.log
e buscaria a linha 21
para evitar que a mesma linha de registro fosse ingerida duas vezes.
Nota
É possível que as informações do token de offset sejam perdidas. O token de offset é vinculado a um objeto de canal, e um canal é automaticamente limpo se nenhuma nova ingestão for realizada usando o canal por um período de 30 dias. Para evitar a perda do token de offset, é recomendável manter um offset separado e redefinir o token de offset do canal, se necessário.
Práticas recomendadas de entrega exata uma vez¶
Conseguir uma entrega exatamente única pode ser um desafio, e a adesão aos seguintes princípios em seu código personalizado é fundamental:
Para garantir a recuperação adequada de exceções, falhas ou travamentos, você deve sempre reabrir o canal e reiniciar a ingestão usando o último token de offset confirmado.
Embora seu aplicativo possa manter seu próprio offset, é fundamental usar o último token de offset confirmado fornecido pelo Snowflake como a fonte da verdade e redefinir seu próprio offset de acordo.
A única instância em que seu próprio offset deve ser tratado como a fonte da verdade é quando o token de offset do Snowflake é definido ou redefinido como NULL. Um token de offset NULL geralmente significa uma das seguintes opções:
Esse é um novo canal, portanto, nenhum token de offset foi definido.
A tabela de destino é descartada e recriada, portanto, o canal é considerado novo.
Se não houver atividade de ingestão em um canal por 30 dias, o canal será automaticamente limpo, o que resultará na perda das informações do token de offset.
Se necessário, você pode limpar periodicamente os dados de origem que já foram confirmados com base no último token de offset confirmado e avançar seu próprio offset.
Para obter mais informações sobre como o conector Kafka com o Snowpipe Streaming consegue uma entrega exata, consulte Entrega exata.
Migração para arquivos otimizados¶
O API grava as linhas dos canais em blobs no armazenamento em nuvem, que são então enviados para a tabela de destino. Inicialmente, os dados gravados em uma tabela de destino são armazenados em um formato de arquivo intermediário temporário. Neste estágio, a tabela é considerada uma “tabela mista”, porque os dados particionados são armazenados em uma mistura de arquivos nativos e intermediários. Um processo automatizado em segundo plano migra os dados dos arquivos intermediários ativos para arquivos nativos otimizados para consultas e operações DML conforme a necessidade.
Replicação¶
Snowpipe Streaming oferece suporte à replicação e failover de tabelas do Snowflake preenchidas por Snowpipe Streaming e seus offsets de canal associados de uma conta de origem para uma conta de destino em diferentes regiões e em plataformas de nuvem.
Para obter mais informações, consulte Replicação e Snowpipe Streaming.
Operações somente de inserção¶
O API está atualmente limitado a inserir linhas. Para modificar, apagar ou combinar dados, escreva os registros “brutos” em uma ou mais tabelas de preparação. Mescle, junte ou transforme os dados usando pipeline de dados contínuos para inserir dados modificados nas tabelas de relatórios de destino.
Classes e interfaces¶
Para a documentação sobre as classes e interfaces, consulte API Snowflake Ingest SDK.
Tipos de dados Java suportados¶
A tabela a seguir resume quais tipos de dados Java são suportados para ingestão nas colunas Snowflake:
Tipo de coluna Snowflake |
Tipo de dados Java permitidos |
---|---|
|
|
|
|
|
|
|
|
|
Consulte detalhes da conversão booleana. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Limitações¶
As tabelas com qualquer uma das seguintes configurações de coluna não são suportadas:
AUTOINCREMENT
ouIDENTITY
Valor padrão da coluna diferente de NULL.
Os tipos de dados GEOGRAPHY e GEOMETRY não são suportados.
As colunas com agrupamentos não são suportadas.
O Snowpipe Streaming só oferece suporte ao uso de chaves AES 256-bit para criptografia de dados.
Tabelas TRANSIENT ou TEMPORARY não são suportadas.