Configure o Openflow Connector for Snowflake to Kafka

Nota

O conector está sujeito aos termos do conector.

Este tópico descreve as etapas para configurar o Openflow Connector for Snowflake to Kafka.

Pré-requisitos

  1. Certifique-se de ter revisado Sobre a Openflow Connector for Snowflake to Kafka.

  2. Certifique-se de ter configurado o Openflow.

  3. Crie um fluxo Snowflake que será consultado para obter as alterações.

  4. Crie um tópico Kafka que receberá mensagens de CDC do fluxo Snowflake.

Configure a conta Snowflake

Como administrador de conta Snowflake, execute as seguintes tarefas:

  1. Crie o banco de dados, a tabela de origem e o objeto de fluxo que o conector usará para ler os eventos de CDC. Por exemplo:

    create database stream_db;
    use database stream_db;
    create table stream_source (user_id varchar, data varchar);
    create stream stream_on_table on table stream_source;
    
    Copy
  2. Crie uma nova função ou use uma função existente e conceda o privilégio SELECT no fluxo e no objeto de origem do fluxo. O conector também precisará do privilégio USAGE no banco de dados e no esquema que contém o fluxo e o objeto de origem do fluxo. Por exemplo:

    create role stream_reader;
    grant usage on database stream_db to role stream_reader;
    grant usage on schema stream_db.public to role stream_reader;
    grant select on stream_source to role stream_reader;
    grant select on stream_on_table to role stream_reader;
    
    Copy
  3. Crie um novo usuário de serviço Snowflake com o tipo SERVICE. Por exemplo:

    create user stream_user type = service;
    
    Copy
  4. Conceda ao usuário do serviço Snowflake a função que você criou nas etapas anteriores. Por exemplo:

    grant role stream_reader to user stream_user;
    
    Copy
  5. Configure com autenticação por pares de chaves para o usuário Snowflake SERVICE a partir da etapa 3.

  6. A Snowflake recomenda enfaticamente essa etapa. Configure um gerenciador de segredos compatível com o Openflow, por exemplo, AWS, Azure e Hashicorp, e armazene as chaves públicas e privadas no armazenamento de segredos. No entanto, observe que a chave privada gerada na etapa 4 pode ser usada diretamente como um parâmetro de configuração para a configuração do conector. Nesse caso, a chave privada é armazenada na configuração de tempo de execução do Openflow.

    Nota

    Se, por algum motivo, você não quiser usar um gerenciador de segredos, será responsável por proteger os arquivos de chave pública e chave privada usados para autenticação de pares de chaves de acordo com as políticas de segurança de sua organização.

    1. Depois que o gerenciador de segredos estiver configurado, determine como você se autenticará nele. No AWS, é recomendável que você use a função de instância EC2 associada ao Openflow, pois dessa forma nenhum outro segredo precisa ser mantido.

    2. No Openflow, configure um provedor de parâmetros associado a esse gerenciador de segredos, no menu de três traços no canto superior direito. Navegue até Controller Settings » Parameter Provider e, em seguida, busque os valores dos parâmetros.

    3. Nesse momento, todas as credenciais podem ser referenciadas com os caminhos de parâmetros associados e nenhum valor sensível precisa ser mantido no Openflow.

  7. Designar um warehouse para o conector usar. Um conector pode replicar uma única tabela para um único tópico Kafka. Para esse tipo de processamento, você pode selecionar o menor warehouse.

Configuração do conector

Como engenheiro de dados, execute as seguintes tarefas para instalar e configurar um conector:

  1. Navegue até a página Visão geral do Openflow. Na seção Featured connectors, selecione View more connectors.

  2. Na página de conectores do Openflow, localize e escolha o conector, dependendo do tipo de instância do agente Kafka com a qual o conector deve se comunicar.

    • Versão de mTLS: escolha esse conector se estiver usando o protocolo de segurança SSL (TLS mútuo) ou se estiver usando o protocolo SASL_SSL e se conectando ao corretor que usa certificados autoassinados.

    • Versão de SASL: escolha esse conector se você estiver usando qualquer outro protocolo de segurança

  3. Selecione Add to runtime.

  4. Na caixa de diálogo Select runtime, selecione seu tempo de execução na lista suspensa Available runtimes.

  5. Selecione Add.

  6. Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.

  7. Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.

    A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.

  8. Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.

  9. Preencha os valores dos parâmetros necessários conforme descrito em Parâmetros de fluxo.

Parâmetros de fluxo

Esta seção descreve os parâmetros de fluxo que podem ser configurados com base nos contextos de parâmetros a seguir:

Parâmetros de origem do coletor Kafka

Parâmetro

Descrição

Obrigatório

Identificador de conta Snowflake

Nome da conta Snowflake formatado como [organization-name]-[account-name] onde os dados serão mantidos. Exemplo: example.snowflakecomputing.com

Sim

Estratégia de autenticação Snowflake

Estratégia de autenticação para o Snowflake. Possíveis valores:

  • SNOWFLAKE_SESSION_TOKEN: quando você estiver usando o conector no SPCS

  • KEY_PAIR: quando você deseja configurar o acesso usando uma chave privada

Sim

Banco de dados de origem

Banco de dados de origem. Esse banco de dados deve conter o objeto Snowflake Stream que será consumido.

Sim

Senha de chave privada Snowflake

A senha associada à chave privada do Snowflake. Ela não é necessária quando a chave privada usada não é protegida por uma senha.

Não

Função Snowflake

Função Snowflake usada durante a execução da consulta

Sim

Nome de usuário do Snowflake

Nome de usuário usado para se conectar à instância Snowflake

Sim

Warehouse Snowflake

Warehouse Snowflake usado para executar consultas

Sim

Chave privada Snowflake

A chave privada RSA utilizada para autenticação. A chave RSA deve ser formatada de acordo com os padrões PKCS8 e ter cabeçalhos e rodapés no padrão PEM. Observe que o arquivo de chave privada Snowflake ou a chave privada Snowflake devem ser definidos.

Sim

Arquivo de chave privada Snowflake

O arquivo que contém a chave privada RSA usada para autenticação no Snowflake, formatado de acordo com os padrões PKCS8 e com cabeçalhos e rodapés no padrão PEM. A linha do cabeçalho começa com -----BEGIN PRIVATE. Marque a caixa de seleção Reference asset para carregar o arquivo de chave privada.

Não

Esquema de origem

O esquema de origem. Esse esquema deve conter o objeto Snowflake Stream que será consumido.

Sim

Parâmetros de destino do coletor Kafka

Parâmetro

Descrição

Obrigatório

Servidores de bootstrap Kafka

Uma lista separada por vírgulas de corretores Kafka para os quais enviar dados.

Sim

Mecanismo Kafka SASL

Mecanismo SASL usado para autenticação. Corresponde à propriedade sasl.mechanism do cliente Kafka. Possíveis valores:

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

  • AWS_MSK_IAM

Sim

Nome de usuário Kafka SASL

O nome de usuário para autenticação no Kafka

Sim

Senha Kafka SASL

A senha para autenticação no Kafka

Sim

Protocolo de segurança do Kafka

Protocolo de segurança usado para se comunicar com os corretores. Corresponde à propriedade security.protocol do cliente Kafka. Possíveis valores:

  • PLAINTEXT

  • SASL_PLAINTEXT

  • SASL_SSL

  • SSL

Sim

Tópico Kafka

O tópico Kafka para onde as CDCs do Snowflake Stream serão enviadas

Sim

Campo-chave da mensagem Kafka

Especifique o nome da coluna do banco de dados que será usada como chave de mensagem do Kafka. Se não for especificado, a chave de mensagem não será definida. Se especificado, o valor dessa coluna será usado como uma chave de mensagem. O valor desse parâmetro diferencia maiúsculas de minúsculas.

Não

Nome de arquivo do keystore do Kafka

Um caminho completo para um keystore que armazena uma chave de cliente e um certificado para o método de autenticação mTLS. Necessário para a autenticação mTLS e quando o protocolo de segurança for SSL.

Não

Tipo de keystore Kafka

O tipo de keystore. Necessário para a autenticação mTLS. Possíveis valores:

  • PKCS12

  • JKS

  • BCFKS

Não

Senha do keystore do Kafka

A senha usada para proteger o arquivo do keystore.

Não

Senha da chave Kafka

Uma senha para a chave privada armazenada no keystore. Necessário para a autenticação mTLS.

Não

Nome de arquivo do truststore Kafka

Um caminho completo para um truststore que armazena certificados de corretor. O cliente usará o certificado desse truststore para verificar a identidade do corretor.

Não

Tipo de truststore do Kafka

O tipo de arquivo truststore. Possíveis valores:

  • PKCS12

  • JKS

  • BCFKS

Não

Senha do truststore Kafka

Uma senha para o arquivo truststore.

Não

Parâmetros de ingestão do coletor Kafka

Parâmetro

Descrição

Obrigatório

FQN Snowflake Stream

Nome do fluxo Snowflake totalmente qualificado.

Sim

Execute o fluxo

  1. Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.

  2. Clique com o botão direito do mouse no grupo de processos importado e selecione Start. O conector inicia a ingestão de dados.