Snowpipe Streaming

O Snowpipe Streaming é o serviço Snowflake para carregamento contínuo e de baixa latência de dados de streaming diretamente no Snowflake. Ele permite a ingestão e análise de dados quase em tempo real, o que é crucial para insights oportunos e respostas operacionais imediatas. Grandes volumes de dados de diversas fontes de streaming são disponibilizados para consulta e análise em segundos.

Valor do Snowpipe Streaming

  • Disponibilidade de dados em tempo real: ingere os dados à medida que eles chegam, ao contrário dos métodos tradicionais de carregamento em lote, dando suporte a casos de uso como painéis ao vivo, análises em tempo real e detecção de fraudes.

  • Cargas de trabalho de streaming eficientes: utiliza os SDKs do Snowpipe Streaming para gravar linhas diretamente em tabelas, ignorando a necessidade de preparação de dados em armazenamento intermediário em nuvem. Essa abordagem direta reduz a latência e simplifica a arquitetura de ingestão.

  • Pipelines de dados simplificados: oferece uma abordagem simplificada para pipelines de dados contínuos de fontes como eventos de aplicativos, sensores IoT, fluxos de captura de dados de alterações (CDC) e filas de mensagens (por exemplo, Apache Kafka).

  • Sem servidor e escalonável: como uma oferta sem servidor, ele dimensiona automaticamente os recursos de computação com base na carga de ingestão.

  • Custo-benefício para streaming: o faturamento é otimizado para ingestão de streaming, oferecendo potencialmente soluções mais econômica para fluxos de dados de alto volume e baixa latência.

Com o Snowpipe Streaming, você pode criar aplicativos de dados em tempo real no Snowflake Data Cloud, para tomar decisões com base nos dados mais recentes disponíveis.

Implementações do Snowpipe Streaming

O Snowpipe Streaming oferece duas implementações distintas para atender a diversas necessidades de ingestão de dados e expectativas de desempenho: Snowpipe Streaming com arquitetura de alto desempenho e Snowpipe Streaming com arquitetura clássica:

  • Snowpipe Streaming com arquitetura de alto desempenho

    A Snowflake projetou essa implementação de última geração para aumentar significativamente a taxa de transferência, otimizar o desempenho do streaming e fornecer um modelo de custo previsível, preparando o estágio para recursos avançados de streaming de dados.

    Principais características:

    • SDK: utiliza o novo SDK snowpipe-streaming.

    • Preços: apresenta preços transparentes e baseados na taxa de transferência (créditos por GB não compactado).

    • Gerenciamento do fluxo de dados: utiliza o objeto PIPE para gerenciar o fluxo de dados e permitir transformações leves no momento da ingestão. Os canais são abertos para esse objeto PIPE.

    • Ingestão: oferece uma REST API para ingestão direta e leve de dados por meio do PIPE.

    • Validação de esquema: realizada no lado do servidor durante a ingestão em relação ao esquema definido no PIPE.

    • Desempenho: projetado para aumentar significativamente o rendimento e melhorar a eficiência das consultas nos dados ingeridos.

    Incentivamos você a explorar essa arquitetura avançada, especialmente para novos projetos de streaming.

  • Snowpipe Streaming com arquitetura clássica

    Essa é a implementação original e geralmente disponível, fornecendo uma solução confiável para pipelines de dados estabelecidos.

    Principais características:

    • SDK: utiliza o snowflake-ingest-sdk.

    • Gerenciamento do fluxo de dados: não utiliza o conceito de objeto PIPE para ingestão de streaming. Os canais são configurados e abertos diretamente nas tabelas de destino.

    • Preços: com base em uma combinação de recursos de computação sem servidor utilizados para ingestão e o número de conexões de clientes ativos.

Como escolher sua implementação

Considere suas necessidades imediatas e a estratégia de dados de longo prazo ao escolher uma implementação:

  • Novos projetos de streaming: recomendamos avaliar a arquitetura de alto desempenho do Snowpipe Streaming por seu design avançado, melhor desempenho, escalabilidade e previsibilidade de custos.

  • Requisitos de desempenho: a arquitetura de alto desempenho foi criada para maximizar a taxa de transferência e otimizar o desempenho em tempo real.

  • Preferência de preços: a arquitetura de alto desempenho oferece preços claros e baseados na taxa de transferência, enquanto a arquitetura clássica cobra com base no uso da computação sem servidor e nas conexões do cliente.

  • Configurações existentes: os aplicativos existentes que usam arquitetura clássica podem continuar operando. Para futuras expansões ou reformulações, considere migrar ou incorporar a arquitetura de alto desempenho.

  • Conjunto de recursos e gerenciamento: o objeto PIPE na arquitetura de alto desempenho introduz recursos aprimorados de gerenciamento e transformação que não estão presentes na arquitetura clássica.

Snowpipe Streaming versus Snowpipe

O Snowpipe Streaming foi criado para complementar o Snowpipe, não para substituí-lo. Use a Snowpipe Streaming API em cenários de streaming em que os dados são transmitidos com linhas (por exemplo, tópicos do Apache Kafka) em vez de gravados em arquivos. O API se encaixa em um fluxo de trabalho de ingestão que inclui um aplicativo Java personalizado existente que produz ou recebe registros. Com a API, você não precisa criar arquivos para carregar dados nas tabelas Snowflake e a API permite o carregamento automático e contínuo de fluxos de dados no Snowflake à medida que os dados ficam disponíveis.

Snowpipe Streaming

A tabela a seguir descreve as diferenças entre Snowpipe Streaming e Snowpipe:

Categoria

Snowpipe Streaming

Snowpipe

Forma dos dados a serem carregados

Linhas

Arquivos. Se seu pipeline de dados existente gerar arquivos em armazenamento de blobs, recomendamos o uso do Snowpipe em vez do API.

Requisitos de software de terceiros

Wrapper de código do aplicativo Java personalizado para Snowflake Ingest SDK

Nenhum

Ordenação de dados

Inserções ordenadas dentro de cada canal

Sem suporte. O Snowpipe pode carregar dados de arquivos em uma ordem diferente dos carimbos de data/hora de criação de arquivos no armazenamento em nuvem.

Histórico de carregamento

Histórico de carregamento registrado na exibição SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY (Account Usage)

Histórico de carga registrado em COPY_HISTORY (Account Usage) e na função COPY_HISTORY (Information Schema)

Objeto de canal

A arquitetura clássica não requer um objeto canal: a API grava registros diretamente nas tabelas de destino. A arquitetura de alto desempenho requer um objeto de canal.

Exige um objeto de canal que enfileira e carrega dados do arquivo preparado nas tabelas de destino.

Canais

A API ingere linhas por meio de um ou mais canais. Um canal representa uma conexão lógica nomeada de streaming para o Snowflake para carregar dados em uma tabela de maneira ordenada. A ordenação das linhas e seus respectivos tokens offset são preservados dentro de um canal, mas não através de canais que apontam para a mesma tabela.

Na arquitetura clássica, um único canal é mapeado para exatamente uma tabela no Snowflake; embora vários canais possam apontar para a mesma tabela. O SDK cliente consegue abrir vários canais para várias tabelas, porém o SDK não pode abrir canais através das contas. Os canais devem ter longa duração quando um cliente está inserindo dados ativamente e devem ser reutilizados após reinicializações do processo do cliente, pois as informações do token de offset são mantidas. Os dados dentro do canal são liberados automaticamente a cada 1 segundo por padrão e o canal não precisa ser fechado. Para obter mais informações, consulte Recomendações de latência.

Mapeamento de tabela de canal do cliente do Snowpipe Streaming

Você pode descartar canais permanentemente usando a DropChannelRequest API quando não precisar mais do canal e dos metadados de deslocamento associados. Existem duas maneiras de descartar um canal:

  • Descartar um canal no fechamento. Os dados dentro do canal são automaticamente liberados antes que o canal seja descartado.

  • Descartar um canal às cegas. Não recomendamos isso porque descartar um canal remove cegamente todos os dados pendentes.

Você pode executar o comando SHOW CHANNELS para listar os canais para os quais você tem privilégios de acesso. Para obter mais informações, consulte SHOW CHANNELS.

Nota

Os canais inativos juntamente com tokens de offset são excluídos automaticamente após 30 dias de inatividade.

Tokens offset

Um token de deslocamento é uma cadeia de caracteres que um cliente pode incluir em suas solicitações de método de envio de linha (por exemplo, para linhas únicas ou múltiplas) para rastrear o progresso da ingestão por canal. Os métodos específicos usados são insertRow ou insertRows para a arquitetura clássica e appendRow ou appendRows para a arquitetura de alto desempenho. O token é inicializado como NULL na criação do canal e é atualizado quando as linhas com um token de offset fornecido são confirmadas no Snowflake por meio de um processo assíncrono. Os clientes podem fazer periodicamente solicitações do método getLatestCommittedOffsetToken para obter o token de deslocamento confirmado mais recente para um canal específico e usá-lo para raciocinar sobre o progresso da ingestão. Observe que esse token não é usado pelo Snowflake para realizar a desduplicação; no entanto, os clientes podem usar esse token para realizar a desduplicação usando seu código personalizado.

Quando um cliente reabre um canal, o último token offset persistente é devolvido. O cliente pode redefinir sua posição na fonte de dados ao usar o token para evitar o envio dos mesmos dados duas vezes. Observe que, quando ocorre um evento de reabertura de canal, quaisquer dados não confirmados armazenados em buffer no Snowflake são descartados para evitar sua confirmação.

Você pode usar o token de offset confirmado mais recente para realizar os seguintes casos de uso comuns:

  • Rastrear o progresso da ingestão

  • Verificar se um offset específico foi confirmado comparando-o com o último token de offset confirmado

  • Avançar o offset de origem e limpar os dados que já foram confirmados

  • Habilitar a desduplicação e garantir a entrega exata de dados uma única vez.

Por exemplo, o conector Kafka poderia ler um token offset de um tópico como <partição>:<offset> ou simplesmente <offset> se a partição estiver codificada no nome do canal. Considere o seguinte cenário:

  1. O conector Kafka fica online e abre um canal correspondente a Partition 1 no tópico Kafka T com o nome do canal T:P1.

  2. O conector começa a ler registros da partição Kafka.

  3. O conector chama o API, fazendo uma solicitação de método insertRows com o offset associado ao registro como o token offset.

    Por exemplo, o token offset poderia ser 10, referindo-se ao décimo registro na partição Kafka.

  4. O conector faz periodicamente solicitações de método getLatestCommittedOffsetToken para determinar o progresso de ingestão.

Se o conector Kafka falhar, então o seguinte procedimento poderia ser completado para retomar a leitura dos registros do offset correto para a partição Kafka:

  1. O conector Kafka volta a ficar online e reabre o canal usando o mesmo nome do anterior.

  2. O conector chama o API, fazendo uma solicitação de método getLatestCommittedOffsetToken para obter o último offset comprometido para a partição.

    Por exemplo, suponha que o token offset persistente mais recente seja 20.

  3. O conector usa APIs de leitura Kafka para reiniciar um cursor correspondente ao offset mais 1 (21 neste exemplo).

  4. O conector retoma os registros de leitura. Nenhum dado duplicado é recuperado depois que o cursor de leitura é reposicionado com sucesso.

Em outro exemplo, um aplicativo lê logs de um diretório e usa o Snowpipe Streaming Client SDK para exportar esses logs para o Snowflake. Você poderia criar um aplicativo de exportação de logs que fizesse o seguinte:

  1. Liste arquivos no diretório de logs.

    Suponha que a estrutura de registro gere arquivos de log que podem ser ordenados lexicograficamente e que novos arquivos de registro são posicionados no fim desta ordenação.

  2. Lê um arquivo de log linha por linha e chama o método API, fazendo solicitações de método insertRows com um token offset correspondente ao nome do arquivo de log e à contagem de linha ou posição do byte.

    Por exemplo, um token offset poderia ser messages_1.log:20, onde messages_1.log é o nome do arquivo de log e 20 é o número da linha.

Se o aplicativo travar ou precisar ser reiniciado, ele então chamaria o API, fazendo uma solicitação de método getLatestCommittedOffsetToken para recuperar um token offset que corresponde ao último arquivo e linha de log exportado. Continuando com o exemplo, isto poderia ser messages_1.log:20. O aplicativo abriria então messages_1.log e buscaria a linha 21 para evitar que a mesma linha de registro fosse ingerida duas vezes.

Nota

As informações do token de offset podem ficar perdidas. O token de offset é vinculado a um objeto de canal, e um canal é automaticamente limpo se nenhuma nova ingestão for realizada usando o canal por um período de 30 dias. Para evitar a perda do token offset, considere manter um offset separado e redefinir o token offset do canal, se necessário.

Funções offsetToken e continuationToken

offsetToken e continuationToken são usados para garantir exatamente uma entrega de dados, mas têm finalidades diferentes e são gerenciados por subsistemas distintos. A principal diferença é quem controla o valor do token e o escopo de uso.

  • continuationToken (só se aplica à arquitetura de alto desempenho e só é usado por usuários diretos da API REST):

    Esse token é gerenciado pelo Snowflake e é essencial para manter o estado de uma única sessão de streaming contínua. Quando um cliente envia dados usando a API Append Rows, o Snowflake retorna um continuationToken. O cliente deve devolver esse token em sua próxima solicitação AppendRows para garantir que os dados sejam recebidos pelo Snowflake na ordem correta e sem lacunas. O Snowflake usa o token para detectar e evitar dados duplicados ou ausentes no caso de uma nova tentativa do SDK.

  • offsetToken (aplica-se tanto à arquitetura clássica quanto à de alto desempenho):

    Esse token é um identificador definido pelo usuário que permite uma entrega exata a partir de uma fonte externa. O Snowflake não usa esse token para suas próprias operações internas ou para impedir a reingestão. Em vez disso, o Snowflake simplesmente armazena esse valor. É responsabilidade do sistema externo (como um conector Kafka) ler o offsetToken do Snowflake e usá-lo para rastrear seu próprio progresso de ingestão, evitando o envio de dados duplicados caso o fluxo externo precise ser repetido.

Operações somente de inserção

O API está atualmente limitado a inserir linhas. Para modificar, apagar ou combinar dados, escreva os registros “brutos” em uma ou mais tabelas de preparação. Mescle, junte ou transforme os dados ao usar o pipeline de dados contínuos para inserir dados modificados nas tabelas de relatórios de destino.

Tipos de dados Java suportados

A tabela a seguir resume quais tipos de dados Java são suportados para ingestão nas colunas Snowflake:

Tipo de coluna Snowflake

Tipo de dados Java permitidos

  • CHAR

  • VARCHAR

  • Cadeia de caracteres

  • tipos de dados primitivos (int, booliano, char,…)

  • BigInteger, BigDecimal

  • BINARY

  • byte[]

  • Cadeia de caracteres (codificação hexadecimal)

  • NUMBER

  • tipos numéricos (BigInteger, BigDecimal, byte, int, double,…)

  • Cadeia de caracteres

  • FLOAT

  • tipos numéricos (BigInteger, BigDecimal, byte, int, double,…)

  • Cadeia de caracteres

  • BOOLEAN

  • booleano

  • tipos numéricos (BigInteger, BigDecimal, byte, int, double,…)

  • Cadeia de caracteres

Consulte detalhes da conversão booliana.

  • TIME

  • java.time.LocalTime

  • java.time.OffsetTime

  • Cadeia de caracteres

    • Tempo inteiro armazenado

    • HH24:MI:SS.FFTZH:TZM (por exemplo, 20:57:01.123456789+07:00)

    • HH24:MI:SS.FF (por exemplo, 20:57:01.123456789)

    • HH24:MI:SS (por exemplo, 20:57:01)

    • HH24:MI (por exemplo, 20:57)

  • DATE

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • Cadeia de caracteres

    • Data de inteiro armazenado

    • YYYY-MM-DD (por exemplo, 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (por exemplo, 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (por exemplo, 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (por exemplo, 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (por exemplo, 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (por exemplo, 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (por exemplo, 2013-04-28T20:57-07:00)

  • TIMESTAMP_NTZ

  • TIMESTAMP_LTZ

  • TIMESTAMP_TZ

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • Cadeia de caracteres

    • Carimbo de data/hora do inteiro armazenado

    • YYYY-MM-DD (por exemplo, 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (por exemplo, 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (por exemplo, 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (por exemplo, 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (por exemplo, 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (por exemplo, 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (por exemplo, 2013-04-28T20:57-07:00)

  • VARIANT

  • ARRAY

  • Cadeia de caracteres (deve ser um JSON válido)

  • tipos de dados primitivos e suas matrizes

  • BigInteger, BigDecimal

  • java.time.LocalTime

  • java.time.OffsetTime

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.util.map<String, T>, onde T é um tipo VARIANT válido.

  • T[], onde T é um tipo VARIANT válido.

  • List<T>, onde T é um tipo VARIANT válido.

  • OBJECT

  • Cadeia de caracteres (deve ser um objeto JSON válido)

  • Map<String, T>, onde T é um tipo de variante válida

  • GEOGRAPHY

  • Não é compatível com a arquitetura clássica, mas é compatível com a arquitetura de alto desempenho.

  • GEOMETRY

  • Não é compatível com a arquitetura clássica, mas é compatível com a arquitetura de alto desempenho.

Privilégios de acesso obrigatórios

Chamar a Snowpipe Streaming API requer uma função com os seguintes privilégios:

Objeto

Privilégio

Tabela

OWNERSHIP ou um mínimo de INSERT e EVOLVE SCHEMA (obrigatório apenas ao usar a evolução do esquema para o conector Kafka com Snowpipe Streaming)

Banco de dados

USAGE

Esquema

USAGE

Canal

OPERATE (obrigatório apenas para arquitetura de alto desempenho)