Set up Openflow Connector for Kinesis for JSON data format

Nota

This connector is subject to the Snowflake Connector Terms.

This topic describes the set up steps for the Openflow Connector for Kinesis for JSON data format. This is a simplified connector optimized for basic message ingestion with schema evolution capabilities.

The Openflow Connector for Kinesis for JSON data format is designed for straightforward JSON message ingestion from Kinesis streams to Snowflake tables.

Pré-requisitos

  1. Certifique-se de ter revisado Sobre a Openflow Connector for Kinesis.

  2. Ensure that you have Configuração do Openflow - BYOC or Set up Openflow - Snowflake Deployments.

  3. Se você usa Openflow - Snowflake Deployments, garanta que já tenha revisado a configuração dos domínios necessários e concedido acesso a esses domínios para o conector Kinesis.

Nota

If you need the support of other data formats or features, such as DLQ, reach out to your Snowflake representative.

Configure um fluxo Kinesis

Como administrador do AWS, execute as seguintes ações em sua conta AWS:

  1. Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.

  2. Ensure that the AWS User has configured Access Key credentials.

Configure a conta Snowflake

Como administrador de conta Snowflake, execute as seguintes tarefas:

  1. Crie uma nova função ou use uma função existente e conceda a Privilégios de banco de dados.

  2. Crie um banco de dados de destino e um esquema de destino que serão usados para criar tabelas de destino para armazenar os dados.

    1. Se você planeja usar o recurso do conector para criar automaticamente a tabela de destino se ela ainda não existir, certifique-se de que o usuário tenha os privilégios necessários para criar e gerenciar objetos Snowflake:

      Objeto

      Privilégio

      Notas

      Banco de dados

      USAGE

      Esquema

      USAGE . CREATE TABLE .

      Após a criação dos objetos de nível de esquema, os privilégios CREATE object podem ser revogados.

      Tabela

      OWNERSHIP

      Necessário somente ao usar o conector Kinesis para ingerir dados em uma tabela existente. . Se o conector criar uma nova tabela de destino para registros do fluxo Kinesis, a função padrão do usuário especificado na configuração se tornará o proprietário da tabela.

      Você pode usar o script a seguir para criar e configurar uma função personalizada (requer SECURITYADMIN ou equivalente):

      USE ROLE SECURITYADMIN;
      
      CREATE ROLE kinesis_connector_role;
      GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role;
      GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      
      -- Only for existing tables.
      GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
      
      Copy
  3. Crie um novo usuário de serviço Snowflake com o tipo SERVICE.

  4. Conceda ao usuário do serviço Snowflake a função que você criou nas etapas anteriores.

    GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user;
    ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
    
    Copy
  5. Configure com autenticação por pares de chaves para o usuário Snowflake SERVICE a partir da etapa 3.

  6. A Snowflake recomenda enfaticamente essa etapa. Configure um gerenciador de segredos compatível com o Openflow, por exemplo, AWS, Azure e Hashicorp, e armazene as chaves públicas e privadas no armazenamento de segredos.

    Nota

    Se, por algum motivo, você não quiser usar um gerenciador de segredos, será responsável por proteger os arquivos de chave pública e chave privada usados para autenticação de pares de chaves de acordo com as políticas de segurança de sua organização.

    1. Depois que o gerenciador de segredos estiver configurado, determine como você se autenticará nele. No AWS, é recomendável que você use a função de instância EC2 associada ao Openflow, pois dessa forma nenhum outro segredo precisa ser mantido.

    2. No Openflow, configurar um provedor de parâmetros associado a este Secrets Manager, a partir do menu de configuração no canto superior direito. Navegue até Controller Settings » Parameter Provider e depois buscar seus valores de parâmetro.

    3. Nesse momento, todas as credenciais podem ser referenciadas com os caminhos de parâmetros associados e nenhum valor sensível precisa ser mantido no Openflow.

  7. If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), then grant those users the role created in step 2.

Configuração do conector

Como engenheiro de dados, execute as seguintes tarefas para instalar e configurar o conector:

Instalação do conector

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. Na página de conectores do Openflow, localize o conector e selecione Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    Nota

    Antes de instalar o conector, verifique se você criou um banco de dados e um esquema no Snowflake para que o conector armazene os dados ingeridos.

  4. Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.

  5. Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.

A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.

Configuração do conector

  1. Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.

  2. Populate the required parameter values as described in Parameters section below.

Parameters

This section describes all parameters for the Openflow Connector for Kinesis for JSON data format.

The connector consists of a several modules. To see the set, double click on the connector process group. You will be able to set the parameters for each module in the module’s parameter context.

Snowflake destination parameters

Parâmetro

Descrição

Obrigatório

Banco de dados de destino

O banco de dados onde os dados serão persistidos. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas.

Sim

Esquema de destino

O esquema onde os dados serão persistidos, que já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas.

Veja os exemplos a seguir:

  • CREATE SCHEMA SCHEMA_NAME ou CREATE SCHEMA schema_name: use SCHEMA_NAME

  • CREATE SCHEMA "schema_name" ou CREATE SCHEMA "SCHEMA_NAME": use schema_name ou SCHEMA_NAME, respectivamente

Sim

Iceberg ativado

Whether Iceberg is enabled for table operations. One of true / false.

Sim

Schema Evolution Enabled

Enables or disables schema evolution at the connector level. When enabled, allows automatic schema changes for tables. Note that schema evolution can also be controlled at the individual table level through table-specific parameters. One of: true / false.

Sim

Schema Evolution For New Tables Enabled

Controls whether schema evolution is enabled when creating new tables. When set to “true”, new tables will be created with ENABLE_SCHEMA_EVOLUTION = TRUE parameter. When set to “false”, new tables will be created with ENABLE_SCHEMA_EVOLUTION = FALSE parameter. Not applicable to Iceberg tables as they are not being created automatically. This setting only affects table creation, not existing tables. One of: true / false.

Sim

Identificador de conta Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: nome da conta Snowflake formatado como [nome-da-organização]-[nome-da-conta], onde os dados serão persistentes.

Sim

Estratégia de autenticação Snowflake

Ao utilizar:

  • Implantação do Snowflake OpenFlow ou BYOC: Use SNOWFLAKE_SESSION_TOKEN. O Snowflake gerencia este token automaticamente. As implantações BYOC já devem ter configurado as funções de tempo de execução para usar o SNOWFLAKE_SESSION_TOKEN.

  • BYOC: Alternatively BYOC can use KEY_PAIR as the value for authentication strategy.

Sim

Chave privada Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: deve ser a chave privada RSA utilizada para a autenticação.

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

Não

Arquivo de chave privada Snowflake

Ao utilizar:

  • Estratégia de autenticação de token de sessão: o arquivo de chave privada deve estar em branco.

  • KEY_PAIR: carregue o arquivo que contém a chave privada RSA usada para autenticação no Snowflake, formatado de acordo com os padrões PKCS8 e incluindo cabeçalhos e rodapés PEM padrão. A linha do cabeçalho começa com -----BEGIN PRIVATE. Para carregar o arquivo de chave privada, marque a caixa de seleção Reference asset.

Não

Senha de chave privada Snowflake

Ao usar

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: forneça a senha associada ao arquivo de chave privada do Snowflake.

Não

Função Snowflake

Ao usar

  • Session Token Authentication Strategy: Use your Snowflake Role. You can find your Snowflake Role in the Openflow UI, by navigating to View Details for your Runtime.

  • Estratégia de autenticação de KEY_PAIR: use uma função válida configurada para o usuário do seu serviço.

Sim

Nome de usuário do Snowflake

Ao usar

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: forneça o nome de usuário usado para se conectar à instância do Snowflake.

Sim

Kinesis JSON Source Parameters

Parâmetro

Descrição

Obrigatório

Código da região AWS

A região AWS onde seu Kinesis Stream está localizado, por exemplo, us-west-2.

Sim

ID da chave de acesso AWS

The AWS Access Key ID to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch.

Sim

Chave de acesso secreta AWS

The AWS Secret Access Key to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch.

Sim

Nome do aplicativo Kinesis

O nome usado para o nome de tabela DynamoDB para rastrear o progresso do aplicativo no consumo do Kinesis Stream.

Sim

Posição inicial do fluxo do Kinesis

A posição inicial do fluxo a partir da qual os dados começam a ser replicados.

Os valores possíveis são:
  • LATEST: último registro armazenado

  • TRIM_HORIZON: registro armazenado mais antigo

Sim

Nome do fluxo do Kinesis

Nome do fluxo do Kinesis AWS para consumir dados.

Sim

Metrics Publishing

Specifies where Kinesis Client Library metrics are published to. Possible values: DISABLED, LOGS, CLOUDWATCH.

Sim

Execute o fluxo

  1. Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.

  2. Right-click on the connector’s process group and select Start.

O conector inicia a ingestão de dados.

Table Schema

The Snowflake table loaded by the connector contains columns named by the keys of your Kinesis messages. The connector also adds a KINESISMETADATA column which stores metadata about the record.

Below is an example of a Snowflake table loaded by the connector:

Linha

ACCOUNT

SYMBOL

SIDE

QUANTITY

KINESISMETADATA

1

ABC123

ZTEST

BUY

3572

{ … KINESISMETADATA object … }

2

XYZ789

ZABZX

SELL

3024

{ … KINESISMETADATA object … }

3

XYZ789

ZTEST

SELL

799

{ … KINESISMETADATA object … }

4

ABC123

ZABZX

BUY

2033

{ … KINESISMETADATA object … }

The KINESISMETADATA column contains an object with the following fields:

Field Name

Field Type

Example Value

Descrição

stream

String

stream-name

The name of the Kinesis stream the record came from.

shardId

String

shardId-000000000001

The identifier of the shard in the stream the record came from.

approximateArrival

String

2025-11-05T09:12:15.300

The approximate time that the record was inserted into the stream (ISO 8601 format).

partitionKey

String

key-1234

The partition key specified by the data producer for the record.

sequenceNumber

String

123456789

The unique sequence number assigned by Kinesis Data Streams to the record in the shard.

subSequenceNumber

Number

2

The subsequence number for the record (used for aggregated records with the same sequence number).

shardedSequenceNumber

String

12345678900002

A combination of the sequence number and the subsequence number for the record.

Evolução do esquema

This connector supports automatic schema detection and evolution. The structure of tables in Snowflake is defined and evolved automatically to support the structure of new data loaded by the connector.

Snowflake detects the schema of the incoming data and loads data into tables that match any user-defined schema. Snowflake also allows adding new columns or dropping the NOT NULL constraint from columns missing in new incoming records.

Schema detection with the connector infers data types based on the JSON data provided.

If the connector creates the target table, schema evolution is enabled by default.

If you want to enable or disable schema evolution on an existing table, use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter. You must also use a role that has the OWNERSHIP privilege on the table. For more information, see Evolução do esquema da tabela.

However, if schema evolution is disabled for an existing table, then the connector will try to send the rows with mismatched schemas to the configured failure output port.

Iceberg table support

Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.

Requisitos e limitações

Before you configure the Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:

  • É necessário criar uma tabela Iceberg antes de executar o conector.

  • Certifique-se de que o usuário tenha acesso à inserção de dados nas tabelas criadas.

Configuração e instalação

To configure the Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.

Habilite a ingestão na tabela Iceberg

Para permitir a ingestão em uma tabela Iceberg, você deve definir o parâmetro Iceberg Enabled como true.

Criação de uma tabela Iceberg para ingestão

Before you run the connector, you must create an Iceberg table. The initial table schema depends on your connector Schema Evolution Enabled property settings.

With enabled schema evolution, you must create a table with a column named kinesisMetadata. The connector automatically creates the columns for message fields and alters the kinesisMetadata column schema.

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table'
  ENABLE_SCHEMA_EVOLUTION = true;
Copy

If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.

Por exemplo, considere a seguinte mensagem:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

The following statement creates a table with all fields the Kinesis message contains:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT(
        stream STRING,
        shardId STRING,
        approximateArrival STRING,
        partitionKey STRING,
        sequenceNumber STRING,
        subSequenceNumber INTEGER,
        shardedSequenceNumber STRING
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Nota

kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.