Instalação e configuração do conector Kafka

O conector Kafka é fornecido como um arquivo JAR (executável Java).

O Snowflake fornece duas versões do conector:

O conector Kafka está sujeito aos Termos de Terceiros.

As instruções neste tópico especificam quais passos se aplicam somente a uma das versões do conector.

Neste tópico:

Configuração do controle de acesso para objetos Snowflake

Privilégios obrigatórios

Criar e gerenciar objetos Snowflake usados pelo conector Kafka requer uma função com os seguintes privilégios mínimos:

Objeto

Privilégio

Notas

Banco de dados

USAGE

Esquema

USAGE . CREATE TABLE . CREATE STAGE . CREATE PIPE

Após a criação dos objetos de nível de esquema, os privilégios CREATE object podem ser revogados.

Tabela

OWNERSHIP

Somente necessário ao utilizar o conector Kafka para ingerir dados em uma tabela existente. . Se o conector criar uma nova tabela de destino para registros do tópico Kafka, a função padrão para o usuário especificado no arquivo de configuração do Kafka torna-se o proprietário da tabela (ou seja, tem o privilégio OWNERSHIP para a tabela).

Estágio

READ . WRITE

Somente necessário ao utilizar o conector Kafka para preparar arquivos de dados do Kafka em um estágio interno existente (não recomendado). . Se o conector criar um novo estágio para armazenar temporariamente arquivos de dados consumidos do tópico Kafka, a função padrão para o usuário especificado no arquivo de configuração do Kafka torna-se o proprietário do estágio (ou seja, tem o privilégio OWNERSHIP para o estágio).

A Snowflake recomenda que você crie um usuário (usando CREATE USER) e função (usando CREATE ROLE) separados para cada instância do Kafka para que os privilégios de acesso possam ser revogados individualmente, se necessário. A função deve ser atribuída como a função padrão para o usuário.

Criação de uma função para usar o conector Kafka

O seguinte script cria uma função personalizada para uso pelo conector Kafka (por exemplo, KAFKA_CONNECTOR_ROLE_1). Qualquer função que possa conceder privilégios (por exemplo, SECURITYADMIN ou qualquer função com o privilégio MANAGE GRANTS) pode conceder esta função personalizada a qualquer usuário para permitir que o conector Kafka crie os objetos Snowflake necessários e insira os dados nas tabelas. O script faz referência a um banco de dados e esquema específicos existentes e ao usuário (kafka_db.kafka_schema) (kafka_connector_user_1):

-- Use a role that can create and manage roles and privileges.
USE ROLE securityadmin;

-- Create a Snowflake role with the privileges to work with the connector.
CREATE ROLE kafka_connector_role_1;

-- Grant privileges on the database.
GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;

-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will load data into an existing table.
GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will stage data files in an existing internal stage: (not recommended).
GRANT READ, WRITE ON STAGE existing_stage1 TO ROLE kafka_connector_role_1;

-- Grant the custom role to an existing user.
GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;

-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify the role that has the OWNERSHIP privilege on the user.
ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
Copy

Para obter mais informações sobre a criação de funções personalizadas e hierarquias de funções, consulte Configuração do controle de acesso.

Pré-requisitos de instalação

  • O conector Kafka suporta as seguintes versões de pacote:

    Pacote

    Versão do conector Kafka para Snowflake

    Suporte de pacote (testado pelo Snowflake)

    Apache Kafka

    2.0.0 (ou superior)

    Apache Kafka 2.5.1, 2.8.1, 3.2.1

    Confluent

    2.0.0 (ou superior)

    Confluent 6.2.6, 7.2.1

  • O conector Kafka foi criado para ser usado com Kafka Connect API 3.2.3. Quaisquer versões mais recentes do Kafka Connect API não foram testadas. Qualquer versão mais antiga que 3.2.3 é compatível com o conector. Para obter mais informações, consulte Compatibilidade do Kafka.

  • Quando você tiver o conector Kafka e os arquivos jar do driver JDBC em seu ambiente, certifique-se de que sua versão JDBC corresponda à versão snowflake-jdbc especificada no arquivo pom.xml da versão pretendida do conector Kafka. Você pode acessar sua versão preferida do conector Kafka, por exemplo, v2.0.1. Depois procure o arquivo pom.xml para descobrir a versão de snowflake-jdbc.

  • Se você usa o formato Avro para ingerir dados:

  • Configure o Kafka com o tempo desejado de retenção de dados e/ou limite de armazenamento.

  • Instale e configure o cluster Kafka Connect.

    Cada nó de cluster Kafka Connect deve incluir RAM suficiente para o conector Kafka. A quantidade mínima recomendada é 5 MB por partição Kafka. Isto é além da RAM necessária para qualquer outro trabalho que o Kafka Connect está realizando.

  • Recomendamos utilizar as mesmas versões no Kafka Broker e no Kafka Connect Runtime.

  • Recomendamos executar sua instância do Kafka Connect na mesma região de provedor de nuvem que sua conta Snowflake. Isto não é estritamente necessário, mas normalmente melhora o rendimento.

Para obter uma lista dos sistemas operacionais compatíveis com os clientes Snowflake, consulte Suporte ao sistema operacional.

Instalação do conector

Esta seção fornece instruções para instalar e configurar o conector Kafka para Confluent. Consulte a tabela a seguir para ver as versões de conectores Kafka:

Série de lançamento

Status

Notas

2.x.x

Com suporte oficial

Versão mais recente e fortemente recomendada.

1.9.x

Com suporte oficial

Atualização recomendada.

1.8.x

Sem suporte

Não utilize esta série de lançamento.

1.7.x

Sem suporte

Não utilize esta série de lançamento.

Nota

Observe que em um cenário de alto rendimento (por exemplo, 50 MB/s por conector), esse recurso pode resultar em latência ou custo mais alto. Recomendamos que você desative esse recurso para cenários de alto rendimento definindo enable.streaming.client.optimization como falso. Para obter mais informações, consulte Uso do conector Snowflake para Kafka com Snowpipe Streaming.

Instalação do conector para Confluent

Baixe os arquivos do conector Kafka

Baixe o arquivo JAR do conector Kafka de um dos seguintes locais:

Hub Confluent

https://www.confluent.io/hub/

O pacote inclui todas as dependências necessárias para usar uma chave privada criptografada ou não criptografada para autenticação do par de chaves. Para obter mais informações, consulte Uso da autenticação de par de chaves e rodízio de chaves (neste tópico).

Repositório central Maven

https://mvnrepository.com/artifact/com.snowflake

O arquivo JAR não requer nenhuma dependência adicional para usar uma chave privada não criptografada para autenticação do par de chaves. Para usar uma chave privada criptografada, baixe a biblioteca de criptografia Bouncy Castle (um arquivo JAR). O Snowflake usa o Bouncy Castle para descriptografar chaves privadas RSA criptografadas usadas para fazer login:

Baixe estes arquivos para a mesma pasta local que o arquivo JAR do conector Kafka.

O código fonte para o conector está disponível em https://github.com/snowflakedb/snowflake-kafka-connector.

Instalação do conector Kafka

Instale o conector Kafka usando as instruções fornecidas para a instalação de outros conectores:

Instalação do conector para o Apache Kafka de código aberto

Esta seção fornece instruções para instalar e configurar o conector Kafka para o Apache Kafka de código aberto.

Instalação do Apache Kafka

  1. Baixe o pacote Kafka de seu site oficial: https://kafka.apache.org/downloads.

  2. Em uma janela do terminal, mude para o diretório onde você baixou o arquivo do pacote.

  3. Execute o seguinte comando para descompactar o arquivo kafka_<versão_scala>-<versão_kafka>.tgz:

    tar xzvf kafka_<scala_version>-<kafka_version>.tgz
    
    Copy

Instale o JDK

Instale e configure o Java Development Kit (JDK). O Snowflake é testado com o Standard Edition (SE) do JDK. A Enterprise Edition (EE) deve ser compatível, mas não foi testada.

Se já completou esta etapa, você pode pular esta seção.

  1. Baixe o JDK em https://www.oracle.com/technetwork/java/javase/downloads/index.html.

  2. Instale ou descompacte o JDK.

  3. Seguindo as instruções para seu sistema operacional, defina a variável de ambiente JAVA_HOME para apontar para o diretório que contém o JDK.

Download dos arquivos JAR do conector Kafka

  1. Baixe o arquivo JAR do conector Kafka do Repositório central Maven:

    https://mvnrepository.com/artifact/com.snowflake

  2. O arquivo JAR não requer nenhuma dependência adicional para usar uma chave privada não criptografada para autenticação do par de chaves. Para usar uma chave privada criptografada, baixe a biblioteca de criptografia Bouncy Castle (um arquivo JAR). O Snowflake usa o Bouncy Castle para descriptografar chaves privadas RSA criptografadas usadas para fazer login:

  3. Se seus dados Kafka são transmitidos no formato Apache Avro, então baixe o arquivo Avro JAR:

    https://mvnrepository.com/artifact/org.apache.avro/avro

O código fonte para o conector está disponível em https://github.com/snowflakedb/snowflake-kafka-connector.

Instalação do conector Kafka

Copie os arquivos JAR que você baixou em Download dos arquivos JAR do conector Kafka para a pasta <dir_kafka>/libs.

Configuração do conector Kafka

O conector é configurado criando um arquivo que especifica parâmetros tais como as credenciais de login do Snowflake, nome(s) do(s) tópico(s), nome(s) da(s) tabela(s) Snowflake, etc.

Importante

A estrutura do Kafka Connect transmite as configurações do conector Kafka desde o nó mestre até os nós de trabalhadores. As configurações incluem informações confidenciais (especificamente, o nome de usuário Snowflake e a chave privada). Certifique-se de proteger o canal de comunicação entre os nós do Kafka Connect. Para instruções, consulte a documentação de seu software Apache Kafka.

Cada arquivo de configuração especifica os tópicos e tabelas correspondentes para um banco de dados e um esquema nesse banco de dados. Observe que um conector pode ingerir mensagens de qualquer número de tópicos, mas as tabelas correspondentes devem ser todas armazenadas em um único banco de dados e esquema.

Esta seção fornece instruções tanto para o modo distribuído como para o modo autônomo.

Para descrições dos campos de configuração, consulte Propriedades de configuração do Kafka.

Importante

Como o arquivo de configuração normalmente contém informações relacionadas à segurança, tais como a chave privada, defina privilégios de leitura/gravação adequadamente no arquivo para limitar o acesso.

Além disso, considere o armazenamento do arquivo de configuração em um local externo seguro ou em um serviço de gerenciamento de chaves. Para obter mais informações, consulte Externalização de segredos (neste tópico).

Modo distribuído

Criar o arquivo de configuração do Kafka, por exemplo, <caminho>/<arquivo_config>.json. Preencha o arquivo com todas as informações de configuração do conector. O arquivo deve estar no formato JSON.

Exemplos de arquivo de configuração

{
  "name":"XYZCompanySensorData",
  "config":{
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"topic1,topic2",
    "snowflake.topic2table.map": "topic1:table1,topic2:table2",
    "buffer.count.records":"10000",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"5000000",
    "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443",
    "snowflake.user.name":"jane.smith",
    "snowflake.private.key":"xyz123",
    "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
    "snowflake.database.name":"mydb",
    "snowflake.schema.name":"myschema",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "value.converter.schema.registry.url":"http://localhost:8081",
    "value.converter.basic.auth.credentials.source":"USER_INFO",
    "value.converter.basic.auth.user.info":"jane.smith:MyStrongPassword"
  }
}
Copy

Modo autônomo

Crie um arquivo de configuração; por exemplo, <dir_kafka>/config/SF_connect.properties. Preencha o arquivo com todas as informações de configuração do conector.

Exemplos de arquivo de configuração

connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=topic1,topic2
snowflake.topic2table.map= topic1:table1,topic2:table2
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.user.name=jane.smith
snowflake.private.key=xyz123
snowflake.private.key.passphrase=jkladu098jfd089adsq4r
snowflake.database.name=mydb
snowflake.schema.name=myschema
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeAvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=jane.smith:MyStrongPassword
Copy

Propriedades de configuração do Kafka

As seguintes propriedades podem ser definidas no arquivo de configuração do Kafka para o modo distribuído ou modo autônomo:

Propriedades obrigatórias

name

Nome do aplicativo. Isto deve ser único em todos os conectores Kafka utilizados pelo cliente. Este nome deve ser um identificador válido do Snowflake, sem aspas. Para obter mais informações sobre identificadores válidos, consulte Requisitos para identificadores.

connector.class

com.snowflake.kafka.connector.SnowflakeSinkConnector

topics

Lista de tópicos separados por vírgula. Por padrão, o Snowflake considera que o nome da tabela é o mesmo que o nome do tópico. Se o nome da tabela não for o mesmo que o nome do tópico, então use o parâmetro opcional topic2table.map (abaixo) para especificar o mapeamento do nome do tópico para o nome da tabela. Este nome de tabela deve ser um identificador válido do Snowflake, sem aspas. Para obter mais informações sobre nomes válidos de tabelas, consulte Requisitos para identificadores.

Nota

Ou topics ou topics.regex é necessário; não ambos.

topics.regex

Esta é uma expressão regular (“regex”) que especifica os tópicos que contêm as mensagens a serem carregadas nas tabelas do Snowflake. O conector carrega dados de qualquer nome de tópico que coincida com a expressão regex. A expressão regex deve seguir as regras para expressões regulares Java (isto é, ser compatível com java.util.regex.Pattern). O arquivo de configuração deve conter ou topics ou topics.regex, não ambos.

snowflake.url.name

O URL para acessar sua conta Snowflake. Este URL deve incluir seu identificador da conta. Observe que o protocolo (https://) e o número da porta são opcionais.

snowflake.user.name

Nome de login do usuário para a conta Snowflake.

snowflake.private.key

A chave privada para autenticar o usuário. Inclua apenas a chave, não o cabeçalho ou rodapé. Se a chave estiver dividida em várias linhas, remova as quebras de linha. Você pode fornecer uma chave não criptografada, ou pode fornecer uma chave criptografada e fornecer o parâmetro snowflake.private.key.passphrase para permitir que o Snowflake descriptografe a chave. Use este parâmetro se e somente se o valor do parâmetro snowflake.private.key estiver criptografado. Isto descriptografa chaves privadas que foram criptografadas de acordo com as instruções em Uso da autenticação de par de chaves e rodízio de chaves (neste tópico).

Nota

Consulte também snowflake.private.key.passphrase em Propriedades opcionais (neste tópico).

snowflake.database.name

O nome do banco de dados que contém a tabela onde inserir as linhas.

snowflake.schema.name

O nome do esquema que contém a tabela onde inserir as linhas.

header.converter

Obrigatório somente se os registros forem formatados em Avro e incluírem um cabeçalho. O valor é "org.apache.kafka.connect.storage.StringConverter".

key.converter

Este é o conversor de chave do registro Kafka (por exemplo, "org.apache.kafka.connect.storage.StringConverter"). Não é usado pelo conector Kafka, mas é exigido pela plataforma Kafka Connect.

Consulte Limitações do conector Kafka para as limitações atuais.

value.converter

Se os registros forem formatados em JSON, deve ser "com.snowflake.kafka.connector.records.SnowflakeJsonConverter".

Se os registros forem formatados em Avro e utilizarem o Schema Registry Service do Kafka, deve ser "com.snowflake.kafka.connector.records.SnowflakeAvroConverter".

Se os registros forem formatados em Avro e contiverem o esquema (e portanto não precisarem do Schema Registry Service do Kafka), deve ser "com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry".

Se os registros forem formatados em texto simples, deve ser "org.apache.kafka.connect.storage.StringConverter".

Consulte Limitações do conector Kafka para as limitações atuais.

Propriedades opcionais

snowflake.private.key.passphrase

Se o valor deste parâmetro não estiver vazio, o Kafka usa esta frase para tentar decifrar a chave privada.

tasks.max

Número de tarefas, geralmente o mesmo que o número de núcleos de CPU dos nós de trabalhadores no cluster Kafka Connect. Este número pode ser definido como mais baixo ou mais alto; no entanto, a Snowflake não recomenda defini-lo como mais alto.

snowflake.topic2table.map

Este parâmetro opcional permite que um usuário especifique quais tópicos devem ser mapeados para quais tabelas. Cada tópico e seu nome de tabela devem ser separados por dois pontos (ver exemplo abaixo). Este nome de tabela deve ser um identificador válido do Snowflake, sem aspas. Para obter mais informações sobre nomes válidos de tabelas, consulte Requisitos para identificadores.

buffer.count.records

Número de registros armazenados em buffer por partição Kafka antes da ingestão no Snowflake. O valor padrão é 10000 registros.

buffer.flush.time

Número de segundos entre liberações do buffer, onde a liberação vai desde o cache de memória do Kafka até o estágio interno. O valor padrão é 120 segundos.

buffer.size.bytes

Tamanho cumulativo em bytes dos registros armazenados em buffer por partição Kafka antes de serem ingeridos no Snowflake como arquivos de dados. O valor padrão é 5000000 (5 MB).

Os registros são compactados quando gravados em arquivos de dados. Como resultado, o tamanho dos registros no buffer pode ser maior do que o tamanho dos arquivos de dados criados a partir dos registros.

value.converter.schema.registry.url

Se o formato for Avro e você estiver usando um Schema Registry Service, deve ser o URL do Schema Registry Service. Caso contrário, este campo deve estar vazio.

value.converter.break.on.schema.registry.error

Se carregar dados Avro do Schema Registry Service, esta propriedade determina se o conector Kafka deve parar de consumir registros se encontrar um erro ao buscar a identificação do esquema. O valor padrão é false. Defina o valor como true para permitir este comportamento.

Com suporte pela versão de conector Kafka 1.4.2 (e superior).

jvm.proxy.host

Para permitir que o conector do Kafka para Snowflake possa acessar o Snowflake através de um servidor proxy, defina este parâmetro para especificar o host desse servidor proxy.

jvm.proxy.port

Para permitir que o conector do Kafka para Snowflake possa acessar o Snowflake através de um servidor proxy, defina este parâmetro para especificar a porta desse servidor proxy.

jvm.proxy.username

Nome de usuário que se autentica com o servidor proxy.

Com suporte pela versão de conector Kafka 1.4.4 (e superior).

jvm.proxy.password

Senha para o nome de usuário que se autentica com o servidor proxy.

Com suporte pela versão de conector Kafka 1.4.4 (e superior).

value.converter.basic.auth.credentials.source

Se você estiver usando o formato de dados Avro e precisar de acesso seguro ao registro do esquema Kafka, defina este parâmetro com a cadeia de caracteres “USER_INFO“, e defina o parâmetro value.converter.basic.auth.user.info descrito abaixo. Caso contrário, omita este parâmetro.

value.converter.basic.auth.user.info

Se você estiver usando o formato de dados Avro e precisar de acesso seguro ao registro do esquema Kafka, defina este parâmetro com a cadeia de caracteres “<ID_usuário>:<senha>” e defina o parâmetro value.converter.basic.auth.credentials.source descrito acima. Caso contrário, omita este parâmetro.

snowflake.metadata.createtime

Se o valor for definido como FALSE, o valor da propriedade CreateTime será omitido dos metadados na coluna RECORD_METADATA. O valor padrão é TRUE.

Com suporte pelo conector Kafka 1.2.0 (e superior).

snowflake.metadata.topic

Se o valor for definido como FALSE, o valor da propriedade topic será omitido dos metadados na coluna RECORD_METADATA. O valor padrão é TRUE.

Com suporte pelo conector Kafka 1.2.0 (e superior).

snowflake.metadata.offset.and.partition

Se o valor for definido como FALSE, os valores de propriedade Offset e Partition são omitidos dos metadados na coluna RECORD_METADATA. O valor padrão é TRUE.

Com suporte pelo conector Kafka 1.2.0 (e superior).

snowflake.metadata.all

Se o valor for definido como FALSE, os metadados na coluna RECORD_METADATA estarão completamente vazios. O valor padrão é TRUE.

Com suporte pelo conector Kafka 1.2.0 (e superior).

transforms

Especifique para ignorar os registros de marca de exclusão encontrados pelo conector Kafka e não carregá-los na tabela de destino. Um registro de marca de exclusão é definido como um registro onde todo o campo de valor é nulo.

Ajuste o valor da propriedade para "tombstoneHandlerExample".

Nota

Use esta propriedade somente com os conversores da comunidade Kafka (ou seja, valor da propriedade value.converter) (por exemplo, org.apache.kafka.connect.json.JsonConverter ou org.apache.kafka.connect.json.AvroConverter). Para gerenciar o manuseio de registros de marcas de exclusão com os conversores do Snowflake, use a propriedade behavior.on.null.values em seu lugar.

transforms.tombstoneHandlerExample.type

Necessário ao definir a propriedade transforms.

Defina o valor da propriedade como "io.confluent.connect.transforms.TombstoneHandler"

behavior.on.null.values

Especifique como o conector Kafka deve lidar com os registros de marca de exclusão. Um registro de marca de exclusão é definido como um registro onde todo o campo de valor é nulo. Para Snowpipe, esta propriedade é compatível com a versão 1.5.5 do conector Kafka e posterior. Para Snowpipe Streaming, essa propriedade é compatível com o conector Kafka versão 2.1.0 e posterior.

Esta propriedade suporta os seguintes valores:

DEFAULT

Quando o conector Kafka encontra um registro de marca de exclusão, insere uma cadeia de caracteres JSON vazia na coluna de conteúdo.

IGNORE

O conector Kafka ignora registros de marca de exclusão e não insere linhas para estes registros.

O valor padrão é DEFAULT.

Nota

A ingestão de registros Tombstone varia de acordo com os métodos de ingestão:

  • Para Snowpipe, o conector Kafka usa apenas conversores Snowflake. Para gerenciar o manuseio de registros de marcas de exclusão com os conversores da comunidade Kafka, use as propriedades transform e transforms.tombstoneHandlerExample.type.

  • Para Snowpipe Streaming, o conector Kafka usa apenas conversores de comunidade.

Os registros enviados aos brokers do Kafka não devem ser NULL porque esses registros serão eliminados pelo conector Kafka, resultando em offsets ausentes. Os offsets ausentes destruirão o conector Kafka em casos de uso específicos. É recomendável usar registros de marca de exclusão em vez de registros NULL.

Uso da autenticação de par de chaves e rodízio de chaves

O conector Kafka depende da autenticação de par de chaves e não da autenticação básica (ou seja, nome de usuário e senha). Este método de autenticação requer um par de chaves RSA de 2048 bits (mínimo). Gere o par de chaves pública-privada usando OpenSSL. A chave pública é atribuída ao usuário do Snowflake definido no arquivo de configuração.

Após completar as instruções de autenticação do par de chaves nesta página e as instruções de rodízio do par de chaves, avalie a recomendação para Externalização de segredos (neste tópico).

Para configurar o par de chaves pública/privada:

  1. A partir da linha de comando em uma janela terminal, gere uma chave privada.

    Você pode gerar tanto uma versão criptografada quanto uma versão não criptografada da chave privada.

    Nota

    O conector Kafka suporta algoritmos de criptografia que são validados para atender aos requisitos do Federal Information Processing Standard (140-2) (ou seja, FIPS 140-2). Para obter mais informações, consulte FIPS 140-2.

    Para gerar uma versão não criptografada, use o seguinte comando:

    $ openssl genrsa -out rsa_key.pem 2048
    
    Copy

    Para gerar uma versão criptografada, use o seguinte comando:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
    
    Copy

    Onde o <algoritmo> é um algoritmo de criptografia compatível com FIPS 140-2.

    Por exemplo, para especificar AES 256 como o algoritmo de criptografia:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
    
    Copy

    Se você gerar uma versão criptografada da chave privada, registre a senha. Mais tarde, você especificará a senha na propriedade snowflake.private.key.passphrase no arquivo de configuração do Kafka.

    Exemplo de chave privada PEM

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
    Copy
  2. A partir da linha de comando, gere a chave pública, referenciando a chave privada:

    Supondo que a chave privada esteja criptografada e contida no arquivo chamado rsa_key.p8, use o seguinte comando:

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    
    Copy

    Exemplo de chave pública PEM

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
    Copy
  3. Copie os arquivos de chaves públicas e privadas em um diretório local para armazenamento. Registre o caminho para os arquivos. Observe que a chave privada é armazenada usando o formato PKCS8 (Public Key Cryptography Standards) e é criptografada usando a senha especificada no passo anterior; entretanto, o arquivo ainda deve ser protegido contra acesso não autorizado usando o mecanismo de permissão de arquivo fornecido por seu sistema operacional. É sua responsabilidade proteger o arquivo quando ele não estiver sendo usado.

  4. Faça login no Snowflake. Atribua a chave pública ao usuário do Snowflake usando ALTER USER. Por exemplo:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    
    Copy

    Nota

    • Somente administradores de segurança (isto é, usuários com a função SECURITYADMIN ou superior) podem alterar um usuário.

    • Exclua o cabeçalho e o rodapé da chave pública na instrução SQL.

    Verifique a impressão digital da chave pública do usuário usando DESCRIBE USER:

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    
    Copy

    Nota

    A propriedade RSA_PUBLIC_KEY_2_FP está descrita em Configuração da rotação do par de chaves.

  5. Copie e cole a chave privada inteira no campo snowflake.private.key no arquivo de configuração. Salve o arquivo.

Externalização de segredos

O Snowflake recomenda a externalização de segredos como a chave privada e seu armazenamento em forma criptografada ou em um serviço de gerenciamento de chaves como o AWS Key Management Service (KMS), Microsoft Azure Key Vault ou HashiCorp Vault. Isto pode ser feito usando uma implementação ConfigProvider em seu cluster Kafka Connect.

Para obter mais informações, consulte a descrição do Confluent deste serviço.

Como iniciar o Kafka

Inicie o Kafka usando as instruções fornecidas na documentação de terceiros para Confluent ou Apache Kafka.

Iniciando o conector Kafka

Você pode iniciar o conector Kafka tanto no modo distribuído quanto no modo autônomo. As instruções para cada um deles são mostradas abaixo:

Modo distribuído

Em uma janela terminal, execute o seguinte comando:

curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors
Copy

Modo autônomo

Em uma janela terminal, execute o seguinte comando:

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties
Copy

(Uma instalação padrão de Apache Kafka ou Confluent Kafka já deve ter o arquivo connect-standalone.properties).

Teste e uso do conector Kafka

Recomendamos testar o conector Kafka com uma pequena quantidade de dados antes de usar o conector em um sistema de produção. O processo para testes é o mesmo que o processo para usar o conector normalmente:

  1. Verifique se o Kafka e o Kafka Connect estão funcionando.

  2. Verifique se você criou o tópico Kafka apropriado.

  3. Crie um editor de mensagens ou use um existente. Certifique-se de que as mensagens publicadas para o tópico tenham o formato correto (JSON, Avro ou texto simples).

  4. Crie um arquivo de configuração que especifique o tópico a assinar e a tabela Snowflake onde gravar. Para instruções, consulte Configuração do conector Kafka (neste tópico).

  5. (Opcional) Crie uma tabela onde gravar dados. Esta etapa é opcional; se você não criar a tabela, o conector Kafka cria a tabela para você. Se você não planeja usar o conector para adicionar dados a uma tabela existente e não vazia, então recomendamos que deixe o conector criar a tabela para que você minimize a possibilidade de uma incompatibilidade entre esquemas.

  6. Conceda os privilégios mínimos exigidos nos objetos Snowflake (banco de dados, esquema, tabela de destino, etc.) para a função que será usada para ingerir dados.

  7. Publique um conjunto de dados de exemplo para o tópico Kafka configurado.

  8. Aguarde alguns minutos para que os dados se propaguem pelo sistema e, em seguida, verifique a tabela do Snowflake para verificar se os registros foram inseridos.

Dica

Considere verificar sua conexão de rede com o Snowflake usando SnowCD antes de carregar dados para o Snowflake em seus ambientes de teste e produção.