Set up Openflow Connector for Kinesis for JSON data format¶
Nota
This connector is subject to the Snowflake Connector Terms.
This topic describes the set up steps for the Openflow Connector for Kinesis for JSON data format. This is a simplified connector optimized for basic message ingestion with schema evolution capabilities.
The Openflow Connector for Kinesis for JSON data format is designed for straightforward JSON message ingestion from Kinesis streams to Snowflake tables.
Pré-requisitos¶
Certifique-se de ter revisado Sobre a Openflow Connector for Kinesis.
Ensure that you have Configuração do Openflow - BYOC or Set up Openflow - Snowflake Deployments.
Se você usa Openflow - Snowflake Deployments, garanta que já tenha revisado a configuração dos domínios necessários e concedido acesso a esses domínios para o conector Kinesis.
Nota
If you need the support of other data formats or features, such as DLQ, reach out to your Snowflake representative.
Configure um fluxo Kinesis¶
Como administrador do AWS, execute as seguintes ações em sua conta AWS:
Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.
Ensure that the AWS User has configured Access Key credentials.
Configure a conta Snowflake¶
Como administrador de conta Snowflake, execute as seguintes tarefas:
Crie uma nova função ou use uma função existente e conceda a Privilégios de banco de dados.
Crie um banco de dados de destino e um esquema de destino que serão usados para criar tabelas de destino para armazenar os dados.
Se você planeja usar o recurso do conector para criar automaticamente a tabela de destino se ela ainda não existir, certifique-se de que o usuário tenha os privilégios necessários para criar e gerenciar objetos Snowflake:
Objeto
Privilégio
Notas
Banco de dados
USAGE
Esquema
USAGE . CREATE TABLE .
Após a criação dos objetos de nível de esquema, os privilégios CREATE
objectpodem ser revogados.Tabela
OWNERSHIP
Necessário somente ao usar o conector Kinesis para ingerir dados em uma tabela existente. . Se o conector criar uma nova tabela de destino para registros do fluxo Kinesis, a função padrão do usuário especificado na configuração se tornará o proprietário da tabela.
Você pode usar o script a seguir para criar e configurar uma função personalizada (requer SECURITYADMIN ou equivalente):
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; -- Only for existing tables. GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
Crie um novo usuário de serviço Snowflake com o tipo SERVICE.
Conceda ao usuário do serviço Snowflake a função que você criou nas etapas anteriores.
GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user; ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
Configure com autenticação por pares de chaves para o usuário Snowflake SERVICE a partir da etapa 3.
A Snowflake recomenda enfaticamente essa etapa. Configure um gerenciador de segredos compatível com o Openflow, por exemplo, AWS, Azure e Hashicorp, e armazene as chaves públicas e privadas no armazenamento de segredos.
Nota
Se, por algum motivo, você não quiser usar um gerenciador de segredos, será responsável por proteger os arquivos de chave pública e chave privada usados para autenticação de pares de chaves de acordo com as políticas de segurança de sua organização.
Depois que o gerenciador de segredos estiver configurado, determine como você se autenticará nele. No AWS, é recomendável que você use a função de instância EC2 associada ao Openflow, pois dessa forma nenhum outro segredo precisa ser mantido.
No Openflow, configurar um provedor de parâmetros associado a este Secrets Manager, a partir do menu de configuração no canto superior direito. Navegue até Controller Settings » Parameter Provider e depois buscar seus valores de parâmetro.
Nesse momento, todas as credenciais podem ser referenciadas com os caminhos de parâmetros associados e nenhum valor sensível precisa ser mantido no Openflow.
If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), then grant those users the role created in step 2.
Configuração do conector¶
Como engenheiro de dados, execute as seguintes tarefas para instalar e configurar o conector:
Instalação do conector¶
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
Na página de conectores do Openflow, localize o conector e selecione Add to runtime.
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.
Nota
Antes de instalar o conector, verifique se você criou um banco de dados e um esquema no Snowflake para que o conector armazene os dados ingeridos.
Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.
Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.
A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.
Configuração do conector¶
Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.
Populate the required parameter values as described in Parameters section below.
Parameters¶
This section describes all parameters for the Openflow Connector for Kinesis for JSON data format.
The connector consists of a several modules. To see the set, double click on the connector process group. You will be able to set the parameters for each module in the module’s parameter context.
Snowflake destination parameters¶
Parâmetro |
Descrição |
Obrigatório |
|---|---|---|
Banco de dados de destino |
O banco de dados onde os dados serão persistidos. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas. |
Sim |
Esquema de destino |
O esquema onde os dados serão persistidos, que já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas. Veja os exemplos a seguir:
|
Sim |
Iceberg ativado |
Whether Iceberg is enabled for table operations. One of |
Sim |
Schema Evolution Enabled |
Enables or disables schema evolution at the connector level. When enabled, allows automatic schema changes for tables.
Note that schema evolution can also be controlled at the individual table level through table-specific parameters.
One of: |
Sim |
Schema Evolution For New Tables Enabled |
Controls whether schema evolution is enabled when creating new tables. When set to “true”, new tables will be created with ENABLE_SCHEMA_EVOLUTION = TRUE parameter.
When set to “false”, new tables will be created with ENABLE_SCHEMA_EVOLUTION = FALSE parameter.
Not applicable to Iceberg tables as they are not being created automatically.
This setting only affects table creation, not existing tables. One of: |
Sim |
Identificador de conta Snowflake |
Ao utilizar:
|
Sim |
Estratégia de autenticação Snowflake |
Ao utilizar:
|
Sim |
Chave privada Snowflake |
Ao utilizar:
|
Não |
Arquivo de chave privada Snowflake |
Ao utilizar:
|
Não |
Senha de chave privada Snowflake |
Ao usar
|
Não |
Função Snowflake |
Ao usar
|
Sim |
Nome de usuário do Snowflake |
Ao usar
|
Sim |
Kinesis JSON Source Parameters¶
Parâmetro |
Descrição |
Obrigatório |
|---|---|---|
Código da região AWS |
A região AWS onde seu Kinesis Stream está localizado, por exemplo, |
Sim |
ID da chave de acesso AWS |
The AWS Access Key ID to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
Sim |
Chave de acesso secreta AWS |
The AWS Secret Access Key to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
Sim |
Nome do aplicativo Kinesis |
O nome usado para o nome de tabela DynamoDB para rastrear o progresso do aplicativo no consumo do Kinesis Stream. |
Sim |
Posição inicial do fluxo do Kinesis |
A posição inicial do fluxo a partir da qual os dados começam a ser replicados.
|
Sim |
Nome do fluxo do Kinesis |
Nome do fluxo do Kinesis AWS para consumir dados. |
Sim |
Metrics Publishing |
Specifies where Kinesis Client Library metrics are published to. Possible values: |
Sim |
Execute o fluxo¶
Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.
Right-click on the connector’s process group and select Start.
O conector inicia a ingestão de dados.
Table Schema¶
The Snowflake table loaded by the connector contains columns named by the keys of your Kinesis messages.
The connector also adds a KINESISMETADATA column which stores metadata about the record.
Below is an example of a Snowflake table loaded by the connector:
Linha |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
KINESISMETADATA |
|---|---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
{ … KINESISMETADATA object … } |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
{ … KINESISMETADATA object … } |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
{ … KINESISMETADATA object … } |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
{ … KINESISMETADATA object … } |
The KINESISMETADATA column contains an object with the following fields:
Field Name |
Field Type |
Example Value |
Descrição |
|---|---|---|---|
|
String |
|
The name of the Kinesis stream the record came from. |
|
String |
|
The identifier of the shard in the stream the record came from. |
|
String |
|
The approximate time that the record was inserted into the stream (ISO 8601 format). |
|
String |
|
The partition key specified by the data producer for the record. |
|
String |
|
The unique sequence number assigned by Kinesis Data Streams to the record in the shard. |
|
Number |
|
The subsequence number for the record (used for aggregated records with the same sequence number). |
|
String |
|
A combination of the sequence number and the subsequence number for the record. |
Evolução do esquema¶
This connector supports automatic schema detection and evolution. The structure of tables in Snowflake is defined and evolved automatically to support the structure of new data loaded by the connector.
Snowflake detects the schema of the incoming data and loads data into tables
that match any user-defined schema. Snowflake also allows adding
new columns or dropping the NOT NULL constraint from columns missing in new incoming records.
Schema detection with the connector infers data types based on the JSON data provided.
If the connector creates the target table, schema evolution is enabled by default.
If you want to enable or disable schema evolution on an existing table,
use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter.
You must also use a role that has the OWNERSHIP privilege on the table. For more information, see Evolução do esquema da tabela.
However, if schema evolution is disabled for an existing table, then the connector will try to send the rows with mismatched schemas to the configured failure output port.
Iceberg table support¶
Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.
Requisitos e limitações¶
Before you configure the Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:
É necessário criar uma tabela Iceberg antes de executar o conector.
Certifique-se de que o usuário tenha acesso à inserção de dados nas tabelas criadas.
Configuração e instalação¶
To configure the Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.
Habilite a ingestão na tabela Iceberg¶
Para permitir a ingestão em uma tabela Iceberg, você deve definir o parâmetro Iceberg Enabled como true.
Criação de uma tabela Iceberg para ingestão¶
Before you run the connector, you must create an Iceberg table.
The initial table schema depends on your connector Schema Evolution Enabled property settings.
With enabled schema evolution, you must create a table with a column named kinesisMetadata.
The connector automatically creates the columns for message fields and alters the kinesisMetadata column schema.
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ENABLE_SCHEMA_EVOLUTION = true;
If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.
Por exemplo, considere a seguinte mensagem:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
The following statement creates a table with all fields the Kinesis message contains:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT(
stream STRING,
shardId STRING,
approximateArrival STRING,
partitionKey STRING,
sequenceNumber STRING,
subSequenceNumber INTEGER,
shardedSequenceNumber STRING
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Nota
kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.