Configure o Openflow Connector para Kafka

Nota

O conector está sujeito aos termos do conector.

Pré-requisitos

  1. Certifique-se de ter revisado Sobre a Openflow Connector for Kafka.

  2. Certifique-se de ter seguido as etapas em Configuração do Openflow - BYOC ou Configuração do Openflow - Implantação do Snowflake - Visão geral da tarefa.

Tipos de conectores

O Openflow Connector para Kafka está disponível em três configurações diferentes, cada uma otimizada para casos de uso específicos. Você pode baixar essas definições de conector na galeria de conectores:

Apache Kafka para o formato de dados JSON

Conector simplificado para ingestão de mensagens JSON com evolução de esquema e mapeamento de tópico para tabela

Apache Kafka para o formato de dados AVRO

Conector simplificado para ingestão de mensagens AVRO com evolução de esquema e mapeamento de tópico para tabela

Apache Kafka com DLQ e metadados

Conector completo com suporte a fila de mensagens não entregues (DLQ), tratamento de metadados e paridade de recursos com o conector Snowflake para Kafka

Para obter a configuração detalhada de tipos específicos de conectores, consulte:

Qual conector você deve escolher?

Escolha a variante de conector que melhor corresponde ao seu formato de dados, requisitos operacionais e necessidades de recursos:

Escolha o Apache Kafka para o formato de dados JSON ou AVRO quando:

  • Suas mensagens do Kafka estão no formato JSON ou AVRO

  • Você precisa de recursos básicos de evolução de esquema

  • Você deseja uma configuração simples com configuração mínima

  • Você não precisa de tratamento de erros avançado ou funcionalidade de fila de mensagens não entregues

  • Você está configurando uma nova integração e deseja começar rapidamente

Considerações específicas do formato:*

  • **Formato JSON: mais flexível para estruturas de dados variadas, mais fácil de depurar e inspecionar

  • **Formato AVRO: dados fortemente tipados com integração de registro de esquema integrada, melhor para pipelines de dados estruturados

Escolha Apache Kafka com DLQ e metadados quando:

  • Você está migrando do conector Snowflake para Kafka e precisa de paridade de recursos com funcionalidade compatível

  • Você precisa de tratamento de erros robusto e suporte para fila de mensagens não entregues para mensagens com falha

  • Você precisa de metadados detalhados sobre a ingestão de mensagens (carimbos de data/hora, deslocamentos, cabeçalhos)

Considerações sobre a migração

Se você estiver usando o conector Snowflake para Kafka, escolha o conector Apache Kafka com DLQ e metadados para uma experiência de migração sem problemas e compatibilidade de recursos.

Diferenças no tratamento de nomes de campo: o conector Openflow para Kafka trata caracteres especiais em nomes de campo de forma diferente do conector Snowflake para Kafka. Após a migração, o conector Openflow para Kafka pode criar novas colunas Snowflake com nomes diferentes devido a essas diferenças nas convenções de nomenclatura. Para obter informações detalhadas sobre como os nomes de campo são transformados, consulte Mapeamento de nomes de campo e tratamento de caracteres especiais.

Considerações de desempenho

  • Os conectores de formato JSON e AVRO oferecem melhor desempenho para casos de uso simples devido ao design simplificado

  • O conector de metadados e DLQ fornece monitoramento e tratamento de erros mais abrangentes, mas com um uso de recursos ligeiramente maior

Configure a conta Snowflake

Como administrador de conta Snowflake, execute as seguintes tarefas:

  1. Crie um novo usuário de serviço Snowflake com o tipo SERVICE.

  2. Crie uma nova função ou use uma função existente e conceda a Privilégios de banco de dados.

    Como o conector tem a capacidade de criar automaticamente a tabela de destino, caso ela ainda não exista, certifique-se de que o usuário tenha os privilégios necessários para criar e gerenciar objetos Snowflake:

    Objeto

    Privilégio

    Notas

    Banco de dados

    USAGE

    Esquema

    USAGE . CREATE TABLE .

    Após a criação dos objetos de nível de esquema, os privilégios CREATE object podem ser revogados.

    Tabela

    OWNERSHIP

    Necessário somente ao usar o conector Kafka para ingerir dados em uma tabela existente. . Se o conector criar uma tabela nova de destino para registros do tópico Kafka, a função padrão para o usuário especificado na configuração se tornará o proprietário da tabela.

    A Snowflake recomenda a criação de um usuário e uma função separados para cada instância do Kafka para ter melhor controle de acesso.

    Você pode usar o script a seguir para criar e configurar uma função personalizada (requer SECURITYADMIN ou equivalente):

    USE ROLE securityadmin;
    
    CREATE ROLE kafka_connector_role_1;
    GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;
    GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
    GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
    
    -- Only for existing tables
    GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;
    
    Copy

    Observe que os privilégios devem ser concedidos diretamente à função do conector e não podem ser herdados.

  3. Conceda ao usuário do serviço Snowflake a função que você criou nas etapas anteriores.

    A função deve ser atribuída como a função padrão para o usuário:

    GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;
    ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
    
    Copy
  4. Configure com key-pair auth para o usuário SERVICE do Snowflake da etapa 1.

  5. A Snowflake recomenda enfaticamente essa etapa. Configure um gerenciador de segredos compatível com o Openflow, por exemplo, AWS, Azure e Hashicorp, e armazene as chaves públicas e privadas no armazenamento de segredos.

    Nota

    Se, por algum motivo, você não quiser usar um gerenciador de segredos, será responsável por proteger os arquivos de chave pública e chave privada usados para autenticação de pares de chaves de acordo com as políticas de segurança de sua organização.

    1. Depois que o gerenciador de segredos estiver configurado, determine como você se autenticará nele. No AWS, é recomendável que você use a função de instância EC2 associada ao Openflow, pois dessa forma nenhum outro segredo precisa ser mantido.

    2. No Openflow, configurar um provedor de parâmetros associado a este Secrets Manager, a partir do menu de configuração no canto superior direito. Navegue até Controller Settings » Parameter Provider e depois buscar seus valores de parâmetro.

    3. Nesse momento, todas as credenciais podem ser referenciadas com os caminhos de parâmetros associados e nenhum valor sensível precisa ser mantido no Openflow.

  6. Se qualquer outro usuário Snowflake precisar de acesso aos documentos e tabelas brutos ingeridos pelo conector (por exemplo, para processamento personalizado no Snowflake), conceda a esses usuários a função criada na etapa 1.

Configuração do conector

Como engenheiro de dados, execute as seguintes tarefas para instalar e configurar o conector:

Instalação do conector

  1. Navegue até a página Visão geral do Openflow. Na seção Featured connectors, selecione View more connectors.

  2. Na página de conectores do Openflow, localize o conector e selecione Add to runtime.

  3. Na caixa de diálogo Select runtime, selecione seu tempo de execução na lista suspensa Available runtimes.

  4. Selecione Add.

    Nota

    Antes de instalar o conector, verifique se você criou um banco de dados e um esquema no Snowflake para que o conector armazene os dados ingeridos.

  5. Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.

  6. Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.

A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.

Configuração do conector

  1. Preencher os parâmetros do grupo de processos

    1. Clique com o botão direito do mouse no grupo de processos importado e selecione Parâmetros.

    2. Preencha os valores de parâmetros necessários, conforme descrito em Parâmetros comuns.

Parâmetros comuns

Todas as variantes do conector Kafka compartilham contextos de parâmetro comuns para conectividade e autenticação básicas.

Parâmetros de destino do Snowflake

Parâmetro

Descrição

Obrigatório

Banco de dados de destino

O banco de dados onde os dados serão persistidos. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas.

Sim

Esquema de destino

O esquema onde os dados serão persistidos, que já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas.

Veja os exemplos a seguir:

  • CREATE SCHEMA SCHEMA_NAME ou CREATE SCHEMA schema_name: use SCHEMA_NAME

  • CREATE SCHEMA "schema_name" ou CREATE SCHEMA "SCHEMA_NAME": use schema_name ou SCHEMA_NAME, respectivamente

Sim

Identificador de conta Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: nome da conta Snowflake formatado como [nome-da-organização]-[nome-da-conta], onde os dados serão persistentes.

Sim

Estratégia de autenticação Snowflake

Ao utilizar:

  • Implantação do Snowflake Openflow: Use SNOWFLAKE_SESSION_TOKEN. Esse token é gerenciado automaticamente pelo Snowflake.

  • BYOC: use KEY_PAIR como o valor para a estratégia de autenticação.

Sim

Chave privada Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: deve ser a chave privada RSA utilizada para a autenticação.

    A chave RSA deve ser formatada de acordo com os padrões PKCS8 e têm os cabeçalhos e rodapés PEM padrão. Observe que ou o arquivo de chave privada do Snowflake ou a chave privada do Snowflake deve ser definido.

Não

Arquivo de chave privada Snowflake

Ao utilizar:

  • Estratégia de autenticação de token de sessão: o arquivo de chave privada deve estar em branco.

  • KEY_PAIR: carregue o arquivo que contém a chave privada RSA usada para autenticação no Snowflake, formatado de acordo com os padrões PKCS8 e incluindo cabeçalhos e rodapés PEM padrão. A linha do cabeçalho começa com -----BEGIN PRIVATE. Para carregar o arquivo de chave privada, marque a caixa de seleção Reference asset.

Não

Senha de chave privada Snowflake

Ao usar

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: forneça a senha associada ao arquivo de chave privada do Snowflake.

Não

Função Snowflake

Ao usar

  • Estratégia de autenticação de tokens de sessão: use sua função de tempo de execução. Você pode encontrar sua função de tempo de execução na UI do Openflow, navegando para View Details no seu tempo de execução.

  • Estratégia de autenticação de KEY_PAIR: use uma função válida configurada para o usuário do seu serviço.

Sim

Nome de usuário do Snowflake

Ao usar

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: forneça o nome de usuário usado para se conectar à instância do Snowflake.

Sim

Warehouse Snowflake

Warehouse Snowflake usado para executar consultas.

Sim

Parâmetros de origem do Kafka (autenticação SASL)

Parâmetro

Descrição

Obrigatório

Protocolo de segurança do Kafka

Protocolo de segurança usado para se comunicar com os corretores. Corresponde à propriedade security.protocol do cliente Kafka. Uma das seguintes opções: SASL_PLAINTEXT/SASL_SSL

Sim

Mecanismo Kafka SASL

Mecanismo SASL usado para autenticação. Corresponde à propriedade sasl.mechanism do cliente Kafka. Uma das seguintes opções: PLAIN/SCRAM-SHA-256/SCRAM-SHA-512

Sim

Nome de usuário Kafka SASL

O nome de usuário para autenticação no Kafka

Sim

Senha Kafka SASL

A senha para autenticação no Kafka

Sim

Servidores de bootstrap Kafka

Uma lista de corretores Kafka separados por vírgulas para buscar dados, que deve conter a porta, por exemplo, kafka-broker:9092. A mesma instância é usada para o tópico DLQ.

Sim

Parâmetros de ingestão Kafka

Parâmetro

Descrição

Obrigatório

Formato do tópico Kafka

Uma das seguintes opções: nomes/padrão. Especifica se os “tópicos Kafka” fornecidos são uma lista de nomes separados por vírgula ou uma única expressão regular.

Sim

Tópicos Kafka

Uma lista separada por vírgulas de tópicos Kafka ou uma expressão regular.

Sim

ID do grupo Kafka

O ID de um grupo de consumidores usado pelo conector. Pode ser arbitrário, mas deve ser exclusivo.

Sim

Redefinição automática de deslocamento do Kafka

A configuração automática de deslocamento é aplicada quando não é encontrado nenhum deslocamento anterior do consumidor correspondente à propriedade Kafka auto.offset.reset. Uma das seguintes opções: earliest/latest. Padrão: latest

Sim

Mapa de tópicos para tabelas

Esse parâmetro opcional permite que o usuário especifique quais tópicos devem ser mapeados para quais tabelas. Cada tópico e seu nome de tabela devem ser separados por dois pontos (ver exemplo abaixo). Este nome de tabela deve ser um identificador válido do Snowflake, sem aspas. As expressões regulares não podem ser ambíguas – qualquer tópico correspondente deve corresponder apenas a uma única tabela de destino. Se estiver vazio ou nenhuma correspondência for encontrada, o nome do tópico será usado como nome da tabela. Observação: o mapeamento não pode conter espaços após as vírgulas.

Não

Exemplo de valores de Topic To Table Map:

  • topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range

  • topic[0-4]:low_range,topic[5-9]:high_range

  • .*:destination_table – mapeia todos os tópicos para destination_table

Configure as definições específicas de variante

Após configurar os parâmetros comuns, você precisa definir as configurações específicas da variante de conector escolhida:

Para os conectores Apache Kafka para o formato de dados JSON e Apache Kafka para o formato de dados AVRO:

Consulte Apache Kafka para o formato de dados JSON/AVRO para obter parâmetros específicos de JSON/AVRO.

Para Apache Kafka com DLQ e conector de metadados:

Consulte Apache Kafka com DLQ e metadados para obter parâmetros avançados, incluindo a configuração de DLQ, configurações de esquematização, suporte à tabela Iceberg e opções de formato de mensagem.

Autenticação

Todas as variantes do conector oferecem suporte à autenticação SASL configurada por meio de contextos de parâmetros, conforme descrito em Parâmetros de origem do Kafka (autenticação SASL).

Para outros métodos de autenticação, incluindo mTLS e AWS MSK IAM, consulte Configure outros métodos de autenticação para o Openflow Connector para Kafka.

Execute o fluxo

  1. Clique com o botão direito do mouse no plano e clique em Ativar todos os Controller Services.

  2. Clique com o botão direito do mouse no plano e clique em Iniciar. O conector inicia a ingestão de dados.