Configuração de Openflow Connector for Amazon Kinesis Data Streams¶
Nota
O conector está sujeito aos Termos do conector Snowflake.
Este tópico descreve como configurar o Openflow Connector for Amazon Kinesis Data Streams.
O Openflow Connector for Amazon Kinesis Data Streams foi projetado para ingestão de mensagens JSON dos fluxos do Kinesis para tabelas do Snowflake, com recursos de evolução de esquema.
Configurar o Openflow Connector para Kinesis¶
Pré-requisitos¶
Consulte Openflow Connector for Amazon Kinesis Data Streams.
Certifique-se de ter revisado Configuração do Openflow - BYOC ou Configuração do Openflow – Implantações do Snowflake.
Se você usa o Openflow – Implantações do Snowflake, certifique-se de ter revisado a configuração dos domínios necessários e concedido acesso aos domínios necessários para o conector Kinesis.
Configurar funções e políticas do IAM na AWS¶
Como administrador do AWS, execute as seguintes ações em sua conta AWS:
Crie um usuário ou função do IAM na AWS que o Openflow usará para acessar o fluxo de dados do Kinesis. Para obter mais informações, consulte como criar usuários do IAM na documentação da AWS.
Garanta que o usuário da AWS tenha credenciais de chave de acesso configuradas.
Conceda ao usuário da AWS as seguintes permissões do IAM:
Serviço
Ações
Recursos (ARNs)
Objetivo
Amazon Kinesis Data Streams
kinesis:DescribeStream,kinesis:DescribeStreamConsumer,kinesis:GetRecords,kinesis:GetShardIterator,kinesis:ListShards,kinesis:RegisterStreamConsumerarn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}Descobre fragmentos, lê registros por meio da sondagem de taxa de transferência compartilhada, resolve o ARN do fluxo, registra um consumidor de fan-out aprimorado e sonda o status do consumidor durante o registro.
Amazon Kinesis Data Streams
kinesis:DeregisterStreamConsumer,kinesis:DescribeStreamConsumer,kinesis:SubscribeToShardarn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}/consumer/*Descreve, assina e cancela o registro de consumidores de fan-out aprimorado por ARN de consumidor.
Amazon DynamoDB
dynamodb:CreateTable,dynamodb:DeleteTable,dynamodb:DescribeTable,dynamodb:GetItem,dynamodb:PutItem,dynamodb:Query,dynamodb:Scan,dynamodb:UpdateItemarn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME},arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}_migrationCria e gerencia a tabela de ponto de verificação/concessão (concessão de fragmentos, pulsações de nós, pontos de verificação) e uma tabela de migração temporária usada durante a migração única de tabelas de ponto de verificação legadas.
Exemplo de política do IAM:
Antes de usar a política de exemplo, substitua os seguintes espaços reservados:
Espaço reservado
Descrição
${REGION}Sua região AWS (por exemplo,
us-east-1)${ACCOUNT_ID}O ID da sua conta AWS (por exemplo,
123456789012)${STREAM_NAME}O valor do parâmetro de conector AWS Kinesis Stream Name
${APPLICATION_NAME}O valor do parâmetro de conector AWS Kinesis Application Name. Usado como o nome da tabela de ponto de verificação do DynamoDB e como o nome do consumidor registrado de fan-out aprimorado.
Nota
${APPLICATION_NAME}_migrationé uma tabela do DynamoDB temporária criada somente durante uma migração única das tabelas de ponto de verificação legadas para o novo esquema. Ela é excluída automaticamente quando a migração é concluída. Se a sua implantação nunca usou o conector baseado em KCL legado, você pode omitir o ARN da tabela de migração da política.A ação
dynamodb:DeleteTableé usada durante o processo de migração e pode ser removida da política depois que a migração é confirmada como concluída.A ação
kinesis:DeregisterStreamConsumeré invocada quando o processador é removido da tela. Se a entidade do IAM não tiver essa permissão, o registro do consumidor deverá ser cancelado manualmente por meio do console ou da CLI da AWS.
Configure a conta Snowflake¶
Como administrador de conta Snowflake, execute as seguintes tarefas:
Crie um novo usuário de serviço Snowflake com o tipo SERVICE.
Crie uma nova função ou use uma função existente e conceda os privilégios de banco de dados.
O conector exige que o usuário crie a tabela de destino. Certifique-se de que o usuário tenha os privilégios necessários para gerenciar objetos Snowflake:
Objeto
Privilégio
Notas
Banco de dados
USAGE
Esquema
USAGE
Tabela
OWNERSHIP
Necessário para que o conector faça a ingestão de dados em uma tabela.
A Snowflake recomenda a criação de um usuário e uma função separados para cada fluxo do Kinesis para ter melhor controle de acesso.
Você pode usar o script a seguir para criar e configurar uma função personalizada (requer SECURITYADMIN ou equivalente):
Nota
Os privilégios devem ser concedidos diretamente à função do conector e não podem ser herdados.
Configurar a tabela de destino
Recomendamos fortemente usar a evolução de esquema do lado do servidor para alterações de esquema e uma tabela de erros para registro de erros DML.
O exemplo abaixo mostra como criar uma tabela e adicionar permissões OWNERSHIP.
Esses conectores são compatíveis com a detecção e a evolução automáticas de esquema. A estrutura das tabelas no Snowflake é definida e evoluída automaticamente para oferecer suporte à estrutura dos novos dados carregados pelo conector. Ela mapeia automaticamente as chaves de primeiro nível do conteúdo do registro para as colunas de tabela correspondentes por nome (sem distinção entre maiúsculas e minúsculas).
Quando a evolução de esquema está habilitada, o Snowflake pode expandir automaticamente a tabela de destino adicionando novas colunas que são detectadas no fluxo de entrada e descartando as restrições NOT NULL para acomodar novos padrões de dados. Para obter mais informações, consulte Evolução do esquema de tabela.
Se ENABLE_SCHEMA_EVOLUTION não estiver habilitada, será necessário criar o esquema manualmente estendendo a definição da tabela. O conector tenta corresponder as chaves de primeiro nível do conteúdo do registro às colunas da tabela por nome. Se as chaves do JSON não corresponderem às colunas da tabela, o conector vai ignorá-las.
(Opcional) Configurar um gerenciador de segredos
A Snowflake recomenda enfaticamente essa etapa. Configure um gerenciador de segredos compatível com o Openflow, por exemplo, AWS, Azure e Hashicorp, e armazene as chaves públicas e privadas no armazenamento de segredos.
Depois que o gerenciador de segredos estiver configurado, determine como você se autenticará nele. Na AWS, é recomendável que você use a função de instância EC2 associada ao Openflow, pois dessa forma nenhum outro segredo precisa ser mantido.
Na tela do Openflow, configure um provedor de parâmetros associado a esse gerenciador de segredos, usando o menu de opções no canto superior direito. Navegue até Controller Settings » Parameter Provider e depois buscar seus valores de parâmetro.
Nesse momento, todas as credenciais podem ser referenciadas com os caminhos de parâmetros associados e nenhum valor sensível precisa ser mantido no Openflow.
Conceder acesso a usuários
Outros usuários do Snowflake que precisem de acesso aos dados brutos ingeridos pelo conector (por exemplo, para processamento personalizado no Snowflake) devem receber a função criada na etapa 2.
Configuração do conector¶
Como engenheiro de dados, execute as seguintes tarefas para instalar e configurar o conector:
Instalação do conector¶
Navegue até a página de visão geral do Openflow. Na seção Featured connectors, selecione View more connectors.
Na página de conectores do Openflow, encontre Openflow connector for Amazon Kinesis Data Streams e selecione Add to runtime.
Na caixa de diálogo Select runtime, selecione o tempo de execução na lista suspensa Available runtimes e clique em Add.
Nota
Antes de instalar o conector, verifique se você criou um banco de dados, um esquema e uma tabela no Snowflake para que o conector armazene os dados ingeridos.
Autentique-se na implantação com as credenciais da sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.
Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.
A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.
Configuração do conector¶
Se necessário, personalize a configuração do conector antes de configurar os parâmetros internos.
Preencher os parâmetros do grupo de processos
Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.
Preencha os valores dos parâmetros necessários.
Parâmetros comuns¶
Parâmetro |
Descrição |
Obrigatório |
|---|---|---|
ID da chave de acesso AWS |
O AWS da chave de acesso ID para se conectar ao seu Kinesis Stream e DynamoDB. |
Sim |
Região AWS do Kinesis |
A região AWS à qual você deve se conectar. Use o formato de região AWS regular, por exemplo: |
Sim |
Chave de acesso secreta AWS |
A chave de acesso secreta AWS para se conectar ao seu Kinesis Stream e DynamoDB. |
Sim |
Nome do aplicativo AWS Kinesis |
O nome usado como nome da tabela do DynamoDB para rastrear o progresso do aplicativo sobre o consumo de fluxos do Kinesis. |
Sim |
Tipo de consumidor do AWS Kinesis |
A estratégia usada para ler os registros de um fluxo do Kinesis. Deve ser um dos seguintes valores: SHARED_THROUGHPUT, ENHANCED_FAN_OUT. Para obter mais informações, consulte as diferenças entre o consumidor de taxa de transferência compartilhada e o consumidor de fan-out aprimorado. |
Sim |
Posição inicial do fluxo do AWS Kinesis |
A posição inicial do fluxo a partir da qual os dados começam a ser replicados. Isso entra em vigor somente durante a ativação inicial para um determinado nome de aplicativo AWS Kinesis. Os valores possíveis são: LATEST: último registro armazenado. TRIM_HORIZON: registro armazenado mais antigo. |
Sim |
Nome do fluxo do AWS Kinesis |
Nome do fluxo do AWS Kinesis do qual consumir os dados. |
Sim |
Banco de dados de destino do Snowflake |
O banco de dados onde os dados serão persistidos. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas. |
Sim |
Esquema de destino do Snowflake |
O esquema onde os dados serão persistidos, que já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas. Veja os exemplos a seguir:
|
Sim |
Tabela de destino do Snowflake |
A tabela em que os dados serão mantidos. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas. |
Sim |
Iniciar o conector¶
Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.
Clique com o botão direito no plano e selecione Start. O conector inicia a ingestão de dados.
Explicando a coluna KINESISMETADATA¶
O conector preenche a estrutura de KINESISMETADATA com os metadados sobre o registro do Kinesis. A estrutura contém as seguintes informações:
Nome do campo |
Tipo de campo |
Valor de exemplo |
Descrição |
|---|---|---|---|
fluxo |
Cadeia de caracteres |
|
O nome do fluxo do Kinesis de origem do registro. |
shardId |
Cadeia de caracteres |
|
O identificador do fragmento no fluxo de origem do registro. |
approximateArrival |
Cadeia de caracteres |
|
A hora aproximada em que o registro foi inserido no fluxo (formato ISO 8601). |
partitionKey |
Cadeia de caracteres |
|
A chave de partição especificada pelo produtor de dados para o registro. |
sequenceNumber |
Cadeia de caracteres |
|
O número de sequência exclusivo atribuído pelos fluxos de dados do Kinesis ao registro no fragmento. |
subSequenceNumber |
Número |
|
O número de subsequência do registro (usado para registros agregados com o mesmo número de sequência). |
shardedSequenceNumber |
Cadeia de caracteres |
|
Uma combinação do número de sequência e do número de subsequência para o registro. |
Medindo a latência de ingestão¶
Para rastreamento de alterações, processamento incremental e consultas do Time Travel com base no horário de modificação da linha, é possível usar o recurso ROW_TIMESTAMP.
Ele pode ser habilitado executando o seguinte comando na tabela de destino:
Após a habilitação dos carimbos de data/hora de linha, as tabelas expõem a coluna METADATA$ROW_LAST_COMMIT_TIME, que retorna o carimbo de data/hora da última modificação de cada linha.
Para obter mais informações, consulte Carimbos de data/hora de linha.
Nota
O carimbo de data/hora de linha não está disponível para tabelas interativas. Para obter mais informações, consulte Limitações das tabelas interativas.
Usando o conector com tabelas Apache Iceberg™¶
O conector pode ingerir dados em tabelas Apache Iceberg™ gerenciadas pelo Snowflake, mas é necessário atender aos seguintes requisitos:
Você deve ter recebido o privilégio USAGE no volume externo associado à sua tabela Apache Iceberg™.
Você deve criar uma tabela Apache Iceberg™ antes de executar o conector.
Concessão de uso em um volume externo¶
Por exemplo, se a tabela Iceberg usar o volume externo kinesis_external_volume e o conector usar a função openflow_kinesis_connector_role_1, execute a seguinte instrução:
Criar uma tabela Apache Iceberg™ para ingestão¶
O conector não cria tabelas Iceberg automaticamente e não oferece suporte à evolução de esquema. Antes de executar o conector, você deve criar uma tabela Iceberg manualmente.
Ao criar uma tabela Iceberg, é possível usar tipos de dados Iceberg (incluindo VARIANT) ou tipos compatíveis com o Snowflake.
Por exemplo, considere a seguinte mensagem:
Para criar uma tabela Iceberg para a mensagem de exemplo, use uma das seguintes instruções:
Usando o conector com tabelas interativas¶
As tabelas interativas são um tipo especial de tabela do Snowflake otimizada para consultas de baixa latência e alta simultaneidade. Você pode saber mais sobre tabelas interativas na documentação de tabelas interativas.
Crie uma tabela interativa:
Considerações importantes:
As tabelas interativas têm limitações e restrições de consulta específicas. Revise a documentação de tabelas interativas antes de usá-las com o conector.
Para tabelas interativas, as transformações necessárias devem ser tratadas na definição da tabela.
Warehouses interativos são necessários para consultar tabelas interativas de forma eficiente.
Usando o conector com um esquema definido pelo cliente para a tabela de destino¶
O conector trata cada registro do Kinesis como uma linha a ser inserida em uma tabela do Snowflake. Por exemplo, se você tem um tópico do Kinesis com o conteúdo da mensagem estruturado como neste JSON:
Por padrão, você não precisa especificar todos os campos do JSON. A evolução de esquema cuidará disso. Entretanto, se você preferir um esquema estático, ele pode ser criado executando:
Usando o conector com um PIPE definido pelo cliente¶
Se você criar o próprio canal, poderá definir a lógica de transformação de dados na instrução COPY INTO do canal. Você pode renomear as colunas conforme exigido e converter os tipos de dados conforme necessário. Por exemplo:
Quando você define o próprio canal, as colunas da tabela de destino não têm que corresponder às chaves JSON. Você pode renomear as colunas para os nomes desejados e converter os tipos de dados se necessário.
Para ajustar o conector para funcionar com um canal personalizado, execute as seguintes tarefas:
Clique com o botão direito no processador PublishSnowpipeStreaming usado no fluxo de ingestão do Kinesis na tela do Openflow.
Selecione Configure no menu de contexto.
Navegue até a guia Properties.
No campo Destination type, escolha Pipe.
No campo Pipe, digite o nome do seu canal.
Selecione Apply para salvar a configuração.
Personalizando o tratamento de erros¶
O tratamento de erros é dividido entre falhas no lado do Openflow e falhas no lado do servidor no serviço Snowpipe Streaming.
Erros do Openflow (falhas no lado do cliente): erros, como cargas úteis não analisáveis ou falhas na transformação personalizada, ocorrem antes que os registros cheguem ao Snowflake. Por padrão, esses registros são descartados. É possível processar esses erros no Openflow: use FlowFiles do relacionamento de falha de análise no processador ConsumeKinesis.
Erros do Snowpipe Streaming (falhas no lado do servidor): erros de registros que chegam ao Snowflake, mas que são incompatíveis com o esquema da tabela de destino (por exemplo, incompatibilidades de tipo), são capturados pela infraestrutura do Snowflake. Quando o registro de erros está habilitado na tabela de destino (
error_logging = true), as linhas com falha são automaticamente ingeridas na tabela de erros de destino.