Configure o Openflow Connector para Kafka

Nota

O conector está sujeito aos Termos do conector Snowflake.

Pré-requisitos

  1. Certifique-se de ter revisado Snowflake Openflow Connector para Kafka.

  2. Certifique-se de ter revisado Configuração do Openflow - BYOC ou Configuração do Openflow – Implantações do Snowflake.

  3. Se você usa o Openflow – Implantações do Snowflake, certifique-se de ter revisado a configuração dos domínios necessários e concedido acesso aos domínios necessários para o conector Kafka. O conector deve poder se conectar a todos os brokers Kafka no cluster.

Configure a conta Snowflake

Como administrador de conta Snowflake, execute as seguintes tarefas:

  1. Crie um novo usuário de serviço Snowflake com o tipo SERVICE.

  2. Crie uma nova função ou use uma função existente e conceda os privilégios de banco de dados.

    O conector exige que um usuário crie a tabela de destino. Certifique-se de que o usuário tenha os privilégios necessários para gerenciar objetos Snowflake:

    Objeto

    Privilégio

    Notas

    Banco de dados

    USAGE

    Esquema

    USAGE

    Tabela

    OWNERSHIP

    Necessário para que o conector faça a ingestão de dados em uma tabela.

    A Snowflake recomenda a criação de um usuário e uma função separados para cada cluster Kafka para ter melhor controle de acesso.

    Você pode usar o script a seguir para criar e configurar uma função personalizada (requer SECURITYADMIN ou equivalente):

    USE ROLE securityadmin;
    CREATE ROLE openflow_kafka_connector_role_1;
    
    GRANT USAGE ON DATABASE kafka_db TO ROLE openflow_kafka_connector_role_1;
    GRANT USAGE ON SCHEMA kafka_schema TO ROLE openflow_kafka_connector_role_1;
    

    Nota

    Os privilégios devem ser concedidos diretamente à função do conector e não podem ser herdados.

  3. Configurar a tabela de destino

    A Snowflake recomenda fortemente usar a evolução de esquema do lado do servidor para alterações de esquema e uma tabela de erros para registro de erros DML. O exemplo abaixo mostra como criar uma tabela e adicionar as permissões OWNERSHIP apropriadas.

    USE ROLE openflow_kafka_connector_role_1;
    
    CREATE TABLE kafka_db.kafka_schema.<DESTINATION_TABLE_NAME> (
      kafkaMetadata variant
    )
    ENABLE_SCHEMA_EVOLUTION = TRUE
    ERROR_LOGGING = TRUE;
    
    USE ROLE securityadmin;
    GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE openflow_kafka_connector_role_1;
    

    O conector é compatível com a detecção e a evolução automáticas de esquema. A estrutura das tabelas no Snowflake é definida e evoluída automaticamente para oferecer suporte à estrutura dos novos dados carregados pelo conector. Ela mapeia automaticamente as chaves de primeiro nível do conteúdo do registro para as colunas da tabela correspondentes por nome (sem distinção entre maiúsculas e minúsculas).

    Quando a evolução de esquema está habilitada, o Snowflake pode expandir automaticamente a tabela de destino adicionando novas colunas que são detectadas no fluxo de entrada e descartando as restrições NOT NULL para acomodar novos padrões de dados. Para obter mais informações, consulte Evolução do esquema de tabela.

    Se ENABLE_SCHEMA_EVOLUTION não estiver habilitada, será necessário criar o esquema manualmente estendendo a definição da tabela. O conector tenta corresponder as chaves de primeiro nível do conteúdo do registro às colunas da tabela por nome. Se as chaves do JSON não corresponderem às colunas da tabela, o conector vai ignorá-las.

  4. (Opcional) Configurar um gerenciador de segredos

    A Snowflake recomenda enfaticamente essa etapa. Configure um gerenciador de segredos compatível com o Openflow, por exemplo, AWS, Azure e Hashicorp, e armazene as chaves públicas e privadas no armazenamento de segredos.

    1. Determine como você se autenticará no gerenciador de segredos depois que ele estiver configurado. Na AWS, a Snowflake recomenda usar a função da instância do EC2 associada ao Openflow; portanto, nenhum outro segredo precisa ser persistente.

    2. Configure um provedor de parâmetros associado a esse gerenciador de segredos no menu de opções do Openflow no canto superior direito. Navegue até Controller Settings > Parameter Provider e busque seus valores de parâmetro.

    3. Referencie todas as credenciais com os caminhos de parâmetros associados para que nenhum valor confidencial precise ser persistente no Openflow.

  5. Conceder acesso a usuários

    Para outros usuários do Snowflake que precisem de acesso aos dados brutos ingeridos pelo conector (por exemplo, para processamento personalizado no Snowflake), conceda a eles a função criada na etapa 1.

Configuração do conector

Como engenheiro de dados, execute as seguintes tarefas para instalar e configurar o conector:

Instalação do conector

Para instalar o conector, faça o seguinte:

  1. Navegue até a página de visão geral do Openflow. Na seção Featured connectors, selecione View more connectors.

  2. Na página de conectores do Openflow, localize o conector e selecione Add to runtime.

  3. Na caixa de diálogo Select runtime, selecione o tempo de execução na lista suspensa Available runtimes e selecione Add.

    Nota

    Antes de instalar o conector, verifique se você criou um banco de dados, um esquema e uma tabela no Snowflake para que o conector armazene os dados ingeridos.

  4. Autentique-se na implantação com as credenciais da sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.

  5. Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.

    A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.

Configuração do conector

  1. Se necessário, personalize a configuração do conector antes de configurar os parâmetros internos.

  2. Preencher os parâmetros do grupo de processos

    1. Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.

    2. Preencha os valores dos parâmetros necessários.

Parâmetros

A tabela a seguir descreve os parâmetros do Openflow Connector para Kafka:

Parâmetro

Descrição

Obrigatório

Redefinição automática de deslocamento do Kafka

A configuração automática de deslocamento é aplicada quando não é encontrado nenhum deslocamento anterior do consumidor correspondente à propriedade Kafka auto.offset.reset.

Valores possíveis: earliest: redefinir automaticamente o deslocamento como o anterior, latest: redefinir automaticamente o deslocamento como o mais recente, none: emitir uma exceção ao consumidor se nenhum deslocamento anterior for encontrado para o grupo de consumidores.

Padrão: latest

Sim

Servidores de bootstrap Kafka

Uma lista de servidores de bootstrap Kafka separados por vírgula, que deve conter uma porta, por exemplo kafka-broker:9092.

Sim

ID do grupo de consumidores Kafka

O ID de um grupo de consumidores usado pelo conector. Pode ser arbitrário, mas deve ser exclusivo.

Sim

Senha Kafka SASL

Senha fornecida com a senha configurada ao usar o mecanismo SCRAM SASL512

Nome de usuário Kafka SASL

Nome de usuário fornecido com a senha configurada ao usar o mecanismo SCRAM SASL512

Formato do tópico Kafka

Um destes: nomes/padrão. Especifica se os “tópicos Kafka” fornecidos são uma lista de nomes separados por vírgula ou uma única expressão regular.

Sim

Tópicos Kafka

Uma lista separada por vírgulas de tópicos Kafka ou uma expressão regular.

Sim

Banco de dados de destino do Snowflake

O banco de dados em que os dados persistem. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas.

Sim

Esquema de destino do Snowflake

O esquema em que os dados serão persistentes, que já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas.

Veja os exemplos a seguir:

CREATE SCHEMA SCHEMA_NAME ou CREATE SCHEMA schema_name: use SCHEMA_NAME.

CREATE SCHEMA "schema_name" ou CREATE SCHEMA "SCHEMA_NAME": use schema_name ou SCHEMA_NAME, respectivamente.

Sim

Tabela de destino do Snowflake

A tabela em que os dados persistem. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas.

Sim

Iniciar o conector

  1. Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.

  2. Clique com o botão direito do mouse no plano e selecione Start. O conector inicia a ingestão de dados.

Explicando a coluna KAFKAMETADATA

O conector preenche a estrutura de KAFKAMETADATA com os metadados sobre o registro do Kafka. A estrutura contém as seguintes informações:

Campo

Tipo de dados

Descrição

topic

Cadeia de caracteres

O nome do tópico Kafka que deu origem ao registro.

partição

number

O número da partição dentro do tópico. (Note que esta é a partição do Kafka, não a micropartição do Snowflake).

offset

number

O offset dessa partição.

timestamp

number

Carimbo de data/hora em que o registro foi adicionado ao Kafka.

key

Cadeia de caracteres

Se a mensagem for uma KeyedMessage do Kafka, esta é a chave da mensagem. Para que o conector armazene a chave em RECORD_METADATA, o parâmetro key.converter nas propriedades de configuração do Kafka devem ser definidas como org.apache.kafka.connect.storage.StringConverter; caso contrário, o conector vai ignorar as chaves.

headers

Objeto

Um cabeçalho é um par chave-valor associado ao registro e definido pelo usuário. Cada registro pode ter 0, 1 ou vários cabeçalhos.

Medindo a latência de ingestão

Para rastreamento de alterações, processamento incremental e consultas do Time Travel com base no horário de modificação da linha, é possível usar o recurso ROW_TIMESTAMP.

Para habilitá-lo, execute o seguinte comando na tabela de destino:

ALTER TABLE <DESTINATION_TABLE> SET ROW_TIMESTAMP = TRUE;

Após a habilitação dos carimbos de data/hora de linha, as tabelas expõem a coluna METADATA$ROW_LAST_COMMIT_TIME, que retorna o carimbo de data/hora da última modificação de cada linha.

Para obter mais informações, consulte METADATA$ROW_LAST_COMMIT_TIME.

Nota

O carimbo de data/hora de linha não está disponível para tabelas interativas. Para obter mais informações, consulte Tabelas interativas e warehouses interativos do Snowflake.

Usando o conector com tabelas Apache Iceberg™

O conector pode ingerir dados em tabelas Apache Iceberg™ gerenciadas pelo Snowflake, mas é necessário atender aos seguintes requisitos:

  • Você deve ter recebido o privilégio USAGE no volume externo associado à sua tabela Apache Iceberg™.

  • Você deve criar uma tabela Apache Iceberg™ antes de executar o conector.

Concessão de uso em um volume externo

Por exemplo, se a tabela Iceberg usar o volume externo kafka_external_volume e o conector usar a função openflow_kafka_connector_role, execute a seguinte instrução:

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE openflow_kafka_connector_role;

Criar uma tabela Apache Iceberg™ para ingestão

O conector não cria tabelas Iceberg automaticamente e não oferece suporte à evolução de esquema. Antes de executar o conector, você deve criar uma tabela Iceberg manualmente.

Ao criar uma tabela Iceberg, é possível usar tipos de dados Iceberg (incluindo VARIANT) ou tipos compatíveis com o Snowflake.

Por exemplo, considere a seguinte mensagem:

{
  "id": 1,
  "name": "Steve",
  "body_temperature": 36.6,
  "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
  "animals_possessed": {
    "dogs": true,
    "cats": false
  },
  "options": {
    "can_walk": true,
    "can_talk": false
  },
  "date_added": "2024-10-15"
}

Para criar uma tabela Iceberg para a mensagem de exemplo, use uma das seguintes instruções:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
  kafkaMetadata OBJECT(
    topic STRING,
    partition INTEGER,
    offset INTEGER,
    key STRING,
    headers variant,
    timestamp INTEGER
  ),
  id INT,
  name string,
  body_temperature float,
  approved_coffee_types array(string),
  animals_possessed variant,
  date_added date,
  options object(can_walk boolean, can_talk boolean)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ICEBERG_VERSION = 3;

Usando o conector com tabelas interativas

As tabelas interativas são um tipo especial de tabela do Snowflake otimizada para consultas de baixa latência e alta simultaneidade. Para obter mais informações, consulte Tabelas interativas e warehouses interativos do Snowflake.

  1. Crie uma tabela interativa:

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_topic VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) AS (SELECT
          $1:M_NAME::VARCHAR,
          $1:M_VALUE::NUMBER,
          $1:RECORD_METADATA.topic::VARCHAR,
          $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
          from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
    

Considerações importantes:

  • As tabelas interativas têm limitações e restrições de consulta específicas. Revise Tabelas interativas e warehouses interativos do Snowflake antes de usá-las com o conector.

  • Para tabelas interativas, as transformações necessárias devem ser tratadas na definição da tabela.

  • Warehouses interativos são necessários para consultar tabelas interativas de forma eficiente.

Usando o conector com um esquema definido pelo cliente para a tabela de destino

O conector trata cada registro do Kafka como uma linha a ser inserida em uma tabela do Snowflake. Por exemplo, se você tem um tópico do Kafka com o conteúdo da mensagem estruturado como neste JSON:

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}

Por padrão, você não precisa especificar todos os campos do JSON, graças ao recurso ENABLE_SCHEMA_EVOLUTION = TRUE. Entretanto, se você preferir um esquema estático, ele pode ser criado executando:

CREATE TABLE ORDERS (
  kafkaMetadata OBJECT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total FLOAT,
  ispaid BOOLEAN
);

Usando o conector com um PIPE definido pelo cliente

Se você criar o próprio canal, poderá definir a lógica de transformação de dados na instrução COPY INTO do canal. Você pode renomear as colunas conforme exigido e converter os tipos de dados conforme necessário. Por exemplo:

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);

CREATE PIPE ORDERS AS
COPY INTO ORDERS
SELECT
  $1:order_id::STRING,
  $1:customer_name,
  $1:order_total::STRING,
  $1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'));

Quando você define o próprio canal, as colunas da tabela de destino não precisam corresponder às chaves JSON. Você pode renomear as colunas para os nomes desejados e converter os tipos de dados se necessário.

Para ajustar o conector para funcionar com um canal personalizado, execute as seguintes tarefas:

  1. Clique com o botão direito no processador PublishSnowpipeStreaming usado no fluxo de ingestão do Kafka na tela do Openflow.

  2. Selecione Configure no menu de contexto.

  3. Navegue até a guia Properties.

  4. No campo Destination type, escolha Pipe.

  5. No campo Pipe, digite o nome do seu PIPE.

  6. Selecione Apply para salvar a configuração.

Personalizando o tratamento de erros

O tratamento de erros é dividido entre falhas no lado do Openflow e falhas no lado do servidor no serviço Snowpipe Streaming.

  • Erros do Openflow (falhas no lado do cliente): erros, como cargas úteis não analisáveis ou falhas na transformação personalizada, ocorrem antes que os registros cheguem ao Snowflake. Por padrão, esses registros são descartados. É possível processar esses erros no Openflow: use FlowFiles do relacionamento de falha de análise no processador ConsumeKafka.

  • Erros do Snowpipe Streaming (falhas no lado do servidor): erros de registros que chegam ao Snowflake, mas que são incompatíveis com o esquema da tabela de destino (por exemplo, incompatibilidades de tipo), são capturados pela infraestrutura do Snowflake. Quando o registro de erros está habilitado na tabela de destino (error_logging = true), as linhas com falha são automaticamente ingeridas na tabela de erros de destino.

Ajuste de desempenho

Ajuste de desempenho do Openflow Connector para Kafka