Configuração de Openflow Connector for Amazon Kinesis Data Streams

Nota

O conector está sujeito aos Termos do conector Snowflake.

Este tópico descreve como configurar o Openflow Connector for Amazon Kinesis Data Streams.

O Openflow Connector for Amazon Kinesis Data Streams foi projetado para ingestão de mensagens JSON dos fluxos do Kinesis para tabelas do Snowflake, com recursos de evolução de esquema.

Configurar o Openflow Connector para Kinesis

Pré-requisitos

  1. Consulte Openflow Connector for Amazon Kinesis Data Streams.

  2. Certifique-se de ter revisado Configuração do Openflow - BYOC ou Configuração do Openflow – Implantações do Snowflake.

  3. Se você usa o Openflow – Implantações do Snowflake, certifique-se de ter revisado a configuração dos domínios necessários e concedido acesso aos domínios necessários para o conector Kinesis.

Configurar funções e políticas do IAM na AWS

Como administrador do AWS, execute as seguintes ações em sua conta AWS:

  1. Crie um usuário ou função do IAM na AWS que o Openflow usará para acessar o fluxo de dados do Kinesis. Para obter mais informações, consulte como criar usuários do IAM na documentação da AWS.

  2. Garanta que o usuário da AWS tenha credenciais de chave de acesso configuradas.

  3. Conceda ao usuário da AWS as seguintes permissões do IAM:

    Serviço

    Ações

    Recursos (ARNs)

    Objetivo

    Amazon Kinesis Data Streams

    kinesis:DescribeStream, kinesis:DescribeStreamConsumer, kinesis:GetRecords, kinesis:GetShardIterator, kinesis:ListShards, kinesis:RegisterStreamConsumer

    arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}

    Descobre fragmentos, lê registros por meio da sondagem de taxa de transferência compartilhada, resolve o ARN do fluxo, registra um consumidor de fan-out aprimorado e sonda o status do consumidor durante o registro.

    Amazon Kinesis Data Streams

    kinesis:DeregisterStreamConsumer, kinesis:DescribeStreamConsumer, kinesis:SubscribeToShard

    arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}/consumer/*

    Descreve, assina e cancela o registro de consumidores de fan-out aprimorado por ARN de consumidor.

    Amazon DynamoDB

    dynamodb:CreateTable, dynamodb:DeleteTable, dynamodb:DescribeTable, dynamodb:GetItem, dynamodb:PutItem, dynamodb:Query, dynamodb:Scan, dynamodb:UpdateItem

    arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}, arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}_migration

    Cria e gerencia a tabela de ponto de verificação/concessão (concessão de fragmentos, pulsações de nós, pontos de verificação) e uma tabela de migração temporária usada durante a migração única de tabelas de ponto de verificação legadas.

    Exemplo de política do IAM:

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "KinesisStreamAccess",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamConsumer",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:ListShards",
                    "kinesis:RegisterStreamConsumer"
                ],
                "Resource": "arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}"
            },
            {
                "Sid": "KinesisConsumerAccess",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DeregisterStreamConsumer",
                    "kinesis:DescribeStreamConsumer",
                    "kinesis:SubscribeToShard"
                ],
                "Resource": "arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}/consumer/*"
            },
            {
                "Sid": "DynamoDBTableAccess",
                "Effect": "Allow",
                "Action": [
                    "dynamodb:CreateTable",
                    "dynamodb:DeleteTable",
                    "dynamodb:DescribeTable",
                    "dynamodb:GetItem",
                    "dynamodb:PutItem",
                    "dynamodb:Query",
                    "dynamodb:Scan",
                    "dynamodb:UpdateItem"
                ],
                "Resource": [
                    "arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}",
                    "arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}_migration"
                ]
            }
        ]
    }
    

    Antes de usar a política de exemplo, substitua os seguintes espaços reservados:

    Espaço reservado

    Descrição

    ${REGION}

    Sua região AWS (por exemplo, us-east-1)

    ${ACCOUNT_ID}

    O ID da sua conta AWS (por exemplo, 123456789012)

    ${STREAM_NAME}

    O valor do parâmetro de conector AWS Kinesis Stream Name

    ${APPLICATION_NAME}

    O valor do parâmetro de conector AWS Kinesis Application Name. Usado como o nome da tabela de ponto de verificação do DynamoDB e como o nome do consumidor registrado de fan-out aprimorado.

    Nota

    • ${APPLICATION_NAME}_migration é uma tabela do DynamoDB temporária criada somente durante uma migração única das tabelas de ponto de verificação legadas para o novo esquema. Ela é excluída automaticamente quando a migração é concluída. Se a sua implantação nunca usou o conector baseado em KCL legado, você pode omitir o ARN da tabela de migração da política.

    • A ação dynamodb:DeleteTable é usada durante o processo de migração e pode ser removida da política depois que a migração é confirmada como concluída.

    • A ação kinesis:DeregisterStreamConsumer é invocada quando o processador é removido da tela. Se a entidade do IAM não tiver essa permissão, o registro do consumidor deverá ser cancelado manualmente por meio do console ou da CLI da AWS.

Configure a conta Snowflake

Como administrador de conta Snowflake, execute as seguintes tarefas:

  1. Crie um novo usuário de serviço Snowflake com o tipo SERVICE.

  2. Crie uma nova função ou use uma função existente e conceda os privilégios de banco de dados.

    O conector exige que o usuário crie a tabela de destino. Certifique-se de que o usuário tenha os privilégios necessários para gerenciar objetos Snowflake:

    Objeto

    Privilégio

    Notas

    Banco de dados

    USAGE

    Esquema

    USAGE

    Tabela

    OWNERSHIP

    Necessário para que o conector faça a ingestão de dados em uma tabela.

    A Snowflake recomenda a criação de um usuário e uma função separados para cada fluxo do Kinesis para ter melhor controle de acesso.

    Você pode usar o script a seguir para criar e configurar uma função personalizada (requer SECURITYADMIN ou equivalente):

    USE ROLE securityadmin;
    
    CREATE ROLE openflow_kinesis_connector_role_1;
    GRANT USAGE ON DATABASE kinesis_db TO ROLE openflow_kinesis_connector_role_1;
    GRANT USAGE ON SCHEMA kinesis_schema TO ROLE openflow_kinesis_connector_role_1;
    

    Nota

    Os privilégios devem ser concedidos diretamente à função do conector e não podem ser herdados.

  3. Configurar a tabela de destino

    Recomendamos fortemente usar a evolução de esquema do lado do servidor para alterações de esquema e uma tabela de erros para registro de erros DML.

    O exemplo abaixo mostra como criar uma tabela e adicionar permissões OWNERSHIP.

    USE ROLE openflow_kinesis_connector_role_1;
    
    CREATE TABLE kinesis_db.kinesis_schema.<DESTINATION_TABLE_NAME> (
      kinesisMetadata object
    )
    ENABLE_SCHEMA_EVOLUTION = TRUE
    ERROR_LOGGING = TRUE;
    
    USE ROLE securityadmin;
    GRANT OWNERSHIP ON TABLE <DESTINATION_TABLE_NAME> TO ROLE openflow_kinesis_connector_role_1;
    

    Esses conectores são compatíveis com a detecção e a evolução automáticas de esquema. A estrutura das tabelas no Snowflake é definida e evoluída automaticamente para oferecer suporte à estrutura dos novos dados carregados pelo conector. Ela mapeia automaticamente as chaves de primeiro nível do conteúdo do registro para as colunas de tabela correspondentes por nome (sem distinção entre maiúsculas e minúsculas).

    Quando a evolução de esquema está habilitada, o Snowflake pode expandir automaticamente a tabela de destino adicionando novas colunas que são detectadas no fluxo de entrada e descartando as restrições NOT NULL para acomodar novos padrões de dados. Para obter mais informações, consulte Evolução do esquema de tabela.

    Se ENABLE_SCHEMA_EVOLUTION não estiver habilitada, será necessário criar o esquema manualmente estendendo a definição da tabela. O conector tenta corresponder as chaves de primeiro nível do conteúdo do registro às colunas da tabela por nome. Se as chaves do JSON não corresponderem às colunas da tabela, o conector vai ignorá-las.

  4. (Opcional) Configurar um gerenciador de segredos

    A Snowflake recomenda enfaticamente essa etapa. Configure um gerenciador de segredos compatível com o Openflow, por exemplo, AWS, Azure e Hashicorp, e armazene as chaves públicas e privadas no armazenamento de segredos.

    1. Depois que o gerenciador de segredos estiver configurado, determine como você se autenticará nele. Na AWS, é recomendável que você use a função de instância EC2 associada ao Openflow, pois dessa forma nenhum outro segredo precisa ser mantido.

    2. Na tela do Openflow, configure um provedor de parâmetros associado a esse gerenciador de segredos, usando o menu de opções no canto superior direito. Navegue até Controller Settings » Parameter Provider e depois buscar seus valores de parâmetro.

    3. Nesse momento, todas as credenciais podem ser referenciadas com os caminhos de parâmetros associados e nenhum valor sensível precisa ser mantido no Openflow.

  5. Conceder acesso a usuários

    Outros usuários do Snowflake que precisem de acesso aos dados brutos ingeridos pelo conector (por exemplo, para processamento personalizado no Snowflake) devem receber a função criada na etapa 2.

Configuração do conector

Como engenheiro de dados, execute as seguintes tarefas para instalar e configurar o conector:

Instalação do conector

  1. Navegue até a página de visão geral do Openflow. Na seção Featured connectors, selecione View more connectors.

  2. Na página de conectores do Openflow, encontre Openflow connector for Amazon Kinesis Data Streams e selecione Add to runtime.

  3. Na caixa de diálogo Select runtime, selecione o tempo de execução na lista suspensa Available runtimes e clique em Add.

    Nota

    Antes de instalar o conector, verifique se você criou um banco de dados, um esquema e uma tabela no Snowflake para que o conector armazene os dados ingeridos.

  4. Autentique-se na implantação com as credenciais da sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.

  5. Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.

    A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.

Configuração do conector

  1. Se necessário, personalize a configuração do conector antes de configurar os parâmetros internos.

  2. Preencher os parâmetros do grupo de processos

    1. Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.

    2. Preencha os valores dos parâmetros necessários.

Parâmetros comuns

Parâmetro

Descrição

Obrigatório

ID da chave de acesso AWS

O AWS da chave de acesso ID para se conectar ao seu Kinesis Stream e DynamoDB.

Sim

Região AWS do Kinesis

A região AWS à qual você deve se conectar. Use o formato de região AWS regular, por exemplo: us-west-2, ap-southeast-1, eu-west-1. Consulte a página de regiões AWS.

Sim

Chave de acesso secreta AWS

A chave de acesso secreta AWS para se conectar ao seu Kinesis Stream e DynamoDB.

Sim

Nome do aplicativo AWS Kinesis

O nome usado como nome da tabela do DynamoDB para rastrear o progresso do aplicativo sobre o consumo de fluxos do Kinesis.

Sim

Tipo de consumidor do AWS Kinesis

A estratégia usada para ler os registros de um fluxo do Kinesis.

Deve ser um dos seguintes valores: SHARED_THROUGHPUT, ENHANCED_FAN_OUT.

Para obter mais informações, consulte as diferenças entre o consumidor de taxa de transferência compartilhada e o consumidor de fan-out aprimorado.

Sim

Posição inicial do fluxo do AWS Kinesis

A posição inicial do fluxo a partir da qual os dados começam a ser replicados. Isso entra em vigor somente durante a ativação inicial para um determinado nome de aplicativo AWS Kinesis.

Os valores possíveis são:

LATEST: último registro armazenado.

TRIM_HORIZON: registro armazenado mais antigo.

Sim

Nome do fluxo do AWS Kinesis

Nome do fluxo do AWS Kinesis do qual consumir os dados.

Sim

Banco de dados de destino do Snowflake

O banco de dados onde os dados serão persistidos. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas.

Sim

Esquema de destino do Snowflake

O esquema onde os dados serão persistidos, que já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas.

Veja os exemplos a seguir:

CREATE SCHEMA SCHEMA_NAME ou CREATE SCHEMA schema_name: use SCHEMA_NAME.

CREATE SCHEMA "schema_name" ou CREATE SCHEMA "SCHEMA_NAME": use schema_name ou SCHEMA_NAME, respectivamente.

Sim

Tabela de destino do Snowflake

A tabela em que os dados serão mantidos. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas.

Sim

Iniciar o conector

  1. Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.

  2. Clique com o botão direito no plano e selecione Start. O conector inicia a ingestão de dados.

Explicando a coluna KINESISMETADATA

O conector preenche a estrutura de KINESISMETADATA com os metadados sobre o registro do Kinesis. A estrutura contém as seguintes informações:

Nome do campo

Tipo de campo

Valor de exemplo

Descrição

fluxo

Cadeia de caracteres

stream-name

O nome do fluxo do Kinesis de origem do registro.

shardId

Cadeia de caracteres

shardId-000000000001

O identificador do fragmento no fluxo de origem do registro.

approximateArrival

Cadeia de caracteres

2025-11-05T09:12:15.300

A hora aproximada em que o registro foi inserido no fluxo (formato ISO 8601).

partitionKey

Cadeia de caracteres

key-1234

A chave de partição especificada pelo produtor de dados para o registro.

sequenceNumber

Cadeia de caracteres

123456789

O número de sequência exclusivo atribuído pelos fluxos de dados do Kinesis ao registro no fragmento.

subSequenceNumber

Número

2

O número de subsequência do registro (usado para registros agregados com o mesmo número de sequência).

shardedSequenceNumber

Cadeia de caracteres

12345678900002

Uma combinação do número de sequência e do número de subsequência para o registro.

Medindo a latência de ingestão

Para rastreamento de alterações, processamento incremental e consultas do Time Travel com base no horário de modificação da linha, é possível usar o recurso ROW_TIMESTAMP.

Ele pode ser habilitado executando o seguinte comando na tabela de destino:

ALTER TABLE <DESTINATION_TABLE> SET ROW_TIMESTAMP = TRUE;

Após a habilitação dos carimbos de data/hora de linha, as tabelas expõem a coluna METADATA$ROW_LAST_COMMIT_TIME, que retorna o carimbo de data/hora da última modificação de cada linha.

Para obter mais informações, consulte Carimbos de data/hora de linha.

Nota

O carimbo de data/hora de linha não está disponível para tabelas interativas. Para obter mais informações, consulte Limitações das tabelas interativas.

Usando o conector com tabelas Apache Iceberg™

O conector pode ingerir dados em tabelas Apache Iceberg™ gerenciadas pelo Snowflake, mas é necessário atender aos seguintes requisitos:

  • Você deve ter recebido o privilégio USAGE no volume externo associado à sua tabela Apache Iceberg™.

  • Você deve criar uma tabela Apache Iceberg™ antes de executar o conector.

Concessão de uso em um volume externo

Por exemplo, se a tabela Iceberg usar o volume externo kinesis_external_volume e o conector usar a função openflow_kinesis_connector_role_1, execute a seguinte instrução:

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kinesis_external_volume TO ROLE openflow_kinesis_connector_role_1;

Criar uma tabela Apache Iceberg™ para ingestão

O conector não cria tabelas Iceberg automaticamente e não oferece suporte à evolução de esquema. Antes de executar o conector, você deve criar uma tabela Iceberg manualmente.

Ao criar uma tabela Iceberg, é possível usar tipos de dados Iceberg (incluindo VARIANT) ou tipos compatíveis com o Snowflake.

Por exemplo, considere a seguinte mensagem:

{
  "id": 1,
  "name": "Steve",
  "body_temperature": 36.6,
  "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
  "animals_possessed": {
    "dogs": true,
    "cats": false
  },
  "options": {
    "can_walk": true,
    "can_talk": false
  },
  "date_added": "2024-10-15"
}

Para criar uma tabela Iceberg para a mensagem de exemplo, use uma das seguintes instruções:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
  kinesisMetadata OBJECT(
    stream STRING,
    shardId STRING,
    approximateArrival STRING,
    partitionKey STRING,
    sequenceNumber STRING,
    subSequenceNumber INTEGER,
    shardedSequenceNumber STRING
  ),
  id INT,
  name string,
  body_temperature float,
  approved_coffee_types array(string),
  animals_possessed variant,
  date_added date,
  options object(can_walk boolean, can_talk boolean)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ICEBERG_VERSION = 3;

Usando o conector com tabelas interativas

As tabelas interativas são um tipo especial de tabela do Snowflake otimizada para consultas de baixa latência e alta simultaneidade. Você pode saber mais sobre tabelas interativas na documentação de tabelas interativas.

  1. Crie uma tabela interativa:

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_topic VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) CLUSTER BY (metric_name)
    AS (SELECT
      $1:M_NAME::VARCHAR,
      $1:M_VALUE::NUMBER,
      $1:RECORD_METADATA.topic::VARCHAR,
      $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
    from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
    

Considerações importantes:

  • As tabelas interativas têm limitações e restrições de consulta específicas. Revise a documentação de tabelas interativas antes de usá-las com o conector.

  • Para tabelas interativas, as transformações necessárias devem ser tratadas na definição da tabela.

  • Warehouses interativos são necessários para consultar tabelas interativas de forma eficiente.

Usando o conector com um esquema definido pelo cliente para a tabela de destino

O conector trata cada registro do Kinesis como uma linha a ser inserida em uma tabela do Snowflake. Por exemplo, se você tem um tópico do Kinesis com o conteúdo da mensagem estruturado como neste JSON:

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}

Por padrão, você não precisa especificar todos os campos do JSON. A evolução de esquema cuidará disso. Entretanto, se você preferir um esquema estático, ele pode ser criado executando:

CREATE TABLE ORDERS (
  kinesisMetadata OBJECT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total FLOAT,
  ispaid BOOLEAN
);

Usando o conector com um PIPE definido pelo cliente

Se você criar o próprio canal, poderá definir a lógica de transformação de dados na instrução COPY INTO do canal. Você pode renomear as colunas conforme exigido e converter os tipos de dados conforme necessário. Por exemplo:

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:order_id::STRING,
    $1:customer_name,
    $1:order_total::STRING,
    $1:isPaid::STRING
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

Quando você define o próprio canal, as colunas da tabela de destino não têm que corresponder às chaves JSON. Você pode renomear as colunas para os nomes desejados e converter os tipos de dados se necessário.

Para ajustar o conector para funcionar com um canal personalizado, execute as seguintes tarefas:

  1. Clique com o botão direito no processador PublishSnowpipeStreaming usado no fluxo de ingestão do Kinesis na tela do Openflow.

  2. Selecione Configure no menu de contexto.

  3. Navegue até a guia Properties.

  4. No campo Destination type, escolha Pipe.

  5. No campo Pipe, digite o nome do seu canal.

  6. Selecione Apply para salvar a configuração.

Personalizando o tratamento de erros

O tratamento de erros é dividido entre falhas no lado do Openflow e falhas no lado do servidor no serviço Snowpipe Streaming.

  • Erros do Openflow (falhas no lado do cliente): erros, como cargas úteis não analisáveis ou falhas na transformação personalizada, ocorrem antes que os registros cheguem ao Snowflake. Por padrão, esses registros são descartados. É possível processar esses erros no Openflow: use FlowFiles do relacionamento de falha de análise no processador ConsumeKinesis.

  • Erros do Snowpipe Streaming (falhas no lado do servidor): erros de registros que chegam ao Snowflake, mas que são incompatíveis com o esquema da tabela de destino (por exemplo, incompatibilidades de tipo), são capturados pela infraestrutura do Snowflake. Quando o registro de erros está habilitado na tabela de destino (error_logging = true), as linhas com falha são automaticamente ingeridas na tabela de erros de destino.

Próximos passos