Snowpipe Streaming

Chamar o API Snowpipe Streaming (“API”) desencadeia cargas de linhas de dados de baixa latência usando o Snowflake Ingest SDK e seu próprio código de aplicativo gerenciado. O API de ingestão de streaming escreve linhas de dados nas tabelas Snowflake, ao contrário de cargas de dados em massa ou Snowpipe, que escrevem dados de arquivos preparados. Esta arquitetura resulta em latências de carga mais baixas, com custos correspondentes mais baixos para carregar volumes de dados similares, o que a torna uma ferramenta poderosa para lidar com fluxos de dados em tempo real.

Este tópico descreve os conceitos para aplicativos personalizados do cliente que chamam o API. Para instruções relacionadas ao Conector Snowflake para Kafka (“Conector Kafka”), consulte Uso do conector Snowflake para Kafka com Snowpipe Streaming.

Neste tópico:

API Snowpipe Streaming vs Snowpipe

O API destina-se a complementar o Snowpipe, não substituí-lo. Use o Snowpipe Streaming API em cenários de streaming onde os dados são transmitidos por linhas (por exemplo, tópicos Apache Kafka) em vez de escritos em arquivos. O API se encaixa em um fluxo de trabalho de ingestão que inclui um aplicativo Java personalizado existente que produz ou recebe registros. O API elimina a necessidade de criar arquivos para carregar dados nas tabelas do Snowflake, e permite o carregamento automático e contínuo dos fluxos de dados no Snowflake à medida que os dados se tornam disponíveis.

Snowpipe Streaming

A tabela a seguir descreve as diferenças entre Snowpipe Streaming e Snowpipe:

Categoria

Snowpipe Streaming

Snowpipe

Forma dos dados a serem carregados

Linhas.

Arquivos. Se seu pipeline de dados existente gerar arquivos em armazenamento de blobs, recomendamos o uso do Snowpipe em vez do API.

Requisitos de software de terceiros

Wrapper de código do aplicativo Java personalizado para Snowflake Ingest SDK.

Nenhum.

Ordenação de dados

Inserções ordenadas dentro de cada canal.

Sem suporte. O Snowpipe pode carregar dados de arquivos em uma ordem diferente dos carimbos de data/hora de criação de arquivos no armazenamento em nuvem.

Histórico de carregamento

Histórico de carregamento registrado na exibição SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY (Account Usage).

Histórico de carregamento registrado na exibição LOAD_HISTORY (Account Usage) e função COPY_HISTORY (Information Schema).

Objeto de canal

Não requer um objeto de canal. O API escreve registros diretamente nas tabelas de destino.

Exige um objeto de canal que enfileira e carrega dados do arquivo preparado nas tabelas de destino.

Requisitos de software

SDK de Java

O serviço Snowpipe Streaming é atualmente implementado como um conjunto de APIs para o Snowflake Ingest SDK. O SDK está disponível para download no repositório central do Maven: https://mvnrepository.com/artifact/net.snowflake/snowflake-ingest-sdk. O SDK oferece suporte à versão Java 8 (ou superior) e requer Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files.

Importante

O SDK faz chamadas REST API para o Snowflake. Talvez seja necessário ajustar as regras de firewall de sua rede para permitir a conectividade.

Aplicativo de cliente personalizado

O API requer uma interface de aplicativo Java personalizada capaz de bombear linhas de dados e tratar os erros encontrados. Você é responsável por garantir que o aplicativo funcione continuamente e possa se recuperar de falhas. Para um determinado conjunto de linhas, o API oferece suporte ao equivalente a ON_ERROR = CONTINUE | ABORT. ABORT aborta o lote inteiro após o primeiro erro ser encontrado e é a configuração padrão, e CONTINUE continua a carregar os dados se forem encontrados erros.

O aplicativo deve capturar os erros usando a resposta dos métodos insertRow (linha única) ou insertRows (conjunto de linhas).

Canais

O API ingere linhas através de um ou mais canais. Um canal representa uma conexão lógica nomeada de streaming para o Snowflake para carregar dados em uma tabela. Um único canal mapeia exatamente uma tabela no Snowflake; no entanto, vários canais podem apontar para a mesma tabela. O cliente SDK tem a capacidade de abrir vários canais para várias tabelas, porém o SDK não pode abrir canais através das contas. A ordenação das linhas e seus respectivos tokens offset são preservados dentro de um canal, mas não através de canais que apontam para a mesma tabela.

Snowpipe streaming client channel table mapping

Nota

Os canais inativos juntamente com tokens offset são excluídos automaticamente após 14 dias.

Tokens offset

Um token offset é uma cadeia de caracteres que um cliente pode incluir em solicitações de método insertRow (linha única) ou insertRows (conjunto de linhas) para acompanhar o progresso da ingestão por canal. Os clientes podem periodicamente fazer solicitações de método getLatestCommittedOffsetToken para obter o último token offset comprometido para um determinado canal e usar isso para cogitar o progresso da ingestão. Note que este token não é usado pelo Snowflake para realizar a desduplicação; no entanto, os clientes podem usar este token para realizar a desduplicação usando seu código personalizado.

Quando um cliente reabre um canal, o último token offset persistente é devolvido. O cliente pode redefinir sua posição na fonte de dados usando o token para evitar o envio dos mesmos dados duas vezes. Observe que quando um evento de reabertura de canal ocorre, qualquer dado armazenado em Snowflake é descartado para evitar que seja comprometido.

Por exemplo, o conector Kafka poderia ler um token offset do tópico como <partição>:<offset> ou simplesmente <offset> se a partição estiver codificada no nome do canal. Considere o seguinte cenário:

  1. O conector Kafka fica online e abre um canal correspondente a Partition 1 no tópico Kafka T com o nome do canal T:P1.

  2. O conector começa a ler registros da partição Kafka. O conector chama o API, fazendo uma solicitação de método insertRows com o offset associado ao registro como o token offset. Por exemplo, o token offset poderia ser 10, referindo-se ao décimo registro na partição Kafka.

  3. O conector faz periodicamente solicitações de método getLatestCommittedOffsetToken para determinar o progresso de ingestão.

Se o conector Kafka falhar, então o seguinte fluxo poderia ser completado para retomar a leitura dos registros do offset correto para a partição Kafka:

  1. O conector Kafka volta a ficar online e reabre o canal usando o mesmo nome do anterior.

  2. O conector chama o API, fazendo uma solicitação de método getLatestCommittedOffsetToken para obter o último offset comprometido para a partição. Por exemplo, suponha que o último token offset persistente seja 20.

  3. O conector usa APIs de leitura Kafka para reiniciar um cursor correspondente ao offset mais 1: 21 neste exemplo.

  4. O conector retoma os registros de leitura. Nenhum dado duplicado é recuperado depois que o cursor de leitura é reposicionado com sucesso.

Como outro exemplo, suponha que você tenha um aplicativo que leia logs de um diretório e exporte esses logs para o Snowflake usando o Snowpipe Streaming Client SDK. Você poderia criar um aplicativo de exportação de logs que fizesse o seguinte:

  1. Liste arquivos no diretório de logs. Suponha que a estrutura de registro gere arquivos de log que podem ser ordenados lexicograficamente e que novos arquivos de registro são posicionados no fim desta ordenação.

  2. Lê um arquivo de log linha por linha e chama o método API, fazendo solicitações de método insertRows com um token offset correspondente ao nome do arquivo de log e à contagem de linha ou posição do byte. Por exemplo, um token offset poderia ser messages_1.log:20, onde messages_1.log é o nome do arquivo de log e 20 é o número da linha.

Se o aplicativo travar ou precisar ser reiniciado, ele então chamaria o API, fazendo uma solicitação de método getLatestCommittedOffsetToken para recuperar um token offset correspondente ao último arquivo e linha de log exportado. Continuando com o exemplo, isto poderia ser messages_1.log:20. O aplicativo abriria então messages_1.log e buscaria a linha 21 para evitar que a mesma linha de registro fosse ingerida duas vezes.

Migração para arquivos otimizados

O API grava as linhas dos canais em blobs no armazenamento em nuvem, que são então enviados para a tabela de destino. Inicialmente, os dados gravados em uma tabela de destino são armazenados em um formato de arquivo intermediário temporário. Neste estágio, a tabela é considerada uma “tabela mista”, porque os dados particionados são armazenados em uma mistura de arquivos nativos e intermediários. Um processo automatizado em segundo plano migra os dados dos arquivos intermediários ativos para arquivos nativos otimizados para consultas e operações DML conforme a necessidade.

Operações somente de inserção

O API está atualmente limitado a inserir linhas. Para modificar, apagar ou combinar dados, escreva os registros “brutos” em uma ou mais tabelas de preparação. Mescle, junte ou transforme os dados usando pipeline de dados contínuos para inserir dados modificados nas tabelas de relatórios de destino.

Classes e interfaces

Para a documentação sobre as classes e interfaces, consulte API Snowflake Ingest SDK.

Tipos de dados Java suportados

A tabela a seguir resume quais tipos de dados Java são suportados para ingestão nas colunas Snowflake:

Tipo de coluna Snowflake

Tipo de dados Java permitidos

  • CHAR

  • VARCHAR

  • Cadeia de caracteres

  • tipos de dados primitivos (int, booleano, char,…)

  • BigInteger, BigDecimal

  • BINARY

  • byte[]

  • Cadeia de caracteres (codificação hexadecimal)

  • NUMBER

  • tipos numéricos (BigInteger, BigDecimal, byte, int, double,…)

  • Cadeia de caracteres

  • FLOAT

  • tipos numéricos (BigInteger, BigDecimal, byte, int, double,…)

  • Cadeia de caracteres

  • BOOLEAN

  • booleano

  • tipos numéricos (BigInteger, BigDecimal, byte, int, double,…)

  • Cadeia de caracteres

Consulte detalhes da conversão booleana.

  • TIME

  • java.time.LocalTime

  • java.time.OffsetTime

  • Cadeia de caracteres

    • Tempo inteiro armazenado

    • HH24:MI:SS.FFTZH:TZM (por exemplo, 20:57:01.123456789+07:00)

    • HH24:MI:SS.FF (por exemplo, 20:57:01.123456789)

    • HH24:MI:SS (por exemplo, 20:57:01)

    • HH24:MI (por exemplo, 20:57)

  • DATE

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • Cadeia de caracteres

    • Data de inteiro armazenado

    • YYYY-MM-DD (por exemplo, 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (por exemplo, 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (por exemplo, 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (por exemplo, 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (por exemplo, 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (por exemplo, 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (por exemplo, 2013-04-28T20:57-07:00)

  • TIMESTAMP_NTZ

  • TIMESTAMP_LTZ

  • TIMESTAMP_TZ

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • Cadeia de caracteres

    • Carimbo de data/hora do inteiro armazenado

    • YYYY-MM-DD (por exemplo, 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (por exemplo, 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (por exemplo, 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (por exemplo, 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (por exemplo, 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (por exemplo, 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (por exemplo, 2013-04-28T20:57-07:00)

  • VARIANT

  • ARRAY

  • Cadeia de caracteres (deve ser um JSON válido)

  • tipos de dados primitivos e suas matrizes

  • BigInteger, BigDecimal

  • java.time.LocalTime

  • java.time.OffsetTime

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.util.map<String, T>, onde T é um tipo VARIANT válido.

  • T[], onde T é um tipo VARIANT válido.

  • List<T>, onde T é um tipo VARIANT válido.

  • OBJECT

  • Cadeia de caracteres (deve ser um objeto JSON válido)

  • Map<String, T>, onde T é um tipo de variante válida

  • GEOGRAPHY

  • Sem suporte

  • GEOMETRY

  • Sem suporte

Limitações

  • As tabelas com qualquer uma das seguintes configurações de coluna não são suportadas:

    • AUTOINCREMENT ou IDENTITY

    • Valor padrão da coluna diferente de NULL.

  • Os tipos de dados GEOGRAPHY e GEOMETRY não são suportados.

  • As colunas com agrupamentos não são suportadas.

  • O Snowpipe Streaming só oferece suporte ao uso de chaves AES 256-bit para criptografia de dados.

  • Tabelas TRANSIENT ou TEMPORARY não são suportadas.