Snowpipe Streaming

Chamar o API Snowpipe Streaming (“API”) desencadeia o carregamento de linhas de dados de baixa latência usando o Snowflake Ingest SDK e seu próprio código de aplicativo gerenciado. O API de ingestão de streaming escreve linhas de dados nas tabelas Snowflake, ao contrário de cargas de dados em massa ou Snowpipe, que escrevem dados de arquivos preparados. Esta arquitetura resulta em latências de carga mais baixas, com custos correspondentes mais baixos para carregar volumes de dados similares, o que a torna uma ferramenta poderosa para lidar com fluxos de dados em tempo real.

Este tópico descreve os conceitos para aplicativos personalizados do cliente que chamam o API. Para instruções relacionadas ao Conector Snowflake para Kafka (“Conector Kafka”), consulte Uso do conector Snowflake para Kafka com Snowpipe Streaming.

Neste tópico:

API Snowpipe Streaming versus Snowpipe

O API destina-se a complementar o Snowpipe, não substituí-lo. Use o API Snowpipe Streaming em cenários de streaming onde os dados são transmitidos por linhas (por exemplo, tópicos Apache Kafka) em vez de gravados em arquivos. O API se encaixa em um fluxo de trabalho de ingestão que inclui um aplicativo Java personalizado existente que produz ou recebe registros. O API elimina a necessidade de criar arquivos para carregar dados nas tabelas do Snowflake, e permite o carregamento automático e contínuo dos fluxos de dados no Snowflake à medida que os dados se tornam disponíveis.

Snowpipe Streaming

A tabela a seguir descreve as diferenças entre Snowpipe Streaming e Snowpipe:

Categoria

Snowpipe Streaming

Snowpipe

Forma dos dados a serem carregados

Linhas

Arquivos. Se seu pipeline de dados existente gerar arquivos em armazenamento de blobs, recomendamos o uso do Snowpipe em vez do API.

Requisitos de software de terceiros

Wrapper de código do aplicativo Java personalizado para Snowflake Ingest SDK

Nenhum

Ordenação de dados

Inserções ordenadas dentro de cada canal

Sem suporte. O Snowpipe pode carregar dados de arquivos em uma ordem diferente dos carimbos de data/hora de criação de arquivos no armazenamento em nuvem.

Histórico de carregamento

Histórico de carregamento registrado na exibição SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY (Account Usage)

Histórico de carregamento registrado na exibição LOAD_HISTORY (Account Usage) e função COPY_HISTORY (Information Schema)

Objeto de canal

Não requer um objeto de canal: o API escreve registros diretamente nas tabelas de destino.

Exige um objeto de canal que enfileira e carrega dados do arquivo preparado nas tabelas de destino.

Requisitos de software

SDK de Java

O serviço Snowpipe Streaming é atualmente implementado como um conjunto de APIs para o Snowflake Ingest SDK. O SDK está disponível para download no repositório central do Maven. A Snowflake recomenda usar o Snowflake Ingest SDK versão 2.0.2 ou posterior.

O SDK oferece suporte à versão Java 8 ou superior e requer Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files.

Importante

O SDK faz chamadas REST API para o Snowflake. Talvez seja necessário ajustar as regras de firewall de sua rede para permitir a conectividade.

Aplicativo de cliente personalizado

O API requer uma interface de aplicativo Java personalizada capaz de bombear linhas de dados e tratar os erros encontrados. Você é responsável por garantir que o aplicativo funcione continuamente e possa se recuperar de falhas. Para um determinado lote de linhas, a API oferece suporte ao equivalente a ON_ERROR = CONTINUE | SKIP_BATCH | ABORT.

  • CONTINUE: continuar a carregar as linhas de dados aceitáveis e retornar todos os erros.

  • SKIP_BATCH: ignorar o carregamento e retornar todos os erros se algum erro for encontrado em todo o lote de linhas.

  • ABORT (configuração padrão): anular todo o lote de linhas e lançar uma exceção quando o primeiro erro é encontrado.

O aplicativo deve capturar os erros usando a resposta dos métodos insertRow (linha única) ou insertRows (conjunto de linhas).

Canais

O API ingere linhas por meio de um ou mais canais. Um canal representa uma conexão lógica nomeada de streaming para o Snowflake para carregar dados em uma tabela. Um único canal mapeia exatamente uma tabela no Snowflake; no entanto, vários canais podem apontar para a mesma tabela. O cliente SDK consegue abrir vários canais para várias tabelas, porém o SDK não pode abrir canais através das contas. A ordenação das linhas e seus respectivos tokens offset são preservados dentro de um canal, mas não através de canais que apontam para a mesma tabela.

Os canais devem ter vida longa quando um cliente está inserindo dados ativamente e devem ser reutilizados à medida que as informações do token offset são retidas. Os dados dentro do canal são liberados automaticamente a cada 1 segundo por padrão e não precisam ser fechados. Para obter mais informações, consulte Latência.

Mapeamento de tabela de canal do cliente do Snowpipe Streaming

Você pode executar o comando SHOW CHANNELS para listar os canais para os quais você tem privilégios de acesso. Para obter mais informações, consulte SHOW CHANNELS.

Nota

Os canais inativos juntamente com tokens offset são excluídos automaticamente após 30 dias.

Tokens offset

Um token offset é uma cadeia de caracteres que um cliente pode incluir em solicitações de método insertRow (linha única) ou insertRows (conjunto de linhas) para acompanhar o progresso da ingestão por canal. O token é inicializado como NULL na criação do canal e é atualizado quando as linhas com um token de offset fornecido são confirmadas no Snowflake por meio de um processo assíncrono. Os clientes podem periodicamente fazer solicitações de método getLatestCommittedOffsetToken para obter o último token offset comprometido para um determinado canal e usar isso para cogitar o progresso da ingestão. Note que este token não é usado pelo Snowflake para realizar a desduplicação; no entanto, os clientes podem usar este token para realizar a desduplicação usando seu código personalizado.

Quando um cliente reabre um canal, o último token offset persistente é devolvido. O cliente pode redefinir sua posição na fonte de dados ao usar o token para evitar o envio dos mesmos dados duas vezes. Observe que quando um evento de reabertura de canal ocorre, qualquer dado armazenado em Snowflake é descartado para evitar que seja comprometido.

Você pode usar o token de offset confirmado mais recente para realizar os seguintes casos de uso comuns:

  • Rastrear o progresso da ingestão

  • Verificar se um offset específico foi confirmado comparando-o com o último token de offset confirmado

  • Avançar o offset de origem e limpar os dados que já foram confirmados

  • Habilitar a desduplicação e garantir a entrega exata de dados uma única vez.

Por exemplo, o conector Kafka poderia ler um token offset de um tópico como <partição>:<offset> ou simplesmente <offset> se a partição estiver codificada no nome do canal. Considere o seguinte cenário:

  1. O conector Kafka fica online e abre um canal correspondente a Partition 1 no tópico Kafka T com o nome do canal T:P1.

  2. O conector começa a ler registros da partição Kafka.

  3. O conector chama o API, fazendo uma solicitação de método insertRows com o offset associado ao registro como o token offset.

    Por exemplo, o token offset poderia ser 10, referindo-se ao décimo registro na partição Kafka.

  4. O conector faz periodicamente solicitações de método getLatestCommittedOffsetToken para determinar o progresso de ingestão.

Se o conector Kafka falhar, então o seguinte procedimento poderia ser completado para retomar a leitura dos registros do offset correto para a partição Kafka:

  1. O conector Kafka volta a ficar online e reabre o canal usando o mesmo nome do anterior.

  2. O conector chama o API, fazendo uma solicitação de método getLatestCommittedOffsetToken para obter o último offset comprometido para a partição.

    Por exemplo, suponha que o token offset persistente mais recente seja 20.

  3. O conector usa APIs de leitura Kafka para reiniciar um cursor correspondente ao offset mais 1 (21 neste exemplo).

  4. O conector retoma os registros de leitura. Nenhum dado duplicado é recuperado depois que o cursor de leitura é reposicionado com sucesso.

Em outro exemplo, um aplicativo lê logs de um diretório e usa o Snowpipe Streaming Client SDK para exportar esses logs para o Snowflake. Você poderia criar um aplicativo de exportação de logs que fizesse o seguinte:

  1. Liste arquivos no diretório de logs.

    Suponha que a estrutura de registro gere arquivos de log que podem ser ordenados lexicograficamente e que novos arquivos de registro são posicionados no fim desta ordenação.

  2. Lê um arquivo de log linha por linha e chama o método API, fazendo solicitações de método insertRows com um token offset correspondente ao nome do arquivo de log e à contagem de linha ou posição do byte.

    Por exemplo, um token offset poderia ser messages_1.log:20, onde messages_1.log é o nome do arquivo de log e 20 é o número da linha.

Se o aplicativo travar ou precisar ser reiniciado, ele então chamaria o API, fazendo uma solicitação de método getLatestCommittedOffsetToken para recuperar um token offset que corresponde ao último arquivo e linha de log exportado. Continuando com o exemplo, isto poderia ser messages_1.log:20. O aplicativo abriria então messages_1.log e buscaria a linha 21 para evitar que a mesma linha de registro fosse ingerida duas vezes.

Nota

As informações do token offset podem ser perdidas. O token de offset é vinculado a um objeto de canal, e um canal é automaticamente limpo se nenhuma nova ingestão for realizada usando o canal por um período de 30 dias. Para evitar a perda do token offset, considere manter um offset separado e redefinir o token offset do canal, se necessário.

Práticas recomendadas de entrega exatamente um

Conseguir uma entrega exatamente única pode ser um desafio, e a adesão aos seguintes princípios em seu código personalizado é fundamental:

  • Para garantir a recuperação adequada de exceções, falhas ou travamentos, você deve sempre reabrir o canal e reiniciar a ingestão usando o último token de offset confirmado.

  • Embora seu aplicativo possa manter seu próprio offset, é fundamental usar o último token de offset confirmado fornecido pelo Snowflake como a fonte da verdade e redefinir seu próprio offset de acordo.

  • A única instância em que seu próprio offset deve ser tratado como a fonte da verdade é quando o token de offset do Snowflake é definido ou redefinido como NULL. Um token de offset NULL geralmente significa uma das seguintes opções:

    • Esse é um novo canal, portanto, nenhum token de offset foi definido.

    • A tabela de destino foi descartada e recriada, portanto, o canal é considerado novo.

    • Não houve nenhuma atividade de ingestão no canal por 30 dias, então o canal foi limpo automaticamente e as informações do token offset foram perdidas.

  • Se necessário, você pode limpar periodicamente os dados de origem que já foram confirmados com base no último token offset confirmado e avançar seu próprio offset.

Para obter mais informações sobre como o conector Kafka com o Snowpipe Streaming consegue uma entrega exata, consulte Semântica exatamente um.

Latência

O Snowpipe Streaming libera automaticamente os dados nos canais a cada segundo. Você não precisa fechar o canal para que os dados sejam liberados.

Com o Snowflake Ingest SDK versões 2.0.4 e posteriores, você pode configurar a latência usando a opção max_client_lag. A opção padrão é 1 segundo. A latência máxima pode ser configurada em até 10 minutos. Para obter mais informações, consulte MAX_CLIENT_LAG.

Observe que o conector Kafka para Snowpipe Streaming possui seu próprio buffer. Depois que o tempo de liberação do buffer Kafka for atingido, os dados serão enviados com um segundo de latência para o Snowflake por meio do Snowpipe Streaming. Para obter mais informações, consulte buffer.flush.time.

Migração para arquivos otimizados

O API grava as linhas dos canais em blobs no armazenamento em nuvem, que são então enviados para a tabela de destino. Inicialmente, os dados gravados em uma tabela de destino são armazenados em um formato de arquivo intermediário temporário. Neste estágio, a tabela é considerada uma “tabela mista”, porque os dados particionados são armazenados em uma mistura de arquivos nativos e intermediários. Um processo automatizado em segundo plano migra os dados dos arquivos intermediários ativos para arquivos nativos otimizados para consultas e operações DML conforme a necessidade.

Replicação

Snowpipe Streaming oferece suporte à replicação e failover de tabelas do Snowflake preenchidas por Snowpipe Streaming e seus offsets de canal associados de uma conta de origem para uma conta de destino em diferentes regiões e em plataformas de nuvem.

Para obter mais informações, consulte Replicação e Snowpipe Streaming.

Operações somente de inserção

O API está atualmente limitado a inserir linhas. Para modificar, apagar ou combinar dados, escreva os registros “brutos” em uma ou mais tabelas de preparação. Mescle, junte ou transforme os dados ao usar o pipeline de dados contínuos para inserir dados modificados nas tabelas de relatórios de destino.

Classes e interfaces

Para a documentação sobre as classes e interfaces, consulte API Snowflake Ingest SDK.

Tipos de dados Java suportados

A tabela a seguir resume quais tipos de dados Java são suportados para ingestão nas colunas Snowflake:

Tipo de coluna Snowflake

Tipo de dados Java permitidos

  • CHAR

  • VARCHAR

  • Cadeia de caracteres

  • tipos de dados primitivos (int, booleano, char,…)

  • BigInteger, BigDecimal

  • BINARY

  • byte[]

  • Cadeia de caracteres (codificação hexadecimal)

  • NUMBER

  • tipos numéricos (BigInteger, BigDecimal, byte, int, double,…)

  • Cadeia de caracteres

  • FLOAT

  • tipos numéricos (BigInteger, BigDecimal, byte, int, double,…)

  • Cadeia de caracteres

  • BOOLEAN

  • booleano

  • tipos numéricos (BigInteger, BigDecimal, byte, int, double,…)

  • Cadeia de caracteres

Consulte detalhes da conversão booleana.

  • TIME

  • java.time.LocalTime

  • java.time.OffsetTime

  • Cadeia de caracteres

    • Tempo inteiro armazenado

    • HH24:MI:SS.FFTZH:TZM (por exemplo, 20:57:01.123456789+07:00)

    • HH24:MI:SS.FF (por exemplo, 20:57:01.123456789)

    • HH24:MI:SS (por exemplo, 20:57:01)

    • HH24:MI (por exemplo, 20:57)

  • DATE

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • Cadeia de caracteres

    • Data de inteiro armazenado

    • YYYY-MM-DD (por exemplo, 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (por exemplo, 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (por exemplo, 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (por exemplo, 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (por exemplo, 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (por exemplo, 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (por exemplo, 2013-04-28T20:57-07:00)

  • TIMESTAMP_NTZ

  • TIMESTAMP_LTZ

  • TIMESTAMP_TZ

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • Cadeia de caracteres

    • Carimbo de data/hora do inteiro armazenado

    • YYYY-MM-DD (por exemplo, 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (por exemplo, 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (por exemplo, 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (por exemplo, 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (por exemplo, 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (por exemplo, 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (por exemplo, 2013-04-28T20:57-07:00)

  • VARIANT

  • ARRAY

  • Cadeia de caracteres (deve ser um JSON válido)

  • tipos de dados primitivos e suas matrizes

  • BigInteger, BigDecimal

  • java.time.LocalTime

  • java.time.OffsetTime

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.util.map<String, T>, onde T é um tipo VARIANT válido.

  • T[], onde T é um tipo VARIANT válido.

  • List<T>, onde T é um tipo VARIANT válido.

  • OBJECT

  • Cadeia de caracteres (deve ser um objeto JSON válido)

  • Map<String, T>, onde T é um tipo de variante válida

  • GEOGRAPHY

  • Sem suporte

  • GEOMETRY

  • Sem suporte

Privilégios de acesso obrigatórios

Chamar a Snowpipe Streaming API requer uma função com os seguintes privilégios:

Objeto

Privilégio

Tabela

OWNERSHIP ou um mínimo de INSERT e EVOLVE SCHEMA (obrigatório apenas ao usar a evolução do esquema para o conector Kafka com Snowpipe Streaming)

Banco de dados

USAGE

Esquema

USAGE

Limitações

O Snowpipe Streaming só oferece suporte ao uso de chaves AES 256-bit para criptografia de dados.

Os seguintes objetos ou tipos não são suportados:

  • Os tipos de dados GEOGRAPHY e GEOMETRY

  • Colunas com agrupamentos

  • Tabelas TRANSIENT ou TEMPORARY

  • Tabelas com qualquer uma das seguintes configurações de coluna:

    • AUTOINCREMENT ou IDENTITY

    • Valor padrão da coluna diferente de NULL