Snowflake High Performance connector for Kafka: Configurar o Kafka¶
Este tópico descreve as etapas para instalar e configurar o Kafka para Snowflake High Performance connector for Kafka.
Instalando o conector Kafka¶
O conector Kafka é fornecido como um arquivo JAR (executável Java).
O Snowflake fornece duas versões do conector:
Uma versão para a implementação Confluent do Kafka Connect.
Uma versão para o open source software (OSS) Apache Kafka package https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/.
Ambas as versões do conector estão disponíveis na versão preliminar privada do Snowflake e devem ser adquiridas por meio da Snowflake. Entre em contato com a equipe de conta Snowflake para obter o arquivo JAR do conector.
Se você não tiver certeza de qual versão usar, consulte Escolhendo uma versão de conector. Configurando o conector Kafka ==============================================================================
A configuração do conector é específica do fornecedor. Algumas implementações, como Amazon MSK Connect, têm uma UI para configurar o conector e aceitam a configuração em JSON e também no formato de arquivo de propriedades.
Esta seção é uma referência geral para os nomes e valores dos parâmetros do conector. Tenha em mente que fornecedores de nuvem diferentes podem ter requisitos de configuração um pouco distintos.
Importante
A estrutura do Kafka Connect transmite as configurações do conector Kafka desde o nó mestre até os nós de trabalhadores. As configurações incluem informações confidenciais (especificamente, o nome de usuário Snowflake e a chave privada). Certifique-se de proteger o canal de comunicação entre os nós do Kafka Connect. Para instruções, consulte a documentação de seu software Apache Kafka.
Cada arquivo de configuração especifica os tópicos e tabelas correspondentes para um banco de dados e um esquema nesse banco de dados. Observe que um conector pode ingerir mensagens de qualquer número de tópicos, mas todas as tabelas correspondentes devem estar localizadas em um único banco de dados e esquema.
Para descrições dos campos de configuração, consulte Propriedades de configuração do conector.
Importante
Como o arquivo de configuração normalmente contém informações relacionadas à segurança, tais como a chave privada, defina privilégios de leitura/gravação adequadamente no arquivo para limitar o acesso.
Além disso, considere o armazenamento do arquivo de configuração em um local externo seguro ou em um serviço de gerenciamento de chaves.
Exemplo de arquivo json de configuração
{
"name":"XYZCompanySensorData",
"config":{
"connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
"tasks.max": "1",
"snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
"snowflake.url.name": "myorganization-myaccount.snowflakecomputing.com:443",
"snowflake.warehouse.name": "WH",
"snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
"snowflake.schema.name": "MY_SCHEMA",
"snowflake.database.name": "MY_DATABASE",
"snowflake.role.name": "MY_ROLE",
"snowflake.user.name": "MY_USER",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"errors.log.enable": "true",
"topics": "topic1,topic2",
"value.converter.schemas.enable": "false",
"errors.tolerance": "all"
}
}
Exemplo de arquivo de propriedades de configuração
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
tasks.max=1
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.warehouse.name=WH
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=all
Propriedades de configuração do conector¶
Propriedades obrigatórias¶
nameNome do aplicativo. Isto deve ser único em todos os conectores Kafka utilizados pelo cliente. Este nome deve ser um identificador válido do Snowflake, sem aspas. Para obter mais informações sobre identificadores válidos, consulte Requisitos para identificadores.
connector.classcom.snowflake.kafka.connector.SnowflakeStreamingSinkConnectortopicsLista de tópicos separados por vírgula. Por padrão, o Snowflake considera que o nome da tabela é o mesmo que o nome do tópico. Se o nome da tabela não for o mesmo que o nome do tópico, então use o parâmetro opcional
topic2table.map(abaixo) para especificar o mapeamento do nome do tópico para o nome da tabela. Este nome de tabela deve ser um identificador válido do Snowflake, sem aspas. Para obter mais informações sobre nomes válidos de tabelas, consulte Requisitos para identificadores.Nota
Ou
topicsoutopics.regexé necessário; não ambos.topics.regexEsta é uma expressão regular (“regex”) que especifica os tópicos que contêm as mensagens a serem carregadas nas tabelas do Snowflake. O conector carrega dados de qualquer nome de tópico que coincida com a expressão regex. A expressão regex deve seguir as regras para expressões regulares Java (isto é, ser compatível com java.util.regex.Pattern). O arquivo de configuração deve conter ou
topicsoutopics.regex, não ambos.snowflake.url.nameO URL para acessar sua conta Snowflake. Este URL deve incluir seu identificador da conta. Observe que o protocolo (
https://) e o número da porta são opcionais.snowflake.user.nameNome de login do usuário para a conta Snowflake.
snowflake.role.nameO nome da função que o conector usará para inserir dados na tabela.
snowflake.private.keyA chave privada para autenticar o usuário. Inclua apenas a chave, não o cabeçalho ou rodapé. Se a chave estiver dividida em várias linhas, remova as quebras de linha. Você pode fornecer uma chave não criptografada, ou pode fornecer uma chave criptografada e fornecer o parâmetro
snowflake.private.key.passphrasepara permitir que o Snowflake descriptografe a chave. Use este parâmetro se e somente se o valor do parâmetrosnowflake.private.keyestiver criptografado. Ele descriptografa chaves privadas que foram criptografadas de acordo com as instruções em Autenticação de pares de chaves e rotação de pares de chaves.Nota
Consulte também
snowflake.private.key.passphraseem Propriedades opcionais.snowflake.database.nameO nome do banco de dados que contém a tabela onde inserir as linhas.
snowflake.schema.nameO nome do esquema que contém a tabela onde inserir as linhas.
header.converterObrigatório somente se os registros forem formatados em Avro e incluírem um cabeçalho. O valor é
"org.apache.kafka.connect.storage.StringConverter".key.converterEste é o conversor de chave do registro Kafka (por exemplo,
"org.apache.kafka.connect.storage.StringConverter"). Não é usado pelo conector Kafka, mas é exigido pela plataforma Kafka Connect.Consulte Limitações do conector Kafka para as limitações atuais.
value.converterO conector é compatível com os conversores padrão da comunidade Kafka. Escolha o conversor apropriado com base em seu formato de dados:
Para registros JSON:
"org.apache.kafka.connect.json.JsonConverter"Para registros Avro com Schema Registry:
"io.confluent.connect.avro.AvroConverter"
Consulte Limitações do conector Kafka para as limitações atuais.
Propriedades opcionais¶
snowflake.private.key.passphraseSe o valor desse parâmetro não estiver vazio, o conector usará essa frase para tentar descriptografar a chave privada.
tasks.maxNúmero de tarefas, geralmente o mesmo que o número de núcleos de CPU dos nós de trabalhadores no cluster Kafka Connect. Para obter o melhor desempenho, a Snowflake recomenda definir o número de tarefas igual ao número total de partições Kafka, mas não excedendo o número de núcleos da CPU. Um alto número de tarefas pode resultar em maior consumo de memória e redistribuições frequentes.
snowflake.topic2table.mapEste parâmetro opcional permite que um usuário especifique quais tópicos devem ser mapeados para quais tabelas. Cada tópico e seu nome de tabela devem ser separados por dois pontos (ver exemplo abaixo). Este nome de tabela deve ser um identificador válido do Snowflake, sem aspas. Para obter mais informações sobre nomes válidos de tabelas, consulte Requisitos para identificadores. A configuração do tópico permite o uso de expressões regulares para definir tópicos, assim como o uso de
topics.regex. As expressões regulares não podem ser ambíguas — qualquer tópico correspondente deve corresponder apenas a uma única tabela de destino.Exemplo:
topics="topic1,topic2,topic5,topic6" snowflake.topic2table.map="topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range"
poderia ser escrito como:
topics.regex="topic[0-9]" snowflake.topic2table.map="topic[0-4]:low_range,topic[5-9]:high_range"
value.converter.schema.registry.urlSe o formato for Avro e você estiver usando um Schema Registry Service, deve ser o URL do Schema Registry Service. Caso contrário, este campo deve estar vazio.
value.converter.break.on.schema.registry.errorSe carregar dados Avro do Schema Registry Service, esta propriedade determina se o conector Kafka deve parar de consumir registros se encontrar um erro ao buscar a identificação do esquema. O valor padrão é
false. Defina o valor comotruepara permitir este comportamento.jvm.proxy.hostPara permitir que o conector do Kafka para Snowflake possa acessar o Snowflake através de um servidor proxy, defina este parâmetro para especificar o host desse servidor proxy.
jvm.proxy.portPara permitir que o conector do Kafka para Snowflake possa acessar o Snowflake através de um servidor proxy, defina este parâmetro para especificar a porta desse servidor proxy.
snowflake.streaming.max.client.lagEspecifica a frequência com que o Snowflake Ingest Java libera os dados para o Snowflake, em segundos.
- Valores:
Mínimo:
1segundoMáximo:
600segundos
- Padrão:
1segundo
jvm.proxy.usernameNome de usuário que se autentica com o servidor proxy.
jvm.proxy.passwordSenha para o nome de usuário que se autentica com o servidor proxy.
snowflake.jdbc.mapExemplo:
"snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"Propriedades JDBC adicionais (consulte Referência dos parâmetros de conexão do driver JDBC) não são validadas. Essas propriedades adicionais não são validadas e não devem substituir nem ser usadas em vez de propriedades necessárias, como:
jvm.proxy.xxx,snowflake.user.name,snowflake.private.key,snowflake.schema.nameetc.- Especificar qualquer uma das seguintes combinações:
Propriedade
tracingjunto com variávelJDBC_TRACEenvPropriedade
databasejunto comsnowflake.database.name
Resultará em um comportamento ambíguo, e o comportamento será determinado pelo Driver JDBC.
value.converter.basic.auth.credentials.sourceSe você estiver usando o formato de dados Avro e precisar de acesso seguro ao registro do esquema Kafka, defina este parâmetro com a cadeia de caracteres “USER_INFO“, e defina o parâmetro
value.converter.basic.auth.user.infodescrito abaixo. Caso contrário, omita este parâmetro.value.converter.basic.auth.user.infoSe você estiver usando o formato de dados Avro e precisar de acesso seguro ao registro do esquema Kafka, defina este parâmetro com a cadeia de caracteres “<ID_usuário>:<senha>” e defina o parâmetro value.converter.basic.auth.credentials.source descrito acima. Caso contrário, omita este parâmetro.
snowflake.metadata.createtimeSe o valor for definido como FALSE, o valor da propriedade
CreateTimeserá omitido dos metadados na coluna RECORD_METADATA. O valor padrão é TRUE.snowflake.metadata.topicSe o valor for definido como FALSE, o valor da propriedade
topicserá omitido dos metadados na coluna RECORD_METADATA. O valor padrão é TRUE.snowflake.metadata.offset.and.partitionSe o valor for definido como FALSE, os valores de propriedade
OffsetePartitionsão omitidos dos metadados na coluna RECORD_METADATA. O valor padrão é TRUE.snowflake.metadata.allSe o valor for definido como FALSE, os metadados na coluna RECORD_METADATA estarão completamente vazios. O valor padrão é TRUE.
transformsEspecifique para ignorar os registros de marca de exclusão encontrados pelo conector Kafka e não carregá-los na tabela de destino. Um registro de marca de exclusão é definido como um registro onde todo o campo de valor é nulo.
Ajuste o valor da propriedade para
"tombstoneHandlerExample".Nota
Use esta propriedade somente com os conversores da comunidade Kafka (ou seja, valor da propriedade
value.converter) (por exemplo,org.apache.kafka.connect.json.JsonConverterouorg.apache.kafka.connect.json.AvroConverter). Para gerenciar o manuseio de registros de marcas de exclusão com os conversores do Snowflake, use a propriedadebehavior.on.null.valuesem seu lugar.transforms.tombstoneHandlerExample.typeNecessário ao definir a propriedade
transforms.Defina o valor da propriedade como
"io.confluent.connect.transforms.TombstoneHandler"behavior.on.null.valuesEspecifique como o conector Kafka deve lidar com os registros de marca de exclusão. Um registro de marca de exclusão é definido como um registro onde todo o campo de valor é nulo. Para Snowpipe, esta propriedade é compatível com a versão 1.5.5 do conector Kafka e posterior. Para Snowpipe Streaming, essa propriedade é compatível com o conector Kafka versão 2.1.0 e posterior.
Esta propriedade suporta os seguintes valores:
DEFAULTQuando o conector Kafka encontra um registro de marca de exclusão, insere uma cadeia de caracteres JSON vazia na coluna de conteúdo.
IGNOREO conector Kafka ignora registros de marca de exclusão e não insere linhas para estes registros.
O valor padrão é
DEFAULT.Nota
A ingestão de registros Tombstone varia de acordo com os métodos de ingestão:
Para Snowpipe, o conector Kafka usa apenas conversores Snowflake. Para gerenciar o manuseio de registros de marcas de exclusão com os conversores da comunidade Kafka, use as propriedades
transformetransforms.tombstoneHandlerExample.type.Para Snowpipe Streaming, o conector Kafka usa apenas conversores de comunidade.
Os registros enviados aos brokers do Kafka não devem ser NULL porque esses registros serão eliminados pelo conector Kafka, resultando em offsets ausentes. Os offsets ausentes destruirão o conector Kafka em casos de uso específicos. É recomendável usar registros de marca de exclusão em vez de registros NULL.