Configure o Openflow Connector for MySQL¶
Nota
O conector está sujeito aos termos do conector.
Este tópico descreve as etapas para configurar o Openflow Connector for MySQL.
Pré-requisitos¶
Certifique-se de ter revisado Sobre a Openflow Connector for MySQL.
Certifique-se de que você tenha um MySQL 8 ou posterior para sincronizar os dados com o Snowflake.
Certifique-se de ter configurado o Openflow.
Como administrador de banco de dados, execute as seguintes tarefas:
Habilite os logs binários, salve e configure seu formato da seguinte forma:
log_bin
Defina como
on
.Isso habilita o log binário que registra alterações estruturais e de dados.
binlog_format
Defina como
row
.O conector suporta apenas a replicação baseada em linhas. Versões MySQL 8.x podem ser as últimas a suportar essa configuração, e as versões futuras suportarão apenas a replicação baseada em linhas.
Não aplicável em GCP Cloud SQL, onde é fixado no valor correto.
binlog_row_metadata
Defina como
full
.O conector requer todos os metadados da linha para operar, principalmente os nomes das colunas e as informações da chave primária.
binlog_row_image
Defina como
full
.O conector exige que todas as colunas sejam gravadas no registro binário.
Não aplicável no Amazon Aurora, onde é fixado no valor correto.
binlog_row_value_options
Deixe em branco.
Essa opção afeta apenas as colunas JSON, onde pode ser definida para incluir apenas as partes modificadas dos documentos JSON para as instruções
UPDATE
. O conector exige que os documentos completos sejam gravados no registro binário.binlog_expire_logs_seconds
Defina como pelo menos algumas horas, ou mais, para garantir que o agente de banco de dados possa continuar a replicação incremental após pausas prolongadas ou tempo de inatividade. A Snowflake recomenda que você defina o período de expiração do log binário (binlog_expire_logs_seconds) para pelo menos algumas horas para garantir o funcionamento estável do conector. Após o término do período de expiração do log binário, os arquivos de log binário podem ser removidos automaticamente. Se a integração for pausada por um longo período, por exemplo, devido a trabalhos de manutenção, e os arquivos de logs binários expirados forem excluídos durante esse período, o Openflow não poderá replicar os dados desses arquivos.
Se você estiver usando a replicação programada, o valor precisa ser maior do que o cronograma configurado.
Veja o código a seguir como exemplo:
log_bin = on binlog_format = row binlog_row_metadata = full binlog_row_image = full binlog_row_value_options =
Aumente o valor de
sort_buffer_size
.sort_buffer_size = 4194304
sort_buffer_size
define a quantidade de memória (em bytes) alocada por thread de consulta para operações de classificação na memória, como ORDER BY. Se o valor for muito pequeno, o conector poderá falhar com a seguinte mensagem de erro:Out of sort memory, consider increasing server sort buffer size
. Isso indica quesort_buffer_size
deve ser aumentado.Se você estiver usando bancos de dados do Amazon RDS, aumente o período de retenção relevante para
binlog_expire_logs_seconds
usandords_set_configuration
. Por exemplo, se você quiser armazenar o log binário por 24 horas, chamemysql.rds_set_configuration('binlog retention hours', 24)
.Conecte-se via SSL. Se estiver planejando usar uma conexão SSL para MySQL, prepare o certificado raiz para o servidor de banco de dados. Ele é necessário durante a configuração.
Crie um usuário para o conector. O conector requer um usuário com os privilégios REPLICATION_SLAVE e REPLICATION_CLIENT para ler os logs binários. Conceda esses privilégios:
GRANT REPLICATION SLAVE ON *.* TO '<username>'@'%' GRANT REPLICATION CLIENT ON *.* TO '<username>'@'%'
Conceda o privilégio SELECT em cada tabela replicada:
GRANT SELECT ON <schema>.* TO '<username>'@'%' GRANT SELECT ON <schema>.<table> TO '<username>'@'%'
Para obter mais informações sobre a segurança da replicação, consulte Log binário.
Como administrador de conta Snowflake, execute as seguintes tarefas:
Crie um usuário Snowflake com o tipo SERVICE. Crie um banco de dados para armazenar os dados replicados e configure privilégios para que o usuário Snowflake crie objetos nesse banco de dados, concedendo os privilégios USAGE e CREATE SCHEMA.
CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
Crie um par de chaves seguras (pública e privada). Armazene a chave privada do usuário em um arquivo para fornecer à configuração do conector. Atribua a chave pública ao usuário do serviço Snowflake:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
Para obter mais informações, consulte par de chaves.
Designar um warehouse para o conector usar. Comece com o tamanho do warehouse
MEDIUM
e, em seguida, experimente o tamanho, dependendo da quantidade de tabelas que estão sendo replicadas e da quantidade de dados transferidos. Os números de tabelas grandes normalmente são mais bem dimensionados com warehouses multicluster, em vez do tamanho do warehouse.
Configuração do conector¶
Como engenheiro de dados, execute as seguintes tarefas para instalar e configurar o conector:
Instalação do conector¶
Navegue até a página Visão geral do Openflow. Na seção Featured connectors, selecione View more connectors.
Na página de conectores do Openflow, localize o conector e selecione Add to runtime.
Na caixa de diálogo Select runtime, selecione seu tempo de execução na lista suspensa Available runtimes.
Selecione Add.
Nota
Antes de instalar o conector, verifique se você criou um banco de dados e um esquema no Snowflake para que o conector armazene os dados ingeridos.
Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.
Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.
A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.
Configuração do conector¶
Você pode configurar o conector para os seguintes casos de uso:
Replique um conjunto de tabelas em tempo real¶
Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.
Preencha os valores dos parâmetros necessários conforme descrito em Parâmetros de fluxo.
Parâmetros de fluxo¶
Comece definindo os parâmetros do contexto de Parâmetros de origem do MySQL e, em seguida, o contexto de Parâmetros de destino do MySQL. Após fazer isso, você pode ativar o conector. O conector deve se conectar ao MySQL e ao Snowflake e começar a funcionar. No entanto, o conector não replica nenhum dado até que as tabelas a serem replicadas sejam explicitamente adicionadas à sua configuração.
Para configurar tabelas específicas para replicação, edite o contexto Parâmetros de ingestão do MySQL. Depois que você aplicar as alterações ao contexto de parâmetros de replicação, a configuração será captada pelo conector e o ciclo de vida da replicação será iniciado para cada tabela.
Contexto dos parâmetros de origem do MySQL¶
Parâmetro |
Descrição |
---|---|
URL de conexão do MySQL |
O URL completo do JDBC para o banco de dados de origem. O conector usa o driver MariaDB, que é compatível com MySQL e requer o prefixo Exemplos:
|
Driver MySQL JDBC |
O caminho absoluto para o jar do driver MariaDB JDBC. O conector usa o driver MariaDB, que é compatível com MySQL. Marque a caixa de seleção Reference asset para carregar o driver MariaDB JDBC. Exemplo: |
Nome de usuário MySQL |
O nome de usuário do conector. |
Senha MySQL |
A senha do conector. |
Contexto dos parâmetros de destino do MySQL¶
Parâmetro |
Descrição |
---|---|
Banco de dados de destino |
O banco de dados onde os dados serão mantidos. Ele já deve existir no Snowflake. |
Identificador de conta Snowflake |
Nome da conta Snowflake formatado como [organization-name]-[account-name] onde os dados serão mantidos |
Estratégia de autenticação Snowflake |
Estratégia de autenticação para o Snowflake. Valores possíveis: |
Chave privada Snowflake |
A chave privada RSA utilizada para autenticação. A chave RSA deve ser formatada de acordo com os padrões PKCS8 e ter cabeçalhos e rodapés no padrão PEM. Observe que o arquivo de chave privada Snowflake ou a chave privada Snowflake devem ser definidos. |
Arquivo de chave privada Snowflake |
O arquivo que contém a chave privada RSA usada para autenticação no Snowflake, formatado de acordo com os padrões PKCS8 e com cabeçalhos e rodapés no padrão PEM. A linha do cabeçalho começa com |
Senha de chave privada Snowflake |
A senha associada ao arquivo de chave privada Snowflake |
Função Snowflake |
Função Snowflake usada durante a execução da consulta |
Nome de usuário do Snowflake |
Nome de usuário usado para se conectar à instância Snowflake |
Warehouse Snowflake |
Warehouse Snowflake usado para executar consultas |
Contexto dos parâmetros de ingestão do MySQL¶
Parâmetro |
Descrição |
---|---|
Nomes de tabela inclusos |
Uma lista de caminhos de tabela separados por vírgulas, incluindo seus esquemas. Exemplo: |
Regex de tabela inclusa |
Uma expressão regular para comparar com os caminhos da tabela. Todos os caminhos que corresponderem à expressão serão replicados, e as novas tabelas que corresponderem ao padrão e forem criadas posteriormente também serão incluídas automaticamente. Exemplo: |
Filtrar JSON |
Um JSON contendo uma lista de nomes de tabela totalmente qualificados e um padrão regex para nomes de coluna que devem ser incluídos na replicação. Exemplo: |
CRON do cronograma de tarefas de fusão |
A expressão CRON que define os períodos em que as operações de fusão do diário para a Tabela de destino serão acionadas. Defina-o como |
Remova e adicione novamente uma tabela à replicação¶
Para remover uma tabela da replicação, certifique-se de que ela seja removida dos parâmetros Nomes de tabela inclusos
ou Regex de tabela inclusa
no contexto Parâmetros de replicação.
Se quiser adicionar novamente a tabela à replicação mais tarde, primeiro exclua a tabela de destino correspondente no Snowflake. Após isso, adicione a tabela novamente aos parâmetros Nomes de tabela inclusos
ou Regex de tabela inclusa
. Isso garante que o processo de replicação comece do zero para a tabela.
Essa abordagem também pode ser usada para se recuperar de um cenário de replicação de tabela com falha.
Replique um subconjunto de colunas em uma tabela¶
O conector pode filtrar os dados replicados por tabela para um subconjunto de colunas configuradas.
Para aplicar filtros a colunas, modifique a propriedade Filtro de coluna no contexto Parâmetros de replicação, adicionando uma matriz de configurações – uma entrada para cada tabela à qual deseja aplicar um filtro.
As colunas podem ser incluídas ou excluídas por nome ou padrão. Você pode aplicar uma única condição por tabela ou combinar várias condições, com as exclusões sempre tendo precedência sobre as inclusões.
O exemplo a seguir mostra os campos disponíveis. Os campos schema
e table
são obrigatórios. É necessário um ou mais de included
, excluded
, includedPattern
e excludedPattern
.
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
Monitore as alterações de dados em tabelas¶
O conector replica não apenas o estado atual dos dados das tabelas de origem, mas também cada estado de cada linha de cada conjunto de alterações. Esses dados são armazenados em tabelas de diário criadas no mesmo esquema da tabela de destino.
Os nomes das tabelas de diário são formatados como: <nome da tabela de origem>_JOURNAL_<carimbo de data/hora>_<geração do esquema>
Onde <carimbo de data/hora>
é o valor de segundos de época em que a tabela de origem foi adicionada à replicação e <geração de esquema>
é um número inteiro que aumenta a cada alteração de esquema na tabela de origem. Isso significa que uma tabela de origem que passa por alterações de esquema terá várias tabelas de diário.
Quando uma tabela é removida da replicação e depois adicionada novamente, o valor de <carimbo de data/hora>
será alterado e a <geração de esquema>
começará novamente a partir de 1
.
Importante
A Snowflake recomenda que você não altere as tabelas de diário ou os dados nelas contidos, de forma alguma. Eles são usados pelo conector para atualizar a tabela de destino como parte do processo de replicação.
O conector nunca descarta tabelas de diário, mas só faz uso ativo do último diário para cada tabela de origem replicada. Se quiser recuperar o armazenamento, você pode eliminar com segurança as tabelas de diário relacionadas às tabelas de origem que foram removidas da replicação e todas as tabelas replicadas ativamente, exceto as de última geração.
Por exemplo, se o conector estiver definido para replicar ativamente a tabela de origem orders
e você tiver removido anteriormente a tabela customers
da replicação, poderá ter as seguintes tabelas de diário. Nesse caso, você pode descartar todas elas exceto orders_5678_2
.
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
Configure o agendamento de tarefas de fusão¶
O conector usa um warehouse para mesclar informações de captura de dados de alteração (CDC) nas tabelas de destino. Essa operação é acionada pelo processador MergeSnowflakeJournalTable. Se não houver novas alterações ou se não houver novos FlowFiles aguardando na fila MergeSnowflakeJournalTable, nenhuma fusão será acionada e o warehouse será suspenso automaticamente.
Para limitar o custo do warehouse e limitar as mesclagens apenas ao horário programado, use a expressão CRON no parâmetro CRON Cronograma da tarefa de mesclagem. Ele controla o fluxo dos FlowFiles que chegam ao processador MergeSnowflakeJournalTable e as mesclagens são acionadas somente em um período de tempo específico. Para obter mais informações sobre agendamento, consulte Estratégia de agendamento.
Execute o fluxo¶
Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.
Clique com o botão direito do mouse no grupo de processos importado e selecione Start. O conector inicia a ingestão de dados.