Configure o Openflow Connector for Kinesis¶
Nota
O conector está sujeito aos termos do conector.
Este tópico descreve as etapas para configurar o Openflow Connector for Kinesis.
Pré-requisitos¶
Certifique-se de ter revisado Sobre a Openflow Connector for Kinesis.
Certifique-se de ter configurado o Openflow.
Configure um fluxo Kinesis¶
Como administrador do AWS, execute as seguintes ações em sua conta AWS:
Certifique-se de que você tenha uma conta AWS com permissões de IAM para acessar o Kinesis Streams e DynamoDB.
Opcionalmente, crie um fluxo Kinesis de fila de mensagens não entregues (DLQ). As mensagens que não podem ser analisadas com sucesso podem ser redirecionadas para uma DLQ designada.
Configure a conta Snowflake¶
Como administrador de conta Snowflake, execute as seguintes tarefas:
Crie uma nova função ou use uma função existente e conceda a Privilégios de banco de dados.
Crie um banco de dados de destino e um esquema de destino que serão usados para criar tabelas de destino para armazenar os dados.
Se você planeja usar o recurso do conector para criar automaticamente a tabela de destino se ela ainda não existir, certifique-se de que o usuário tenha os privilégios necessários para criar e gerenciar objetos Snowflake:
Objeto
Privilégio
Notas
Banco de dados
USAGE
Esquema
USAGE . CREATE TABLE .
Após a criação dos objetos de nível de esquema, os privilégios CREATE
object
podem ser revogados.Tabela
OWNERSHIP
Necessário somente ao usar o conector Kinesis para ingerir dados em uma tabela existente. . Se o conector criar uma nova tabela de destino para registros do fluxo Kinesis, a função padrão do usuário especificado na configuração se tornará o proprietário da tabela.
Você pode usar o script a seguir para criar e configurar uma função personalizada (requer SECURITYADMIN ou equivalente):
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role_1; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role_1; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1; -- Only for existing tables GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kinesis_connector_role_1;
Crie um novo usuário de serviço Snowflake com o tipo SERVICE.
Conceda ao usuário do serviço Snowflake a função que você criou nas etapas anteriores.
GRANT ROLE kinesis_connector_role_1 TO USER kinesis_connector_user_1; ALTER USER kinesis_connector_user_1 SET DEFAULT_ROLE = kinesis_connector_role_1;
Configure com autenticação por pares de chaves para o usuário Snowflake SERVICE a partir da etapa 3.
A Snowflake recomenda enfaticamente essa etapa. Configure um gerenciador de segredos compatível com o Openflow, por exemplo, AWS, Azure e Hashicorp, e armazene as chaves públicas e privadas no armazenamento de segredos.
Nota
Se, por algum motivo, você não quiser usar um gerenciador de segredos, será responsável por proteger os arquivos de chave pública e chave privada usados para autenticação de pares de chaves de acordo com as políticas de segurança de sua organização.
Depois que o gerenciador de segredos estiver configurado, determine como você se autenticará nele. No AWS, é recomendável que você use a função de instância EC2 associada ao Openflow, pois dessa forma nenhum outro segredo precisa ser mantido.
No Openflow, configure um provedor de parâmetros associado a esse gerenciador de segredos, no menu de três traços no canto superior direito. Navegue até Controller Settings » Parameter Provider e, em seguida, busque os valores dos parâmetros.
Nesse momento, todas as credenciais podem ser referenciadas com os caminhos de parâmetros associados e nenhum valor sensível precisa ser mantido no Openflow.
Se qualquer outro usuário Snowflake precisar de acesso aos documentos e tabelas brutos ingeridos pelo conector (por exemplo, para processamento personalizado no Snowflake), conceda a esses usuários a função criada na etapa 1.
Designar um warehouse para o conector usar. Comece com o menor tamanho de warehouse e, em seguida, experimente o tamanho, dependendo do número de tabelas sendo replicadas e da quantidade de dados transferidos. Os números de tabelas grandes normalmente são mais bem dimensionados com warehouses multicluster, em vez de warehouses maiores.
Configuração do conector¶
Como engenheiro de dados, execute as seguintes tarefas para instalar e configurar o conector:
Instalação do conector¶
Navegue até a página Visão geral do Openflow. Na seção Featured connectors, selecione View more connectors.
Na página de conectores do Openflow, localize o conector e selecione Add to runtime.
Na caixa de diálogo Select runtime, selecione seu tempo de execução na lista suspensa Available runtimes.
Selecione Add.
Nota
Antes de instalar o conector, verifique se você criou um banco de dados e um esquema no Snowflake para que o conector armazene os dados ingeridos.
Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.
Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.
A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.
Configuração do conector¶
Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.
Preencha os valores dos parâmetros necessários conforme descrito em Parâmetros de fluxo.
Parâmetros de fluxo¶
Esta seção descreve os parâmetros de fluxo que podem ser configurados com base nos contextos de parâmetros a seguir:
Parâmetros de origem do Kinesis: usado para estabelecer conexão com o Kinesis.
Parâmetros de destino do Kinesis: usado para estabelecer conexão com o Snowflake.
Parâmetros de ingestão do Kinesis: usado para definir a configuração dos dados baixados do Kinesis.
Parâmetros de origem do Kinesis¶
Parâmetro |
Descrição |
---|---|
Código da região AWS |
A região AWS onde seu Kinesis Stream está localizado, por exemplo, |
ID da chave de acesso AWS |
O ID da chave de acesso AWS para se conectar ao seu Kinesis Stream e DynamoDB. |
Chave de acesso secreta AWS |
A chave de acesso secreta AWS para se conectar ao seu Kinesis Stream e DynamoDB. |
URL do registro de esquema |
URL do registro de esquema AVRO. Isso é necessário se o parâmetro Estratégia de acesso ao esquema AVRO estiver definido como |
Tipo de autenticação do registro de esquema |
O tipo de autenticação usado pelo registro de esquema AVRO. Isso é necessário se o parâmetro Estratégia de acesso ao esquema AVRO estiver definido como
|
Nome de usuário do registro de esquema |
O nome de usuário usado para autenticação |
Senha do registro de esquema |
A senha usada para autenticação |
Parâmetros de destino do Kinesis¶
Parâmetro |
Descrição |
---|---|
Banco de dados de destino |
O banco de dados onde os dados serão mantidos. Ele já deve existir no Snowflake. |
Esquema de destino |
O esquema em que os dados serão mantidos. Ele já deve existir no Snowflake. Esse parâmetro diferencia maiúsculas de minúsculas. |
Identificador de conta Snowflake |
Nome da conta Snowflake formatado como [organization-name]-[account-name] onde os dados serão mantidos. |
Estratégia de autenticação Snowflake |
Estratégia de autenticação para o Snowflake. Valores possíveis: |
Chave privada Snowflake |
A chave privada RSA utilizada para autenticação. A chave RSA deve ser formatada de acordo com os padrões PKCS8 e ter cabeçalhos e rodapés no padrão PEM. Observe que o arquivo de chave privada Snowflake ou a chave privada Snowflake devem ser definidos. |
Arquivo de chave privada Snowflake |
O arquivo que contém a chave privada RSA usada para autenticação no Snowflake, formatado de acordo com os padrões PKCS8 e com cabeçalhos e rodapés no padrão PEM. A linha do cabeçalho começa com |
Senha de chave privada Snowflake |
A senha associada ao arquivo de chave privada Snowflake. |
Função Snowflake |
Função Snowflake usada durante a execução da consulta. |
Nome de usuário do Snowflake |
Nome de usuário usado para se conectar à instância Snowflake. |
Warehouse Snowflake |
Warehouse Snowflake usado para executar consultas. Esse parâmetro diferencia maiúsculas de minúsculas. |
Parâmetros de ingestão do Kinesis¶
Parâmetro |
Descrição |
---|---|
Nome do aplicativo Kinesis |
O nome usado para o nome de tabela DynamoDB para rastrear o progresso do aplicativo no consumo do Kinesis Stream. |
Nome do fluxo do Kinesis |
Nome do fluxo do Kinesis AWS para consumir dados. |
Posição inicial do fluxo do Kinesis |
A posição inicial do fluxo a partir da qual os dados começam a ser replicados.
|
Nome do fluxo DLQ do Kinesis |
O nome do fluxo para o qual são enviados todos os registros que falharam no processamento. Se esse parâmetro não for adicionado, você pode esperar um sinal de aviso na parte relacionada à DLQ do conector na tela do Openflow. |
Formato de mensagem |
O formato das mensagens no Kinesis.
|
Estratégia de acesso ao esquema AVRO |
Para acessar os dados no formato de mensagem AVRO, o esquema é necessário. Esse parâmetro define a estratégia para acessar o esquema AVRO de uma determinada mensagem. Se o parâmetro Formato da mensagem for definido como
|
Mapa de fluxo Kinesis para tabela |
Esse parâmetro opcional permite que o usuário especifique quais fluxos devem ser mapeados para quais tabelas. Cada fluxo e seu nome de tabela devem ser separados por dois pontos. Este nome de tabela deve ser um identificador válido do Snowflake, sem aspas. As expressões regulares não podem ser ambíguas e qualquer fluxo correspondente deve corresponder apenas a uma única tabela de destino. Se não for encontrada nenhuma correspondência ou se estiver vazia, o nome do fluxo será usado como nome da tabela.
|
Iceberg ativado |
Especifica se o processador ingere dados em uma tabela Iceberg. O processador falhará se essa propriedade não corresponder ao tipo de tabela real.
|
Execute o fluxo¶
Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.
Clique com o botão direito do mouse no grupo de processos importado e selecione Start.
O conector inicia a ingestão de dados.
Esquema¶
A tabela Snowflake carregada pelo conector contém colunas nomeadas pelas chaves de suas mensagens do Kinesis. Abaixo está o exemplo de uma tabela desse tipo.
Linha |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
5 |
ABC123 |
ZTEST |
BUY |
1558 |
Evolução do esquema¶
Atualmente, quando Iceberg Enabled
é definido como false
. Se o conector criar a tabela de destino, a evolução do esquema será ativada por padrão. Se você quiser ativar ou desativar a evolução do esquema na tabela existente, use o comando ALTER TABLE para definir o parâmetro ENABLE_SCHEMA_EVOLUTION
. Você também deve usar uma função com o privilégio OWNERSHIP
na tabela. Para obter mais informações, consulte Evolução do esquema da tabela.
No entanto, se a evolução do esquema estiver desativada para uma tabela existente, o conector tentará enviar as linhas com esquemas incompatíveis para as filas de mensagens não entregues configuradas (DLQ).
Para o caso em que Iceberg Enabled
está definido como true
, consulte o parágrafo Evolução do esquema para as tabelas Apache Iceberg™.
Usando o Openflow Connector for Kinesis com as tabelas Apache Iceberg™¶
O Openflow Connector for Kinesis pode ingerir dados em uma tabela Apache Iceberg™ gerenciada pelo Snowflake.
Requisitos e limitações¶
Antes de configurar o conector para ingestão de tabelas Iceberg, observe os seguintes requisitos e limitações:
É necessário criar uma tabela Iceberg antes de executar o conector.
Certifique-se de que o usuário tenha acesso à inserção de dados nas tabelas criadas.
A evolução do esquema não é compatível com as tabelas Iceberg.
Configuração e instalação¶
Para configurar o conector para ingestão da tabela Iceberg, siga as instruções em Configure o conector com algumas diferenças descritas nas seções a seguir.
Habilite a ingestão na tabela Iceberg¶
Para permitir a ingestão em uma tabela Iceberg, você deve definir o parâmetro Iceberg Enabled
como true
.
Criação de uma tabela Iceberg para ingestão¶
Antes de executar o conector, é necessário criar uma tabela Iceberg. Como não há suporte para a evolução do esquema, você deve criar a tabela com todos os campos que a mensagem do Kinesis contém.
Ao criar uma tabela Iceberg, é possível usar tipos de dados Iceberg ou tipos Snowflake compatíveis. O tipo VARIANT semiestruturado não é compatível. Em vez disso, use um OBJECT ou MAP estruturado.
Por exemplo, considere a seguinte mensagem:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
Para criar uma tabela Iceberg para a mensagem de exemplo, use a seguinte instrução:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table ( id INT, body_temperature FLOAT, name STRING, approved_coffee_types ARRAY(STRING), animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN), date_added DATE ) EXTERNAL_VOLUME = 'my_volume' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'my_location/my_iceberg_table';
Nota
Os nomes de campo dentro de estruturas aninhadas, como dogs
ou cats
, diferenciam maiúsculas de minúsculas.
Evolução do esquema para as tabelas Apache Iceberg™¶
Atualmente, o conector não oferece suporte à evolução do esquema das tabelas Apache Iceberg™.
Problemas conhecidos¶
O grupo de processos do conector tem uma porta de saída denominada “Falha de upload”. Ela pode ser usada para lidar com FlowFiles que não foram carregados com sucesso no Snowflake. Se essa porta não estiver conectada fora do grupo de processos do conector, ela exibirá um sinal de aviso que pode ser ignorado.
Todos os processadores, quando parados, podem ser ordenados para funcionar uma vez. O processador ConsumeKinesisStream, devido à arquitetura interna, não fará nenhum trabalho significativo quando solicitado a ser executado uma vez. Para que o processador comece a funcionar, ele precisa ser iniciado e funcionar por cerca de dois minutos.