Uso do conector Snowflake para Kafka com Snowpipe Streaming

Opcionalmente, substitua o Snowpipe por Snowpipe Streaming em sua cadeia de carregamento de dados de Kafka. Quando o limite de buffer de liberação especificado (tempo ou memória ou número de mensagens) é atingido, o conector chama o API Snowpipe Streaming (“API”) para escrever linhas de dados nas tabelas Snowflake, ao contrário do Snowpipe, que grava dados de arquivos temporários preparados. Esta arquitetura resulta em latências de carga mais baixas, com custos correspondentes mais baixos para carregar volumes de dados similares.

A versão 1.9.1 (ou superior) do conector Kafka é necessária para uso com o Snowpipe Streaming. O conector Kafka com Snowpipe Streaming inclui o Snowflake Ingest SDK e oferece suporte a linhas de streaming de tópicos Apache Kafka diretamente nas tabelas de destino.

Snowpipe Streaming with Kafka connector

Nota

O conector Kafka com Snowpipe Streaming atualmente não oferece suporte à detecção de esquemas ou a evolução de esquemas. Ele usa o mesmo esquema de tabelas que o usado com o Snowpipe.

Neste tópico:

Versão mínima exigida

A versão do conector Kafka 1.9.1 (ou superior) oferece suporte ao Snowpipe Streaming.

Propriedades de configuração do Kafka

Salve suas configurações de conexão no arquivo de propriedades do conector Kafka. Para obter mais informações, consulte Configuração do conector Kafka.

Propriedades obrigatórias

Adicione ou edite suas configurações de conexão no arquivo de propriedades do conector Kafka. Para obter mais informações, consulte Configuração do conector Kafka.

snowflake.ingestion.method

Necessário somente se o conector Kafka for utilizado como o cliente que ingere o streaming. Especifica se deve usar o Snowpipe Streaming ou Snowpipe padrão para carregar seus dados de tópico Kafka. Os valores suportados são:

  • SNOWPIPE_STREAMING

  • SNOWPIPE (padrão)

Nenhuma configuração adicional é necessária para escolher o serviço de back-end para enfileirar e carregar os dados de tópico. Configure propriedades adicionais em seu arquivo de propriedades do conector Kafka como de costume.

snowflake.role.name

Função de controle de acesso a ser usada ao inserir as linhas na tabela.

Propriedades de buffer e sondagem

buffer.flush.time

Número de segundos entre as liberações do buffer, com cada liberação resultando em operações de inserção para os registros em buffer. O conector Kafka chama o API do Snowpipe Streaming (“API”) uma vez após cada liberação.

O valor mínimo suportado para a propriedade buffer.flush.time é 1 (em segundos). Para taxas médias de fluxo de dados mais altas, sugerimos que você diminua o valor padrão para melhorar a latência. Se o custo for uma preocupação maior do que a latência, você poderia aumentar o tempo de liberação do buffer. Tenha cuidado para liberar o buffer de memória Kafka antes que ele fique cheio para evitar exceções fora da memória.

Valores

1 - Sem limite superior.

Padrão

10

buffer.count.records

Número de registros armazenados em buffer por partição Kafka antes da ingestão no Snowflake.

Valores

1 - Sem limite superior.

Padrão

10000

buffer.size.bytes

Tamanho cumulativo em bytes dos registros armazenados em buffer por partição Kafka antes de serem ingeridos no Snowflake como arquivos de dados.

Os registros são compactados quando gravados em arquivos de dados. Como resultado, o tamanho dos registros no buffer pode ser maior do que o tamanho dos arquivos de dados criados a partir dos registros.

Valores

1 - Sem limite superior.

Padrão

20000000 (20 MB)

Além das propriedades do conector Kafka, observe a propriedade max.poll.records do consumidor Kafka, que controla o número máximo de registros devolvidos por Kafka para Kafka Connect em uma única pesquisa. O valor padrão de 500 pode ser aumentado, mas tenha em mente as restrições de memória. Para obter mais informações sobre esta propriedade, consulte a documentação de seu pacote Kafka:

Tratamento de erros e propriedades DLQ

errors.tolerance

Especifica como tratar os erros encontrados pelo conector Kafka:

Esta propriedade suporta os seguintes valores:

NONE

Pare o carregamento de dados quando o primeiro erro for encontrado.

ALL

Ignore todos os erros e continue a carregar os dados.

Padrão

NONE

errors.log.enable

Especifica se deve ou não escrever mensagens de erro no arquivo de log Kafka Connect.

Esta propriedade suporta os seguintes valores:

TRUE

Escreva mensagens de erro.

FALSE

Não escreva mensagens de erro.

Padrão

FALSE

errors.deadletterqueue.topic.name

Especifica o nome do nome do tópico DLQ (fila de mensagens mortas) em Kafka para entregar mensagens ao Kafka que não puderam ser ingeridas nas tabelas do Snowflake. Para obter mais informações, consulte Filas de mensagens mortas (neste tópico).

Valores

Cadeia de caracteres de texto personalizada

Padrão

Nenhum.

Semântica exatamente um

A semântica exatamente um garante a entrega de mensagens Kafka sem duplicação ou perda de dados. Esta garantia de entrega é definida por padrão para o conector Kafka com Snowpipe Streaming.

Conversores

O conector Kafka com Snowpipe Streaming não oferece suporte aos seguintes valores key.converter ou value.converter:

  • com.snowflake.kafka.connector.records.SnowflakeJsonConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry

Os conversores Snowflake personalizados tratam de erros que impedem o carregamento de dados movendo os arquivos para o estágio da tabela. Este fluxo de trabalho entra em conflito com as filas de mensagens mortas do Snowpipe Streaming.

Filas de mensagens mortas

O conector Kafka com Snowpipe Streaming suporta filas de mensagens mortas (DLQ) para registros quebrados ou registros que não podem ser processados com sucesso devido a uma falha.

Para obter mais informações sobre monitoramento, consulte a documentação do Apache Kafka.

Faturamento e uso

Para obter informações de faturamento do Snowpipe Streaming, consulte Faturamento do Snowpipe Streaming.