Configure o Openflow Connector para Kafka

Nota

O conector está sujeito aos termos do conector.

Pré-requisitos

  1. Certifique-se de ter revisado Openflow Connector para Kafka.

  2. Certifique-se de ter configurado o Openflow.

Tipos de conectores

O Openflow Connector para Kafka está disponível em três configurações diferentes, cada uma otimizada para casos de uso específicos. Você pode baixar essas definições de conector na galeria de conectores:

Apache Kafka para o formato de dados JSON

Conector simplificado para ingestão de mensagens JSON com evolução de esquema e mapeamento de tópico para tabela

Apache Kafka para o formato de dados AVRO

Conector simplificado para ingestão de mensagens AVRO com evolução de esquema e mapeamento de tópico para tabela

Apache Kafka com DLQ e metadados

Conector completo com suporte a fila de mensagens não entregues (DLQ), tratamento de metadados e paridade de recursos com o conector Snowflake para Kafka

Para obter a configuração detalhada de tipos específicos de conectores, consulte:

Qual conector você deve escolher?

Escolha a variante de conector que melhor corresponde ao seu formato de dados, requisitos operacionais e necessidades de recursos:

Escolha o Apache Kafka para o formato de dados JSON ou AVRO quando:

  • Suas mensagens do Kafka estão no formato JSON ou AVRO

  • Você precisa de recursos básicos de evolução de esquema

  • Você deseja uma configuração simples com configuração mínima

  • Você não precisa de tratamento de erros avançado ou funcionalidade de fila de mensagens não entregues

  • Você está configurando uma nova integração e deseja começar rapidamente

Considerações específicas do formato:*

  • **Formato JSON: mais flexível para estruturas de dados variadas, mais fácil de depurar e inspecionar

  • **Formato AVRO: dados fortemente tipados com integração de registro de esquema integrada, melhor para pipelines de dados estruturados

Escolha Apache Kafka com DLQ e metadados quando:

  • Você está migrando do conector Snowflake para Kafka e precisa de paridade de recursos com funcionalidade compatível

  • Você precisa de tratamento de erros robusto e suporte para fila de mensagens não entregues para mensagens com falha

  • Você precisa de metadados detalhados sobre a ingestão de mensagens (carimbos de data/hora, deslocamentos, cabeçalhos)

Considerações sobre a migração

Se você estiver usando o conector Snowflake para Kafka, escolha o conector Apache Kafka com DLQ e metadados para uma experiência de migração sem problemas e compatibilidade de recursos.

Diferenças no tratamento de nomes de campo: o conector Openflow para Kafka trata caracteres especiais em nomes de campo de forma diferente do conector Snowflake para Kafka. Após a migração, o conector Openflow para Kafka pode criar novas colunas Snowflake com nomes diferentes devido a essas diferenças nas convenções de nomenclatura. Para obter informações detalhadas sobre como os nomes de campo são transformados, consulte Mapeamento de nomes de campo e tratamento de caracteres especiais.

Considerações de desempenho

  • Os conectores de formato JSON e AVRO oferecem melhor desempenho para casos de uso simples devido ao design simplificado

  • O conector de metadados e DLQ fornece monitoramento e tratamento de erros mais abrangentes, mas com um uso de recursos ligeiramente maior

Configure a conta Snowflake

Como administrador de conta Snowflake, execute as seguintes tarefas:

  1. Crie um novo usuário de serviço Snowflake com o tipo SERVICE.

  2. Crie uma nova função ou use uma função existente e conceda a Privilégios de banco de dados.

    Como o conector tem a capacidade de criar automaticamente a tabela de destino, caso ela ainda não exista, certifique-se de que o usuário tenha os privilégios necessários para criar e gerenciar objetos Snowflake:

    Objeto

    Privilégio

    Notas

    Banco de dados

    USAGE

    Esquema

    USAGE . CREATE TABLE .

    Após a criação dos objetos de nível de esquema, os privilégios CREATE object podem ser revogados.

    Tabela

    OWNERSHIP

    Necessário somente ao usar o conector Kafka para ingerir dados em uma tabela existente. . Se o conector criar uma tabela nova de destino para registros do tópico Kafka, a função padrão para o usuário especificado na configuração se tornará o proprietário da tabela.

    A Snowflake recomenda a criação de um usuário e uma função separados para cada instância do Kafka para ter melhor controle de acesso.

    Você pode usar o script a seguir para criar e configurar uma função personalizada (requer SECURITYADMIN ou equivalente):

    USE ROLE securityadmin;
    
    CREATE ROLE kafka_connector_role_1;
    GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;
    GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
    GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
    
    -- Only for existing tables
    GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;
    
    Copy

    Observe que os privilégios devem ser concedidos diretamente à função do conector e não podem ser herdados.

  3. Conceda ao usuário do serviço Snowflake a função que você criou nas etapas anteriores.

    A função deve ser atribuída como a função padrão para o usuário:

    GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;
    ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
    
    Copy
  4. Configure com key-pair auth para o usuário SERVICE do Snowflake da etapa 1.

  5. A Snowflake recomenda enfaticamente essa etapa. Configure um gerenciador de segredos compatível com o Openflow, por exemplo, AWS, Azure e Hashicorp, e armazene as chaves públicas e privadas no armazenamento de segredos.

    Nota

    Se, por algum motivo, você não quiser usar um gerenciador de segredos, será responsável por proteger os arquivos de chave pública e chave privada usados para autenticação de pares de chaves de acordo com as políticas de segurança de sua organização.

    1. Depois que o gerenciador de segredos estiver configurado, determine como você se autenticará nele. No AWS, é recomendável que você use a função de instância EC2 associada ao Openflow, pois dessa forma nenhum outro segredo precisa ser mantido.

    2. No Openflow, configure um provedor de parâmetros associado a esse gerenciador de segredos, no menu de três traços no canto superior direito. Navegue até Controller Settings » Parameter Provider e, em seguida, busque os valores dos parâmetros.

    3. Nesse momento, todas as credenciais podem ser referenciadas com os caminhos de parâmetros associados e nenhum valor sensível precisa ser mantido no Openflow.

  6. Se qualquer outro usuário Snowflake precisar de acesso aos documentos e tabelas brutos ingeridos pelo conector (por exemplo, para processamento personalizado no Snowflake), conceda a esses usuários a função criada na etapa 1.

Configuração do conector

Como engenheiro de dados, execute as seguintes tarefas para instalar e configurar o conector:

Instalação do conector

  1. Navegue até a página Visão geral do Openflow. Na seção Featured connectors, selecione View more connectors.

  2. Na página de conectores do Openflow, localize o conector e selecione Add to runtime.

  3. Na caixa de diálogo Select runtime, selecione seu tempo de execução na lista suspensa Available runtimes.

  4. Selecione Add.

    Nota

    Antes de instalar o conector, verifique se você criou um banco de dados e um esquema no Snowflake para que o conector armazene os dados ingeridos.

  5. Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.

  6. Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.

A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.

Configuração do conector

  1. Preencher os parâmetros do grupo de processos

    1. Clique com o botão direito do mouse no grupo de processos importado e selecione Parâmetros.

    2. Preencha os valores de parâmetros necessários, conforme descrito em Parâmetros comuns.

Parâmetros comuns

Todas as variantes do conector Kafka compartilham contextos de parâmetro comuns para conectividade e autenticação básicas.

Parâmetros de destino do Snowflake

Parâmetro

Descrição

Obrigatório

Banco de dados de destino

O banco de dados onde os dados serão mantidos. Ele já deve existir no Snowflake

Sim

Esquema de destino

O esquema onde os dados serão persistidos. Ele já deve existir no Snowflake. Esse parâmetro diferencia maiúsculas de minúsculas. Forneça esse parâmetro em letras maiúsculas, a menos que o esquema tenha sido criado com um nome entre aspas duplas. Em seguida, certifique-se de que o uso de maiúsculas e minúsculas seja idêntico, mas não insira as aspas duplas. Veja os exemplos a seguir:

  • CREATE SCHEMA SCHEMA_NAME ou CREATE SCHEMA schema_name – use SCHEMA_NAME

  • CREATE SCHEMA "schema_name" ou CREATE SCHEMA "SCHEMA_NAME" – use schema_name ou SCHEMA_NAME, respectivamente

Sim

Identificador de conta Snowflake

Nome da conta Snowflake formatado como [organization-name]-[account-name] onde os dados serão mantidos

Sim

Estratégia de autenticação Snowflake

Estratégia de autenticação para o Snowflake. Valores possíveis: SNOWFLAKE_SESSION_TOKEN quando estivermos executando o fluxo em SPCS, e KEY_PAIR quando quisermos definir o acesso usando a chave privada

Sim

Chave privada Snowflake

A chave privada RSA utilizada para autenticação. A chave RSA deve ser formatada de acordo com os padrões PKCS8 e ter cabeçalhos e rodapés no padrão PEM. Observe que o arquivo de chave privada do Snowflake ou a chave privada do Snowflake devem ser definidos

Não

Arquivo de chave privada Snowflake

O arquivo que contém a chave privada RSA usada para autenticação no Snowflake, formatado de acordo com os padrões PKCS8 e com cabeçalhos e rodapés no padrão PEM. A linha do cabeçalho começa com -----BEGIN PRIVATE. Marque a caixa de seleção Reference asset para carregar o arquivo de chave privada.

Não

Senha de chave privada Snowflake

A senha associada ao arquivo de chave privada Snowflake

Não

Função Snowflake

Função Snowflake usada durante a execução da consulta

Sim

Nome de usuário do Snowflake

Nome de usuário usado para se conectar à instância Snowflake

Sim

Parâmetros de origem do Kafka (autenticação SASL)

Parâmetro

Descrição

Obrigatório

Protocolo de segurança do Kafka

Protocolo de segurança usado para se comunicar com os corretores. Corresponde à propriedade security.protocol do cliente Kafka. Uma das seguintes opções: SASL_PLAINTEXT/SASL_SSL

Sim

Mecanismo Kafka SASL

Mecanismo SASL usado para autenticação. Corresponde à propriedade sasl.mechanism do cliente Kafka. Uma das seguintes opções: PLAIN/SCRAM-SHA-256/SCRAM-SHA-512

Sim

Nome de usuário Kafka SASL

O nome de usuário para autenticação no Kafka

Sim

Senha Kafka SASL

A senha para autenticação no Kafka

Sim

Servidores de bootstrap Kafka

Uma lista de corretores Kafka separados por vírgulas para buscar dados, que deve conter a porta, por exemplo, kafka-broker:9092. A mesma instância é usada para o tópico DLQ.

Sim

Parâmetros de ingestão Kafka

Parâmetro

Descrição

Obrigatório

Formato do tópico Kafka

Uma das seguintes opções: nomes/padrão. Especifica se os “tópicos Kafka” fornecidos são uma lista de nomes separados por vírgula ou uma única expressão regular.

Sim

Tópicos Kafka

Uma lista separada por vírgulas de tópicos Kafka ou uma expressão regular.

Sim

ID do grupo Kafka

O ID de um grupo de consumidores usado pelo conector. Pode ser arbitrário, mas deve ser exclusivo.

Sim

Redefinição automática de deslocamento do Kafka

A configuração automática de deslocamento é aplicada quando não é encontrado nenhum deslocamento anterior do consumidor correspondente à propriedade Kafka auto.offset.reset. Uma das seguintes opções: earliest/latest. Padrão: latest

Sim

Mapa de tópicos para tabelas

Esse parâmetro opcional permite que o usuário especifique quais tópicos devem ser mapeados para quais tabelas. Cada tópico e seu nome de tabela devem ser separados por dois pontos (ver exemplo abaixo). Este nome de tabela deve ser um identificador válido do Snowflake, sem aspas. As expressões regulares não podem ser ambíguas – qualquer tópico correspondente deve corresponder apenas a uma única tabela de destino. Se estiver vazio ou nenhuma correspondência for encontrada, o nome do tópico será usado como nome da tabela. Observação: o mapeamento não pode conter espaços após as vírgulas.

Não

Exemplo de valores de Topic To Table Map:

  • topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range

  • topic[0-4]:low_range,topic[5-9]:high_range

  • .*:destination_table – mapeia todos os tópicos para destination_table

Configure as definições específicas de variante

Após configurar os parâmetros comuns, você precisa definir as configurações específicas da variante de conector escolhida:

Para os conectores Apache Kafka para o formato de dados JSON e Apache Kafka para o formato de dados AVRO:

Consulte Apache Kafka para o formato de dados JSON/AVRO para obter parâmetros específicos de JSON/AVRO.

Para Apache Kafka com DLQ e conector de metadados:

Consulte Apache Kafka com DLQ e metadados para obter parâmetros avançados, incluindo a configuração de DLQ, configurações de esquematização, suporte à tabela Iceberg e opções de formato de mensagem.

Autenticação

Todas as variantes do conector oferecem suporte à autenticação SASL configurada por meio de contextos de parâmetros, conforme descrito em Parâmetros de origem do Kafka (autenticação SASL).

Para outros métodos de autenticação, incluindo mTLS e AWS MSK IAM, consulte Configure outros métodos de autenticação para o Openflow Connector para Kafka.

Execute o fluxo

  1. Clique com o botão direito do mouse no plano e clique em Ativar todos os Controller Services.

  2. Clique com o botão direito do mouse no plano e clique em Iniciar. O conector inicia a ingestão de dados.