Configure o Openflow Connector para Kafka¶
Nota
O conector está sujeito aos Termos do conector Snowflake.
Pré-requisitos¶
Certifique-se de ter revisado Snowflake Openflow Connector para Kafka.
Certifique-se de ter revisado Configuração do Openflow - BYOC ou Configuração do Openflow – Implantações do Snowflake.
Se você usa o Openflow – Implantações do Snowflake, certifique-se de ter revisado a configuração dos domínios necessários e concedido acesso aos domínios necessários para o conector Kafka. O conector deve poder se conectar a todos os brokers Kafka no cluster.
Configure a conta Snowflake¶
Como administrador de conta Snowflake, execute as seguintes tarefas:
Crie um novo usuário de serviço Snowflake com o tipo SERVICE.
Crie uma nova função ou use uma função existente e conceda os privilégios de banco de dados.
O conector exige que um usuário crie a tabela de destino. Certifique-se de que o usuário tenha os privilégios necessários para gerenciar objetos Snowflake:
Objeto
Privilégio
Notas
Banco de dados
USAGE
Esquema
USAGE
Tabela
OWNERSHIP
Necessário para que o conector faça a ingestão de dados em uma tabela.
A Snowflake recomenda a criação de um usuário e uma função separados para cada cluster Kafka para ter melhor controle de acesso.
Você pode usar o script a seguir para criar e configurar uma função personalizada (requer SECURITYADMIN ou equivalente):
Nota
Os privilégios devem ser concedidos diretamente à função do conector e não podem ser herdados.
Configurar a tabela de destino
A Snowflake recomenda fortemente usar a evolução de esquema do lado do servidor para alterações de esquema e uma tabela de erros para registro de erros DML. O exemplo abaixo mostra como criar uma tabela e adicionar as permissões OWNERSHIP apropriadas.
O conector é compatível com a detecção e a evolução automáticas de esquema. A estrutura das tabelas no Snowflake é definida e evoluída automaticamente para oferecer suporte à estrutura dos novos dados carregados pelo conector. Ela mapeia automaticamente as chaves de primeiro nível do conteúdo do registro para as colunas da tabela correspondentes por nome (sem distinção entre maiúsculas e minúsculas).
Quando a evolução de esquema está habilitada, o Snowflake pode expandir automaticamente a tabela de destino adicionando novas colunas que são detectadas no fluxo de entrada e descartando as restrições NOT NULL para acomodar novos padrões de dados. Para obter mais informações, consulte Evolução do esquema de tabela.
Se ENABLE_SCHEMA_EVOLUTION não estiver habilitada, será necessário criar o esquema manualmente estendendo a definição da tabela. O conector tenta corresponder as chaves de primeiro nível do conteúdo do registro às colunas da tabela por nome. Se as chaves do JSON não corresponderem às colunas da tabela, o conector vai ignorá-las.
(Opcional) Configurar um gerenciador de segredos
A Snowflake recomenda enfaticamente essa etapa. Configure um gerenciador de segredos compatível com o Openflow, por exemplo, AWS, Azure e Hashicorp, e armazene as chaves públicas e privadas no armazenamento de segredos.
Determine como você se autenticará no gerenciador de segredos depois que ele estiver configurado. Na AWS, a Snowflake recomenda usar a função da instância do EC2 associada ao Openflow; portanto, nenhum outro segredo precisa ser persistente.
Configure um provedor de parâmetros associado a esse gerenciador de segredos no menu de opções do Openflow no canto superior direito. Navegue até Controller Settings > Parameter Provider e busque seus valores de parâmetro.
Referencie todas as credenciais com os caminhos de parâmetros associados para que nenhum valor confidencial precise ser persistente no Openflow.
Conceder acesso a usuários
Para outros usuários do Snowflake que precisem de acesso aos dados brutos ingeridos pelo conector (por exemplo, para processamento personalizado no Snowflake), conceda a eles a função criada na etapa 1.
Configuração do conector¶
Como engenheiro de dados, execute as seguintes tarefas para instalar e configurar o conector:
Instalação do conector¶
Para instalar o conector, faça o seguinte:
Navegue até a página de visão geral do Openflow. Na seção Featured connectors, selecione View more connectors.
Na página de conectores do Openflow, localize o conector e selecione Add to runtime.
Na caixa de diálogo Select runtime, selecione o tempo de execução na lista suspensa Available runtimes e selecione Add.
Nota
Antes de instalar o conector, verifique se você criou um banco de dados, um esquema e uma tabela no Snowflake para que o conector armazene os dados ingeridos.
Autentique-se na implantação com as credenciais da sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.
Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.
A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.
Configuração do conector¶
Se necessário, personalize a configuração do conector antes de configurar os parâmetros internos.
Preencher os parâmetros do grupo de processos
Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.
Preencha os valores dos parâmetros necessários.
Parâmetros¶
A tabela a seguir descreve os parâmetros do Openflow Connector para Kafka:
Parâmetro |
Descrição |
Obrigatório |
|---|---|---|
Redefinição automática de deslocamento do Kafka |
A configuração automática de deslocamento é aplicada quando não é encontrado nenhum deslocamento anterior do consumidor correspondente à propriedade Kafka Valores possíveis: earliest: redefinir automaticamente o deslocamento como o anterior, latest: redefinir automaticamente o deslocamento como o mais recente, none: emitir uma exceção ao consumidor se nenhum deslocamento anterior for encontrado para o grupo de consumidores. Padrão: latest |
Sim |
Servidores de bootstrap Kafka |
Uma lista de servidores de bootstrap Kafka separados por vírgula, que deve conter uma porta, por exemplo |
Sim |
ID do grupo de consumidores Kafka |
O ID de um grupo de consumidores usado pelo conector. Pode ser arbitrário, mas deve ser exclusivo. |
Sim |
Senha Kafka SASL |
Senha fornecida com a senha configurada ao usar o mecanismo SCRAM SASL512 |
|
Nome de usuário Kafka SASL |
Nome de usuário fornecido com a senha configurada ao usar o mecanismo SCRAM SASL512 |
|
Formato do tópico Kafka |
Um destes: nomes/padrão. Especifica se os “tópicos Kafka” fornecidos são uma lista de nomes separados por vírgula ou uma única expressão regular. |
Sim |
Tópicos Kafka |
Uma lista separada por vírgulas de tópicos Kafka ou uma expressão regular. |
Sim |
Banco de dados de destino do Snowflake |
O banco de dados em que os dados persistem. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas. |
Sim |
Esquema de destino do Snowflake |
O esquema em que os dados serão persistentes, que já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas. Veja os exemplos a seguir:
|
Sim |
Tabela de destino do Snowflake |
A tabela em que os dados persistem. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas. |
Sim |
Iniciar o conector¶
Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.
Clique com o botão direito do mouse no plano e selecione Start. O conector inicia a ingestão de dados.
Explicando a coluna KAFKAMETADATA¶
O conector preenche a estrutura de KAFKAMETADATA com os metadados sobre o registro do Kafka. A estrutura contém as seguintes informações:
Campo |
Tipo de dados |
Descrição |
|---|---|---|
topic |
Cadeia de caracteres |
O nome do tópico Kafka que deu origem ao registro. |
partição |
number |
O número da partição dentro do tópico. (Note que esta é a partição do Kafka, não a micropartição do Snowflake). |
offset |
number |
O offset dessa partição. |
timestamp |
number |
Carimbo de data/hora em que o registro foi adicionado ao Kafka. |
key |
Cadeia de caracteres |
Se a mensagem for uma KeyedMessage do Kafka, esta é a chave da mensagem. Para que o conector armazene a chave em RECORD_METADATA, o parâmetro |
headers |
Objeto |
Um cabeçalho é um par chave-valor associado ao registro e definido pelo usuário. Cada registro pode ter 0, 1 ou vários cabeçalhos. |
Medindo a latência de ingestão¶
Para rastreamento de alterações, processamento incremental e consultas do Time Travel com base no horário de modificação da linha, é possível usar o recurso ROW_TIMESTAMP.
Para habilitá-lo, execute o seguinte comando na tabela de destino:
Após a habilitação dos carimbos de data/hora de linha, as tabelas expõem a coluna METADATA$ROW_LAST_COMMIT_TIME, que retorna o carimbo de data/hora da última modificação de cada linha.
Para obter mais informações, consulte METADATA$ROW_LAST_COMMIT_TIME.
Nota
O carimbo de data/hora de linha não está disponível para tabelas interativas. Para obter mais informações, consulte Tabelas interativas e warehouses interativos do Snowflake.
Usando o conector com tabelas Apache Iceberg™¶
O conector pode ingerir dados em tabelas Apache Iceberg™ gerenciadas pelo Snowflake, mas é necessário atender aos seguintes requisitos:
Você deve ter recebido o privilégio USAGE no volume externo associado à sua tabela Apache Iceberg™.
Você deve criar uma tabela Apache Iceberg™ antes de executar o conector.
Concessão de uso em um volume externo¶
Por exemplo, se a tabela Iceberg usar o volume externo kafka_external_volume e o conector usar a função openflow_kafka_connector_role, execute a seguinte instrução:
Criar uma tabela Apache Iceberg™ para ingestão¶
O conector não cria tabelas Iceberg automaticamente e não oferece suporte à evolução de esquema. Antes de executar o conector, você deve criar uma tabela Iceberg manualmente.
Ao criar uma tabela Iceberg, é possível usar tipos de dados Iceberg (incluindo VARIANT) ou tipos compatíveis com o Snowflake.
Por exemplo, considere a seguinte mensagem:
Para criar uma tabela Iceberg para a mensagem de exemplo, use uma das seguintes instruções:
Usando o conector com tabelas interativas¶
As tabelas interativas são um tipo especial de tabela do Snowflake otimizada para consultas de baixa latência e alta simultaneidade. Para obter mais informações, consulte Tabelas interativas e warehouses interativos do Snowflake.
Crie uma tabela interativa:
Considerações importantes:
As tabelas interativas têm limitações e restrições de consulta específicas. Revise Tabelas interativas e warehouses interativos do Snowflake antes de usá-las com o conector.
Para tabelas interativas, as transformações necessárias devem ser tratadas na definição da tabela.
Warehouses interativos são necessários para consultar tabelas interativas de forma eficiente.
Usando o conector com um esquema definido pelo cliente para a tabela de destino¶
O conector trata cada registro do Kafka como uma linha a ser inserida em uma tabela do Snowflake. Por exemplo, se você tem um tópico do Kafka com o conteúdo da mensagem estruturado como neste JSON:
Por padrão, você não precisa especificar todos os campos do JSON, graças ao recurso ENABLE_SCHEMA_EVOLUTION = TRUE. Entretanto, se você preferir um esquema estático, ele pode ser criado executando:
Usando o conector com um PIPE definido pelo cliente¶
Se você criar o próprio canal, poderá definir a lógica de transformação de dados na instrução COPY INTO do canal. Você pode renomear as colunas conforme exigido e converter os tipos de dados conforme necessário. Por exemplo:
Quando você define o próprio canal, as colunas da tabela de destino não precisam corresponder às chaves JSON. Você pode renomear as colunas para os nomes desejados e converter os tipos de dados se necessário.
Para ajustar o conector para funcionar com um canal personalizado, execute as seguintes tarefas:
Clique com o botão direito no processador PublishSnowpipeStreaming usado no fluxo de ingestão do Kafka na tela do Openflow.
Selecione Configure no menu de contexto.
Navegue até a guia Properties.
No campo Destination type, escolha Pipe.
No campo Pipe, digite o nome do seu PIPE.
Selecione Apply para salvar a configuração.
Personalizando o tratamento de erros¶
O tratamento de erros é dividido entre falhas no lado do Openflow e falhas no lado do servidor no serviço Snowpipe Streaming.
Erros do Openflow (falhas no lado do cliente): erros, como cargas úteis não analisáveis ou falhas na transformação personalizada, ocorrem antes que os registros cheguem ao Snowflake. Por padrão, esses registros são descartados. É possível processar esses erros no Openflow: use FlowFiles do relacionamento de falha de análise no processador ConsumeKafka.
Erros do Snowpipe Streaming (falhas no lado do servidor): erros de registros que chegam ao Snowflake, mas que são incompatíveis com o esquema da tabela de destino (por exemplo, incompatibilidades de tipo), são capturados pela infraestrutura do Snowflake. Quando o registro de erros está habilitado na tabela de destino (
error_logging = true), as linhas com falha são automaticamente ingeridas na tabela de erros de destino.