Configure o Openflow Connector for Kinesis

Nota

O conector está sujeito aos termos do conector.

Este tópico descreve as etapas para configurar o Openflow Connector for Kinesis.

Pré-requisitos

  1. Certifique-se de ter revisado Sobre a Openflow Connector for Kinesis.

  2. Certifique-se de ter configurado o Openflow.

Configure um fluxo Kinesis

Como administrador do AWS, execute as seguintes ações em sua conta AWS:

  1. Certifique-se de que você tenha uma conta AWS com permissões de IAM para acessar o Kinesis Streams e DynamoDB.

  2. Opcionalmente, crie um fluxo Kinesis de fila de mensagens não entregues (DLQ). As mensagens que não podem ser analisadas com sucesso podem ser redirecionadas para uma DLQ designada.

Configure a conta Snowflake

Como administrador de conta Snowflake, execute as seguintes tarefas:

  1. Crie uma nova função ou use uma função existente e conceda a Privilégios de banco de dados.

  2. Crie um banco de dados de destino e um esquema de destino que serão usados para criar tabelas de destino para armazenar os dados.

    1. Se você planeja usar o recurso do conector para criar automaticamente a tabela de destino se ela ainda não existir, certifique-se de que o usuário tenha os privilégios necessários para criar e gerenciar objetos Snowflake:

      Objeto

      Privilégio

      Notas

      Banco de dados

      USAGE

      Esquema

      USAGE . CREATE TABLE .

      Após a criação dos objetos de nível de esquema, os privilégios CREATE object podem ser revogados.

      Tabela

      OWNERSHIP

      Necessário somente ao usar o conector Kinesis para ingerir dados em uma tabela existente. . Se o conector criar uma nova tabela de destino para registros do fluxo Kinesis, a função padrão do usuário especificado na configuração se tornará o proprietário da tabela.

      Você pode usar o script a seguir para criar e configurar uma função personalizada (requer SECURITYADMIN ou equivalente):

      USE ROLE SECURITYADMIN;
      
      CREATE ROLE kinesis_connector_role_1;
      GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role_1;
      GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1;
      GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1;
      
      -- Only for existing tables
      GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kinesis_connector_role_1;
      
      Copy
  3. Crie um novo usuário de serviço Snowflake com o tipo SERVICE.

  4. Conceda ao usuário do serviço Snowflake a função que você criou nas etapas anteriores.

    GRANT ROLE kinesis_connector_role_1 TO USER kinesis_connector_user_1;
    ALTER USER kinesis_connector_user_1 SET DEFAULT_ROLE = kinesis_connector_role_1;
    
    Copy
  5. Configure com autenticação por pares de chaves para o usuário Snowflake SERVICE a partir da etapa 3.

  6. A Snowflake recomenda enfaticamente essa etapa. Configure um gerenciador de segredos compatível com o Openflow, por exemplo, AWS, Azure e Hashicorp, e armazene as chaves públicas e privadas no armazenamento de segredos.

    Nota

    Se, por algum motivo, você não quiser usar um gerenciador de segredos, será responsável por proteger os arquivos de chave pública e chave privada usados para autenticação de pares de chaves de acordo com as políticas de segurança de sua organização.

    1. Depois que o gerenciador de segredos estiver configurado, determine como você se autenticará nele. No AWS, é recomendável que você use a função de instância EC2 associada ao Openflow, pois dessa forma nenhum outro segredo precisa ser mantido.

    2. No Openflow, configure um provedor de parâmetros associado a esse gerenciador de segredos, no menu de três traços no canto superior direito. Navegue até Controller Settings » Parameter Provider e, em seguida, busque os valores dos parâmetros.

    3. Nesse momento, todas as credenciais podem ser referenciadas com os caminhos de parâmetros associados e nenhum valor sensível precisa ser mantido no Openflow.

  7. Se qualquer outro usuário Snowflake precisar de acesso aos documentos e tabelas brutos ingeridos pelo conector (por exemplo, para processamento personalizado no Snowflake), conceda a esses usuários a função criada na etapa 1.

  8. Designar um warehouse para o conector usar. Comece com o menor tamanho de warehouse e, em seguida, experimente o tamanho, dependendo do número de tabelas sendo replicadas e da quantidade de dados transferidos. Os números de tabelas grandes normalmente são mais bem dimensionados com warehouses multicluster, em vez de warehouses maiores.

Configuração do conector

Como engenheiro de dados, execute as seguintes tarefas para instalar e configurar o conector:

Instalação do conector

  1. Navegue até a página Visão geral do Openflow. Na seção Featured connectors, selecione View more connectors.

  2. Na página de conectores do Openflow, localize o conector e selecione Add to runtime.

  3. Na caixa de diálogo Select runtime, selecione seu tempo de execução na lista suspensa Available runtimes.

  4. Selecione Add.

    Nota

    Antes de instalar o conector, verifique se você criou um banco de dados e um esquema no Snowflake para que o conector armazene os dados ingeridos.

  5. Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.

  6. Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.

A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.

Configuração do conector

  1. Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.

  2. Preencha os valores dos parâmetros necessários conforme descrito em Parâmetros de fluxo.

Parâmetros de fluxo

Esta seção descreve os parâmetros de fluxo que podem ser configurados com base nos contextos de parâmetros a seguir:

Parâmetros de origem do Kinesis

Parâmetro

Descrição

Código da região AWS

A região AWS onde seu Kinesis Stream está localizado, por exemplo, us-west-2.

ID da chave de acesso AWS

O ID da chave de acesso AWS para se conectar ao seu Kinesis Stream e DynamoDB.

Chave de acesso secreta AWS

A chave de acesso secreta AWS para se conectar ao seu Kinesis Stream e DynamoDB.

URL do registro de esquema

URL do registro de esquema AVRO. Isso é necessário se o parâmetro Estratégia de acesso ao esquema AVRO estiver definido como schema-reference-reader.

Tipo de autenticação do registro de esquema

O tipo de autenticação usado pelo registro de esquema AVRO. Isso é necessário se o parâmetro Estratégia de acesso ao esquema AVRO estiver definido como schema-reference-reader.

Os valores possíveis são:
  • NONE: nenhuma autenticação usada

  • BASIC: método de autenticação de nome de usuário/senha usado

Nome de usuário do registro de esquema

O nome de usuário usado para autenticação BASIC no registro de esquema AVRO. Isso é necessário se o parâmetro Tipo de autenticação do registro de esquema estiver definido como BASIC.

Senha do registro de esquema

A senha usada para autenticação BASIC no registro de esquema AVRO. Isso é necessário se o parâmetro Tipo de autenticação do registro de esquema estiver definido como BASIC.

Parâmetros de destino do Kinesis

Parâmetro

Descrição

Banco de dados de destino

O banco de dados onde os dados serão mantidos. Ele já deve existir no Snowflake.

Esquema de destino

O esquema em que os dados serão mantidos. Ele já deve existir no Snowflake. Esse parâmetro diferencia maiúsculas de minúsculas.

Identificador de conta Snowflake

Nome da conta Snowflake formatado como [organization-name]-[account-name] onde os dados serão mantidos.

Estratégia de autenticação Snowflake

Estratégia de autenticação para o Snowflake. Valores possíveis: SNOWFLAKE_SESSION_TOKEN quando você estiver executando o fluxo em SPCS, e KEY_PAIR quando quiser configurar o acesso usando uma chave privada.

Chave privada Snowflake

A chave privada RSA utilizada para autenticação. A chave RSA deve ser formatada de acordo com os padrões PKCS8 e ter cabeçalhos e rodapés no padrão PEM. Observe que o arquivo de chave privada Snowflake ou a chave privada Snowflake devem ser definidos.

Arquivo de chave privada Snowflake

O arquivo que contém a chave privada RSA usada para autenticação no Snowflake, formatado de acordo com os padrões PKCS8 e com cabeçalhos e rodapés no padrão PEM. A linha do cabeçalho começa com -----BEGIN PRIVATE. Marque a caixa de seleção Reference asset para carregar o arquivo de chave privada.

Senha de chave privada Snowflake

A senha associada ao arquivo de chave privada Snowflake.

Função Snowflake

Função Snowflake usada durante a execução da consulta.

Nome de usuário do Snowflake

Nome de usuário usado para se conectar à instância Snowflake.

Warehouse Snowflake

Warehouse Snowflake usado para executar consultas. Esse parâmetro diferencia maiúsculas de minúsculas.

Parâmetros de ingestão do Kinesis

Parâmetro

Descrição

Nome do aplicativo Kinesis

O nome usado para o nome de tabela DynamoDB para rastrear o progresso do aplicativo no consumo do Kinesis Stream.

Nome do fluxo do Kinesis

Nome do fluxo do Kinesis AWS para consumir dados.

Posição inicial do fluxo do Kinesis

A posição inicial do fluxo a partir da qual os dados começam a ser replicados.

Os valores possíveis são:
  • LATEST: último registro armazenado

  • TRIM_HORIZON: registro armazenado mais antigo

Nome do fluxo DLQ do Kinesis

O nome do fluxo para o qual são enviados todos os registros que falharam no processamento. Se esse parâmetro não for adicionado, você pode esperar um sinal de aviso na parte relacionada à DLQ do conector na tela do Openflow.

Formato de mensagem

O formato das mensagens no Kinesis.

Os valores possíveis são:
  • JSON: JSON é um formato de mensagem que pode ser lido por humanos e cujo esquema pode ser inferido a partir da própria mensagem.

  • AVRO: AVRO é um formato de mensagem que precisa de um esquema para acessar os dados da mensagem.

Estratégia de acesso ao esquema AVRO

Para acessar os dados no formato de mensagem AVRO, o esquema é necessário. Esse parâmetro define a estratégia para acessar o esquema AVRO de uma determinada mensagem. Se o parâmetro Formato da mensagem for definido como AVRO, o esquema será usado.

Possíveis valores:
  • embedded-avro-schema: o esquema está incorporado no próprio registro

  • schema-reference-reader: o esquema é armazenado no Confluent Schema Registry

Mapa de fluxo Kinesis para tabela

Esse parâmetro opcional permite que o usuário especifique quais fluxos devem ser mapeados para quais tabelas. Cada fluxo e seu nome de tabela devem ser separados por dois pontos. Este nome de tabela deve ser um identificador válido do Snowflake, sem aspas. As expressões regulares não podem ser ambíguas e qualquer fluxo correspondente deve corresponder apenas a uma única tabela de destino. Se não for encontrada nenhuma correspondência ou se estiver vazia, o nome do fluxo será usado como nome da tabela.

Exemplos:
  • stream1:low_range,stream2:low_range,stream5:high_range,stream6:high_range

  • stream[0-4]:low_range,stream[5-9]:high_range

Iceberg ativado

Especifica se o processador ingere dados em uma tabela Iceberg. O processador falhará se essa propriedade não corresponder ao tipo de tabela real.

Possíveis valores:
  • true

  • false

Execute o fluxo

  1. Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.

  2. Clique com o botão direito do mouse no grupo de processos importado e selecione Start.

O conector inicia a ingestão de dados.

Esquema

A tabela Snowflake carregada pelo conector contém colunas nomeadas pelas chaves de suas mensagens do Kinesis. Abaixo está o exemplo de uma tabela desse tipo.

Linha

ACCOUNT

SYMBOL

SIDE

QUANTITY

1

ABC123

ZTEST

BUY

3572

2

XYZ789

ZABZX

SELL

3024

3

XYZ789

ZTEST

SELL

799

4

ABC123

ZABZX

BUY

2033

5

ABC123

ZTEST

BUY

1558

Evolução do esquema

Atualmente, quando Iceberg Enabled é definido como false. Se o conector criar a tabela de destino, a evolução do esquema será ativada por padrão. Se você quiser ativar ou desativar a evolução do esquema na tabela existente, use o comando ALTER TABLE para definir o parâmetro ENABLE_SCHEMA_EVOLUTION. Você também deve usar uma função com o privilégio OWNERSHIP na tabela. Para obter mais informações, consulte Evolução do esquema da tabela.

No entanto, se a evolução do esquema estiver desativada para uma tabela existente, o conector tentará enviar as linhas com esquemas incompatíveis para as filas de mensagens não entregues configuradas (DLQ).

Para o caso em que Iceberg Enabled está definido como true, consulte o parágrafo Evolução do esquema para as tabelas Apache Iceberg™.

Usando o Openflow Connector for Kinesis com as tabelas Apache Iceberg™

O Openflow Connector for Kinesis pode ingerir dados em uma tabela Apache Iceberg™ gerenciada pelo Snowflake.

Requisitos e limitações

Antes de configurar o conector para ingestão de tabelas Iceberg, observe os seguintes requisitos e limitações:

  • É necessário criar uma tabela Iceberg antes de executar o conector.

  • Certifique-se de que o usuário tenha acesso à inserção de dados nas tabelas criadas.

  • A evolução do esquema não é compatível com as tabelas Iceberg.

Configuração e instalação

Para configurar o conector para ingestão da tabela Iceberg, siga as instruções em Configure o conector com algumas diferenças descritas nas seções a seguir.

Habilite a ingestão na tabela Iceberg

Para permitir a ingestão em uma tabela Iceberg, você deve definir o parâmetro Iceberg Enabled como true.

Criação de uma tabela Iceberg para ingestão

Antes de executar o conector, é necessário criar uma tabela Iceberg. Como não há suporte para a evolução do esquema, você deve criar a tabela com todos os campos que a mensagem do Kinesis contém.

Ao criar uma tabela Iceberg, é possível usar tipos de dados Iceberg ou tipos Snowflake compatíveis. O tipo VARIANT semiestruturado não é compatível. Em vez disso, use um OBJECT ou MAP estruturado.

Por exemplo, considere a seguinte mensagem:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

Para criar uma tabela Iceberg para a mensagem de exemplo, use a seguinte instrução:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Nota

Os nomes de campo dentro de estruturas aninhadas, como dogs ou cats, diferenciam maiúsculas de minúsculas.

Evolução do esquema para as tabelas Apache Iceberg™

Atualmente, o conector não oferece suporte à evolução do esquema das tabelas Apache Iceberg™.

Problemas conhecidos

  • O grupo de processos do conector tem uma porta de saída denominada “Falha de upload”. Ela pode ser usada para lidar com FlowFiles que não foram carregados com sucesso no Snowflake. Se essa porta não estiver conectada fora do grupo de processos do conector, ela exibirá um sinal de aviso que pode ser ignorado.

  • Todos os processadores, quando parados, podem ser ordenados para funcionar uma vez. O processador ConsumeKinesisStream, devido à arquitetura interna, não fará nenhum trabalho significativo quando solicitado a ser executado uma vez. Para que o processador comece a funcionar, ele precisa ser iniciado e funcionar por cerca de dois minutos.