Set up Openflow Connector for Kinesis for JSON data format¶
Nota
This connector is subject to the Snowflake Connector Terms.
Este tópico descreve as etapas de configuração do Openflow Connector for Kinesis para formato de dados JSON. Trata-se de um conector simplificado otimizado para ingestão básica de mensagens com recursos de evolução do esquema.
O Openflow Connector for Kinesis para formato de dados JSON foi projetado para ingestão simples de mensagens JSON dos fluxos do Kinesis para tabelas do Snowflake.
Pré-requisitos¶
Certifique-se de ter revisado Sobre a Openflow Connector for Kinesis.
Ensure that you have Configuração do Openflow - BYOC or Set up Openflow - Snowflake Deployments.
Se você usa Openflow - Snowflake Deployments, garanta que já tenha revisado a configuração dos domínios necessários e concedido acesso a esses domínios para o conector Kinesis.
Nota
Se você precisar de suporte para outros formatos de dados ou recursos, como DLQ, entre em contato com seu representante Snowflake.
Configure um fluxo Kinesis¶
Como administrador do AWS, execute as seguintes ações em sua conta AWS:
Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.
Ensure that the AWS User has configured Access Key credentials.
Configure a conta Snowflake¶
Como administrador de conta Snowflake, execute as seguintes tarefas:
Crie uma nova função ou use uma função existente e conceda a Privilégios de banco de dados.
Crie um banco de dados de destino e um esquema de destino que serão usados para criar tabelas de destino para armazenar os dados.
Se você planeja usar o recurso do conector para criar automaticamente a tabela de destino se ela ainda não existir, certifique-se de que o usuário tenha os privilégios necessários para criar e gerenciar objetos Snowflake:
Objeto
Privilégio
Notas
Banco de dados
USAGE
Esquema
USAGE . CREATE TABLE .
Após a criação dos objetos de nível de esquema, os privilégios CREATE
objectpodem ser revogados.Tabela
OWNERSHIP
Necessário somente ao usar o conector Kinesis para ingerir dados em uma tabela existente. . Se o conector criar uma nova tabela de destino para registros do fluxo Kinesis, a função padrão do usuário especificado na configuração se tornará o proprietário da tabela.
Você pode usar o script a seguir para criar e configurar uma função personalizada (requer SECURITYADMIN ou equivalente):
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; -- Only for existing tables. GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
Crie um novo usuário de serviço Snowflake com o tipo SERVICE.
Conceda ao usuário do serviço Snowflake a função que você criou nas etapas anteriores.
GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user; ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
Configure com autenticação por pares de chaves para o usuário Snowflake SERVICE a partir da etapa 3.
A Snowflake recomenda enfaticamente essa etapa. Configure um gerenciador de segredos compatível com o Openflow, por exemplo, AWS, Azure e Hashicorp, e armazene as chaves públicas e privadas no armazenamento de segredos.
Nota
Se, por algum motivo, você não quiser usar um gerenciador de segredos, será responsável por proteger os arquivos de chave pública e chave privada usados para autenticação de pares de chaves de acordo com as políticas de segurança de sua organização.
Depois que o gerenciador de segredos estiver configurado, determine como você se autenticará nele. No AWS, é recomendável que você use a função de instância EC2 associada ao Openflow, pois dessa forma nenhum outro segredo precisa ser mantido.
No Openflow, configurar um provedor de parâmetros associado a este Secrets Manager, a partir do menu de configuração no canto superior direito. Navegue até Controller Settings » Parameter Provider e depois buscar seus valores de parâmetro.
Nesse momento, todas as credenciais podem ser referenciadas com os caminhos de parâmetros associados e nenhum valor sensível precisa ser mantido no Openflow.
If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), then grant those users the role created in step 2.
Configuração do conector¶
Como engenheiro de dados, execute as seguintes tarefas para instalar e configurar o conector:
Instalação do conector¶
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
Na página de conectores do Openflow, localize o conector e selecione Add to runtime.
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.
Nota
Antes de instalar o conector, verifique se você criou um banco de dados e um esquema no Snowflake para que o conector armazene os dados ingeridos.
Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.
Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.
A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.
Configuração do conector¶
Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.
Populate the required parameter values as described in Parameters section below.
Parameters¶
Esta seção descreve todos os parâmetros do Openflow Connector for Kinesis para formato de dados JSON.
O conector consiste em vários módulos. Para ver o conjunto, clique duas vezes no grupo de processos do conector. Você pode definir os parâmetros para cada módulo no contexto de parâmetro do módulo.
Snowflake destination parameters¶
Parâmetro |
Descrição |
Obrigatório |
|---|---|---|
Banco de dados de destino |
O banco de dados onde os dados serão persistidos. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas. |
Sim |
Esquema de destino |
O esquema onde os dados serão persistidos, que já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas. Veja os exemplos a seguir:
|
Sim |
Iceberg ativado |
Se o Iceberg está habilitado para operações de tabela. Pode ser |
Sim |
Schema Evolution Enabled |
Habilita ou desabilita a evolução do esquema no nível do conector. Quando habilitado, permite alterações automáticas de esquema em tabelas. Observe que a evolução do esquema também pode ser controlada no nível da tabela individual por meio de parâmetros específicos da tabela. Pode ser |
Sim |
Schema Evolution For New Tables Enabled |
Controla se a evolução do esquema será habilitada ao criar novas tabelas. Quando definido como «true», novas tabelas serão criadas com o parâmetro ENABLE_SCHEMA_EVOLUTION = TRUE. Quando definido como «false», novas tabelas serão criadas com o parâmetro ENABLE_SCHEMA_EVOLUTION = FALSE. Não aplicável às tabelas Iceberg, pois elas não são criadas automaticamente. Essa configuração afeta apenas a criação de tabelas, não as tabelas existentes. Pode ser |
Sim |
Identificador de conta Snowflake |
Ao utilizar:
|
Sim |
Estratégia de autenticação Snowflake |
Ao utilizar:
|
Sim |
Chave privada Snowflake |
Ao utilizar:
|
Não |
Arquivo de chave privada Snowflake |
Ao utilizar:
|
Não |
Senha de chave privada Snowflake |
Ao usar
|
Não |
Função Snowflake |
Ao usar
|
Sim |
Nome de usuário do Snowflake |
Ao usar
|
Sim |
Kinesis JSON Source Parameters¶
Parâmetro |
Descrição |
Obrigatório |
|---|---|---|
Código da região AWS |
A região AWS onde seu Kinesis Stream está localizado, por exemplo, |
Sim |
ID da chave de acesso AWS |
The AWS Access Key ID to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
Sim |
Chave de acesso secreta AWS |
The AWS Secret Access Key to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
Sim |
Nome do aplicativo Kinesis |
O nome usado para o nome de tabela DynamoDB para rastrear o progresso do aplicativo no consumo do Kinesis Stream. |
Sim |
Posição inicial do fluxo do Kinesis |
A posição inicial do fluxo a partir da qual os dados começam a ser replicados.
|
Sim |
Nome do fluxo do Kinesis |
Nome do fluxo do Kinesis AWS para consumir dados. |
Sim |
Publicação de métricas |
Especifica onde as métricas da Kinesis Client Library são publicadas. Valores possíveis: |
Sim |
Execute o fluxo¶
Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.
Right-click on the connector’s process group and select Start.
O conector inicia a ingestão de dados.
Table Schema¶
The Snowflake table loaded by the connector contains columns named by the keys of your Kinesis messages.
The connector also adds a KINESISMETADATA column which stores metadata about the record.
Veja abaixo um exemplo de tabela do Snowflake carregada pelo conector:
Linha |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
KINESISMETADATA |
|---|---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
{ … objeto KINESISMETADATA … } |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
{ … objeto KINESISMETADATA … } |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
{ … objeto KINESISMETADATA … } |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
{ … objeto KINESISMETADATA … } |
A coluna KINESISMETADATA contém um objeto com os seguintes campos:
Nome do campo |
Tipo de campo |
Example Value |
Descrição |
|---|---|---|---|
|
Cadeia de caracteres |
|
O nome do fluxo do Kinesis de origem do registro. |
|
Cadeia de caracteres |
|
O identificador do fragmento no fluxo de origem do registro. |
|
Cadeia de caracteres |
|
A hora aproximada em que o registro foi inserido no fluxo (formato ISO 8601). |
|
Cadeia de caracteres |
|
A chave de partição especificada pelo produtor de dados para o registro. |
|
Cadeia de caracteres |
|
O número de sequência exclusivo atribuído pelos fluxos de dados do Kinesis ao registro no fragmento. |
|
Número |
|
O número de subsequência do registro (usado para registros agregados com o mesmo número de sequência). |
|
Cadeia de caracteres |
|
Uma combinação do número de sequência e do número de subsequência para o registro. |
Evolução do esquema¶
Esse conector é compatível com a detecção e a evolução automáticas do esquema. A estrutura das tabelas no Snowflake é definida e evoluída automaticamente para oferecer suporte à estrutura dos novos dados carregados pelo conector.
O Snowflake detecta o esquema dos dados recebidos e carrega os dados em tabelas que correspondam a um esquema definido pelo usuário. O Snowflake também permite adicionar novas colunas ou descartar a restrição NOT NULL de colunas ausentes em novos registros recebidos.
A detecção de esquema com o conector infere tipos de dados com base no dados JSON fornecidos.
Se o conector criar a tabela de destino, a evolução do esquema será ativada por padrão.
If you want to enable or disable schema evolution on an existing table,
use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter.
You must also use a role that has the OWNERSHIP privilege on the table. For more information, see Evolução do esquema da tabela.
However, if schema evolution is disabled for an existing table, then the connector will try to send the rows with mismatched schemas to the configured failure output port.
Iceberg table support¶
Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.
Requisitos e limitações¶
Before you configure the Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:
É necessário criar uma tabela Iceberg antes de executar o conector.
Certifique-se de que o usuário tenha acesso à inserção de dados nas tabelas criadas.
Configuração e instalação¶
To configure the Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.
Habilite a ingestão na tabela Iceberg¶
Para permitir a ingestão em uma tabela Iceberg, você deve definir o parâmetro Iceberg Enabled como true.
Criação de uma tabela Iceberg para ingestão¶
Antes de executar o conector, é necessário criar uma tabela Iceberg. O esquema inicial da tabela depende das configurações de propriedade da evolução do esquema habilitada do seu conector.
Com a evolução do esquema habilitada, você deve criar uma tabela com uma coluna chamada kinesisMetadata. O conector cria automaticamente as colunas para os campos da mensagem e altera o esquema da coluna kinesisMetadata.
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ENABLE_SCHEMA_EVOLUTION = true;
If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.
Por exemplo, considere a seguinte mensagem:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
A seguinte instrução cria uma tabela com todos os campos que fazem parte da mensagem do Kinesis:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT(
stream STRING,
shardId STRING,
approximateArrival STRING,
partitionKey STRING,
sequenceNumber STRING,
subSequenceNumber INTEGER,
shardedSequenceNumber STRING
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Nota
kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.