Set up Openflow Connector for Kinesis for JSON data format

Nota

This connector is subject to the Snowflake Connector Terms.

Este tópico descreve as etapas de configuração do Openflow Connector for Kinesis para formato de dados JSON. Trata-se de um conector simplificado otimizado para ingestão básica de mensagens com recursos de evolução do esquema.

O Openflow Connector for Kinesis para formato de dados JSON foi projetado para ingestão simples de mensagens JSON dos fluxos do Kinesis para tabelas do Snowflake.

Pré-requisitos

  1. Certifique-se de ter revisado Sobre a Openflow Connector for Kinesis.

  2. Ensure that you have Configuração do Openflow - BYOC or Set up Openflow - Snowflake Deployments.

  3. Se você usa Openflow - Snowflake Deployments, garanta que já tenha revisado a configuração dos domínios necessários e concedido acesso a esses domínios para o conector Kinesis.

Nota

Se você precisar de suporte para outros formatos de dados ou recursos, como DLQ, entre em contato com seu representante Snowflake.

Configure um fluxo Kinesis

Como administrador do AWS, execute as seguintes ações em sua conta AWS:

  1. Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.

  2. Ensure that the AWS User has configured Access Key credentials.

Configure a conta Snowflake

Como administrador de conta Snowflake, execute as seguintes tarefas:

  1. Crie uma nova função ou use uma função existente e conceda a Privilégios de banco de dados.

  2. Crie um banco de dados de destino e um esquema de destino que serão usados para criar tabelas de destino para armazenar os dados.

    1. Se você planeja usar o recurso do conector para criar automaticamente a tabela de destino se ela ainda não existir, certifique-se de que o usuário tenha os privilégios necessários para criar e gerenciar objetos Snowflake:

      Objeto

      Privilégio

      Notas

      Banco de dados

      USAGE

      Esquema

      USAGE . CREATE TABLE .

      Após a criação dos objetos de nível de esquema, os privilégios CREATE object podem ser revogados.

      Tabela

      OWNERSHIP

      Necessário somente ao usar o conector Kinesis para ingerir dados em uma tabela existente. . Se o conector criar uma nova tabela de destino para registros do fluxo Kinesis, a função padrão do usuário especificado na configuração se tornará o proprietário da tabela.

      Você pode usar o script a seguir para criar e configurar uma função personalizada (requer SECURITYADMIN ou equivalente):

      USE ROLE SECURITYADMIN;
      
      CREATE ROLE kinesis_connector_role;
      GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role;
      GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      
      -- Only for existing tables.
      GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
      
      Copy
  3. Crie um novo usuário de serviço Snowflake com o tipo SERVICE.

  4. Conceda ao usuário do serviço Snowflake a função que você criou nas etapas anteriores.

    GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user;
    ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
    
    Copy
  5. Configure com autenticação por pares de chaves para o usuário Snowflake SERVICE a partir da etapa 3.

  6. A Snowflake recomenda enfaticamente essa etapa. Configure um gerenciador de segredos compatível com o Openflow, por exemplo, AWS, Azure e Hashicorp, e armazene as chaves públicas e privadas no armazenamento de segredos.

    Nota

    Se, por algum motivo, você não quiser usar um gerenciador de segredos, será responsável por proteger os arquivos de chave pública e chave privada usados para autenticação de pares de chaves de acordo com as políticas de segurança de sua organização.

    1. Depois que o gerenciador de segredos estiver configurado, determine como você se autenticará nele. No AWS, é recomendável que você use a função de instância EC2 associada ao Openflow, pois dessa forma nenhum outro segredo precisa ser mantido.

    2. No Openflow, configurar um provedor de parâmetros associado a este Secrets Manager, a partir do menu de configuração no canto superior direito. Navegue até Controller Settings » Parameter Provider e depois buscar seus valores de parâmetro.

    3. Nesse momento, todas as credenciais podem ser referenciadas com os caminhos de parâmetros associados e nenhum valor sensível precisa ser mantido no Openflow.

  7. If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), then grant those users the role created in step 2.

Configuração do conector

Como engenheiro de dados, execute as seguintes tarefas para instalar e configurar o conector:

Instalação do conector

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. Na página de conectores do Openflow, localize o conector e selecione Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    Nota

    Antes de instalar o conector, verifique se você criou um banco de dados e um esquema no Snowflake para que o conector armazene os dados ingeridos.

  4. Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.

  5. Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.

A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.

Configuração do conector

  1. Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.

  2. Populate the required parameter values as described in Parameters section below.

Parameters

Esta seção descreve todos os parâmetros do Openflow Connector for Kinesis para formato de dados JSON.

O conector consiste em vários módulos. Para ver o conjunto, clique duas vezes no grupo de processos do conector. Você pode definir os parâmetros para cada módulo no contexto de parâmetro do módulo.

Snowflake destination parameters

Parâmetro

Descrição

Obrigatório

Banco de dados de destino

O banco de dados onde os dados serão persistidos. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas.

Sim

Esquema de destino

O esquema onde os dados serão persistidos, que já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas.

Veja os exemplos a seguir:

  • CREATE SCHEMA SCHEMA_NAME ou CREATE SCHEMA schema_name: use SCHEMA_NAME

  • CREATE SCHEMA "schema_name" ou CREATE SCHEMA "SCHEMA_NAME": use schema_name ou SCHEMA_NAME, respectivamente

Sim

Iceberg ativado

Se o Iceberg está habilitado para operações de tabela. Pode ser true ou false.

Sim

Schema Evolution Enabled

Habilita ou desabilita a evolução do esquema no nível do conector. Quando habilitado, permite alterações automáticas de esquema em tabelas. Observe que a evolução do esquema também pode ser controlada no nível da tabela individual por meio de parâmetros específicos da tabela. Pode ser true ou false.

Sim

Schema Evolution For New Tables Enabled

Controla se a evolução do esquema será habilitada ao criar novas tabelas. Quando definido como «true», novas tabelas serão criadas com o parâmetro ENABLE_SCHEMA_EVOLUTION = TRUE. Quando definido como «false», novas tabelas serão criadas com o parâmetro ENABLE_SCHEMA_EVOLUTION = FALSE. Não aplicável às tabelas Iceberg, pois elas não são criadas automaticamente. Essa configuração afeta apenas a criação de tabelas, não as tabelas existentes. Pode ser true ou false.

Sim

Identificador de conta Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: nome da conta Snowflake formatado como [nome-da-organização]-[nome-da-conta], onde os dados serão persistentes.

Sim

Estratégia de autenticação Snowflake

Ao utilizar:

  • Implantação do Snowflake OpenFlow ou BYOC: Use SNOWFLAKE_SESSION_TOKEN. O Snowflake gerencia este token automaticamente. As implantações BYOC já devem ter configurado as funções de tempo de execução para usar o SNOWFLAKE_SESSION_TOKEN.

  • BYOC: Alternatively BYOC can use KEY_PAIR as the value for authentication strategy.

Sim

Chave privada Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: deve ser a chave privada RSA utilizada para a autenticação.

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

Não

Arquivo de chave privada Snowflake

Ao utilizar:

  • Estratégia de autenticação de token de sessão: o arquivo de chave privada deve estar em branco.

  • KEY_PAIR: carregue o arquivo que contém a chave privada RSA usada para autenticação no Snowflake, formatado de acordo com os padrões PKCS8 e incluindo cabeçalhos e rodapés PEM padrão. A linha do cabeçalho começa com -----BEGIN PRIVATE. Para carregar o arquivo de chave privada, marque a caixa de seleção Reference asset.

Não

Senha de chave privada Snowflake

Ao usar

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: forneça a senha associada ao arquivo de chave privada do Snowflake.

Não

Função Snowflake

Ao usar

  • Session Token Authentication Strategy: Use your Snowflake Role. You can find your Snowflake Role in the Openflow UI, by navigating to View Details for your Runtime.

  • Estratégia de autenticação de KEY_PAIR: use uma função válida configurada para o usuário do seu serviço.

Sim

Nome de usuário do Snowflake

Ao usar

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: forneça o nome de usuário usado para se conectar à instância do Snowflake.

Sim

Kinesis JSON Source Parameters

Parâmetro

Descrição

Obrigatório

Código da região AWS

A região AWS onde seu Kinesis Stream está localizado, por exemplo, us-west-2.

Sim

ID da chave de acesso AWS

The AWS Access Key ID to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch.

Sim

Chave de acesso secreta AWS

The AWS Secret Access Key to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch.

Sim

Nome do aplicativo Kinesis

O nome usado para o nome de tabela DynamoDB para rastrear o progresso do aplicativo no consumo do Kinesis Stream.

Sim

Posição inicial do fluxo do Kinesis

A posição inicial do fluxo a partir da qual os dados começam a ser replicados.

Os valores possíveis são:
  • LATEST: último registro armazenado

  • TRIM_HORIZON: registro armazenado mais antigo

Sim

Nome do fluxo do Kinesis

Nome do fluxo do Kinesis AWS para consumir dados.

Sim

Publicação de métricas

Especifica onde as métricas da Kinesis Client Library são publicadas. Valores possíveis: DISABLED, LOGS e CLOUDWATCH.

Sim

Execute o fluxo

  1. Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.

  2. Right-click on the connector’s process group and select Start.

O conector inicia a ingestão de dados.

Table Schema

The Snowflake table loaded by the connector contains columns named by the keys of your Kinesis messages. The connector also adds a KINESISMETADATA column which stores metadata about the record.

Veja abaixo um exemplo de tabela do Snowflake carregada pelo conector:

Linha

ACCOUNT

SYMBOL

SIDE

QUANTITY

KINESISMETADATA

1

ABC123

ZTEST

BUY

3572

{ … objeto KINESISMETADATA … }

2

XYZ789

ZABZX

SELL

3024

{ … objeto KINESISMETADATA … }

3

XYZ789

ZTEST

SELL

799

{ … objeto KINESISMETADATA … }

4

ABC123

ZABZX

BUY

2033

{ … objeto KINESISMETADATA … }

A coluna KINESISMETADATA contém um objeto com os seguintes campos:

Nome do campo

Tipo de campo

Example Value

Descrição

stream

Cadeia de caracteres

stream-name

O nome do fluxo do Kinesis de origem do registro.

shardId

Cadeia de caracteres

shardId-000000000001

O identificador do fragmento no fluxo de origem do registro.

approximateArrival

Cadeia de caracteres

2025-11-05T09:12:15.300

A hora aproximada em que o registro foi inserido no fluxo (formato ISO 8601).

partitionKey

Cadeia de caracteres

key-1234

A chave de partição especificada pelo produtor de dados para o registro.

sequenceNumber

Cadeia de caracteres

123456789

O número de sequência exclusivo atribuído pelos fluxos de dados do Kinesis ao registro no fragmento.

subSequenceNumber

Número

2

O número de subsequência do registro (usado para registros agregados com o mesmo número de sequência).

shardedSequenceNumber

Cadeia de caracteres

12345678900002

Uma combinação do número de sequência e do número de subsequência para o registro.

Evolução do esquema

Esse conector é compatível com a detecção e a evolução automáticas do esquema. A estrutura das tabelas no Snowflake é definida e evoluída automaticamente para oferecer suporte à estrutura dos novos dados carregados pelo conector.

O Snowflake detecta o esquema dos dados recebidos e carrega os dados em tabelas que correspondam a um esquema definido pelo usuário. O Snowflake também permite adicionar novas colunas ou descartar a restrição NOT NULL de colunas ausentes em novos registros recebidos.

A detecção de esquema com o conector infere tipos de dados com base no dados JSON fornecidos.

Se o conector criar a tabela de destino, a evolução do esquema será ativada por padrão.

If you want to enable or disable schema evolution on an existing table, use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter. You must also use a role that has the OWNERSHIP privilege on the table. For more information, see Evolução do esquema da tabela.

However, if schema evolution is disabled for an existing table, then the connector will try to send the rows with mismatched schemas to the configured failure output port.

Iceberg table support

Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.

Requisitos e limitações

Before you configure the Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:

  • É necessário criar uma tabela Iceberg antes de executar o conector.

  • Certifique-se de que o usuário tenha acesso à inserção de dados nas tabelas criadas.

Configuração e instalação

To configure the Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.

Habilite a ingestão na tabela Iceberg

Para permitir a ingestão em uma tabela Iceberg, você deve definir o parâmetro Iceberg Enabled como true.

Criação de uma tabela Iceberg para ingestão

Antes de executar o conector, é necessário criar uma tabela Iceberg. O esquema inicial da tabela depende das configurações de propriedade da evolução do esquema habilitada do seu conector.

Com a evolução do esquema habilitada, você deve criar uma tabela com uma coluna chamada kinesisMetadata. O conector cria automaticamente as colunas para os campos da mensagem e altera o esquema da coluna kinesisMetadata.

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table'
  ENABLE_SCHEMA_EVOLUTION = true;
Copy

If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.

Por exemplo, considere a seguinte mensagem:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

A seguinte instrução cria uma tabela com todos os campos que fazem parte da mensagem do Kinesis:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT(
        stream STRING,
        shardId STRING,
        approximateArrival STRING,
        partitionKey STRING,
        sequenceNumber STRING,
        subSequenceNumber INTEGER,
        shardedSequenceNumber STRING
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Nota

kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.