Configure o Openflow Connector for PostgreSQL¶
Nota
O conector está sujeito aos termos do conector.
Este tópico descreve as etapas para configurar o Openflow Connector for PostgreSQL.
Pré-requisitos¶
Certifique-se de ter revisado Sobre a Openflow Connector for PostgreSQL.
Certifique-se de ter revisado as versões compatíveis do PostgreSQL.
Certifique-se de ter configurado o Openflow.
Como administrador de banco de dados, execute as seguintes tarefas:
Certifique-se de que haja espaço suficiente em disco no servidor PostgreSQL para o WAL. Isso ocorre porque, uma vez criado, um slot de replicação faz com que o PostgreSQL retenha os dados WAL da posição mantida pelo slot de replicação, até que o conector confirme e avance essa posição.
Certifique-se de que todas as tabelas habilitadas para replicação tenham uma chave primária. A chave pode ser uma coluna única ou composta.
Defina o REPLICA IDENTITY das tabelas como
DEFAULT
. Isso garante que as chaves primárias sejam representadas no WAL, e que o conector possa lê-las.Crie um usuário para o conector. O conector requer um usuário com o atributo
REPLICATION
e permissões para SELECT de cada tabela replicada. Crie esse usuário com uma senha para entrar na configuração do conector. Para obter mais informações sobre segurança de replicação, consulte Segurança.
Configuração de wal_level¶
Openflow Connector for PostgreSQL requer que wal_level seja definido como logical
.
Dependendo de onde o servidor PostgreSQL estiver hospedado, você pode configurar o wal_level da seguinte forma:
No local |
Execute a seguinte consulta com superusuário ou usuário com privilégio
|
RDS |
O usuário usado pelo agente precisa ter a função Você também precisa definir:
|
AWS Aurora |
Defina o parâmetro estático |
GCP |
Defina os seguintes sinalizadores:
|
Azure |
Defina o suporte de replicação como |
Crie uma publicação¶
O Openflow Connector for PostgreSQL requer que uma publicação seja criada e configurada em PostgreSQL antes do início da replicação. Você pode criá-la para todas as tabelas ou para um subconjunto delas, bem como para tabelas específicas apenas com colunas especificadas. Certifique-se de que todas as tabelas e colunas que você planeja replicar estejam incluídas na publicação. Você também pode modificar a publicação posteriormente, enquanto o conector estiver em execução. Para criar e configurar uma publicação, faça o seguinte:
Faça login como um usuário com o privilégio
CREATE
no banco de dados e execute a seguinte consulta:
CREATE PUBLICATION <publication name>;
Defina as tabelas que o agente do banco de dados poderá ver usando:
ALTER PUBLICATION <publication name> ADD TABLE <table name>;
Importante
PostgreSQL 15 e posteriores oferecem suporte à configuração de publicações para um subconjunto específico de colunas da tabela. Para que o conector seja corretamente compatível com isso, você deve usar as configurações de filtragem de coluna para incluir as mesmas colunas definidas na publicação.
Sem essa configuração, o conector apresentará o seguinte comportamento:
Na tabela de destino, as colunas que não estiverem incluídas no filtro serão sufixadas com
__DELETED
. Todos os dados replicados durante a fase do instantâneo ainda estarão lá.Após adicionar novas colunas à publicação, a tabela sofrerá uma falha permanente e precisará reiniciar sua replicação.
Para obter mais informações, consulte ALTER PUBLICATION.
Como administrador de conta Snowflake, execute as seguintes tarefas:
Crie um usuário Snowflake com o tipo SERVICE. Crie um banco de dados para armazenar os dados replicados e configure privilégios para que o usuário Snowflake crie objetos nesse banco de dados, concedendo os privilégios USAGE e CREATE SCHEMA.
CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
Crie um par de chaves seguras (pública e privada). Armazene a chave privada do usuário em um arquivo para fornecer à configuração do conector. Atribua a chave pública ao usuário do serviço Snowflake:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
Para obter mais informações, consulte par de chaves.
Designar um warehouse para o conector usar. Comece com o tamanho do warehouse
MEDIUM
e, em seguida, experimente o tamanho, dependendo da quantidade de tabelas que estão sendo replicadas e da quantidade de dados transferidos. Os números de tabelas grandes normalmente são mais bem dimensionados com warehouses multicluster, em vez do tamanho do warehouse.
Importe a definição do conector para o Openflow¶
Navegue até a página Visão geral do Openflow. Na seção Featured connectors, selecione View more connectors.
Na página de conectores do Openflow, localize o conector e selecione Add to runtime.
Na caixa de diálogo Select runtime, selecione seu tempo de execução na lista suspensa Available runtimes.
Selecione Add.
Nota
Antes de instalar o conector, verifique se você criou um banco de dados e um esquema no Snowflake para que o conector armazene os dados ingeridos.
Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.
Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.
A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.
Configuração do conector¶
Você pode configurar o conector para os seguintes casos de uso:
Replique um conjunto de tabelas em tempo real¶
Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.
Preencha os valores dos parâmetros necessários conforme descrito em Parâmetros de fluxo.
Parâmetros de fluxo¶
Comece definindo os parâmetros do contexto de Parâmetros de origem do PostgreSQL e, em seguida, o contexto de Parâmetros de destino do PostgreSQL. Feito isso, você pode ativar o conector e ele deve se conectar ao PostgreSQL e ao Snowflake e começar a funcionar. No entanto, ele não replicará nenhum dado até que qualquer tabela seja explicitamente adicionada à sua configuração.
Para configurar tabelas específicas para replicação, edite o contexto Parâmetros de ingestão do PostgreSQL. Logo depois que você aplicar as alterações ao contexto Parâmetros de replicação, a configuração será captada pelo conector, e o ciclo de vida da replicação será iniciado para cada tabela.
Contexto dos parâmetros de origem do PostgreSQL¶
Parâmetro |
Descrição |
---|---|
URL de conexão Postgres |
O URL completo do JDBC para o banco de dados de origem. Exemplo: |
Driver Postgres JDBC |
O caminho para o jar do driver PostgreSQL JDBC. Faça o download do jar em seu site e, em seguida, marque a caixa de seleção Reference asset para fazer o upload e anexá-lo. |
Modo Postgres SSL |
Ative ou desative as conexões SSL. |
Certificado SSL da raiz Postgres |
O conteúdo completo do certificado raiz do banco de dados. Opcional se SSL estiver desativado. |
Nome de usuário do Postgres |
O nome de usuário do conector. |
Senha do Postgres |
A senha do conector. |
Nome da publicação |
O nome da publicação que você criou anteriormente. |
Contexto dos parâmetros de destino do PostgreSQL¶
Parâmetro |
Descrição |
---|---|
Banco de dados de destino |
O banco de dados onde os dados serão mantidos. Ele já deve existir no Snowflake. |
Identificador de conta Snowflake |
Nome da conta Snowflake formatado como [organization-name]-[account-name] onde os dados serão mantidos |
Estratégia de autenticação Snowflake |
Estratégia de autenticação para o Snowflake. Valores possíveis: |
Chave privada Snowflake |
A chave privada RSA utilizada para autenticação. A chave RSA deve ser formatada de acordo com os padrões PKCS8 e ter cabeçalhos e rodapés no padrão PEM. Observe que o arquivo de chave privada Snowflake ou a chave privada Snowflake devem ser definidos. |
Arquivo de chave privada Snowflake |
O arquivo que contém a chave privada RSA usada para autenticação no Snowflake, formatado de acordo com os padrões PKCS8 e com cabeçalhos e rodapés no padrão PEM. A linha do cabeçalho começa com |
Senha de chave privada Snowflake |
A senha associada ao arquivo de chave privada Snowflake |
Função Snowflake |
Função Snowflake usada durante a execução da consulta |
Nome de usuário do Snowflake |
Nome de usuário usado para se conectar à instância Snowflake |
Warehouse Snowflake |
Warehouse Snowflake usado para executar consultas |
Contexto dos parâmetros de ingestão do PostgreSQL¶
Parâmetro |
Descrição |
---|---|
Nomes de tabela inclusos |
Uma lista de caminhos de tabela separados por vírgulas, incluindo seus esquemas. Exemplo: |
Regex de tabela inclusa |
Uma expressão regular para comparar com os caminhos da tabela. Todos os caminhos que corresponderem à expressão serão replicados, e as novas tabelas que corresponderem ao padrão e forem criadas posteriormente também serão incluídas automaticamente. Exemplo: |
Filtrar JSON |
Um JSON contendo uma lista de nomes de tabela totalmente qualificados e um padrão regex para nomes de coluna que devem ser incluídos na replicação. Exemplo: |
CRON do cronograma de tarefas de fusão |
A expressão CRON que define os períodos em que as operações de fusão do diário para a Tabela de destino serão acionadas. Defina-o como Por exemplo:
Para obter mais informações e exemplos, consulte o tutorial de acionadores do cron na documentação do Quartz |
Remova e adicione novamente uma tabela à replicação¶
Para remover uma tabela da replicação, certifique-se de que ela seja removida dos parâmetros Nomes de tabela inclusos
ou Regex de tabela inclusa
no contexto Parâmetros de replicação.
Se quiser adicionar novamente a tabela à replicação mais tarde, primeiro exclua a tabela de destino correspondente no Snowflake. Após isso, adicione a tabela novamente aos parâmetros Nomes de tabela inclusos
ou Regex de tabela inclusa
. Isso garante que o processo de replicação comece do zero para a tabela.
Essa abordagem também pode ser usada para se recuperar de um cenário de replicação de tabela com falha.
Replique um subconjunto de colunas em uma tabela¶
O conector pode filtrar os dados replicados por tabela para um subconjunto de colunas configuradas.
Para aplicar filtros a colunas, modifique a propriedade Filtro de coluna no contexto Parâmetros de replicação, adicionando uma matriz de configurações – uma entrada para cada tabela à qual deseja aplicar um filtro.
As colunas podem ser incluídas ou excluídas por nome ou padrão. Você pode aplicar uma única condição por tabela ou combinar várias condições, com as exclusões sempre tendo precedência sobre as inclusões.
O exemplo a seguir mostra os campos disponíveis. schema
e table
são obrigatórios e, em seguida, um ou mais dos campos included
, excluded
, includedPattern
, excludedPattern
são obrigatórios.
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
Monitore as alterações de dados em tabelas¶
O conector replica não apenas o estado atual dos dados das tabelas de origem, mas também cada estado de cada linha de cada conjunto de alterações. Esses dados são armazenados em tabelas de diário criadas no mesmo esquema da tabela de destino.
Os nomes das tabelas de diário são formatados como: <nome da tabela de origem>_JOURNAL_<carimbo de data/hora>_<geração do esquema>
Onde <carimbo de data/hora>
é o valor de segundos de época em que a tabela de origem foi adicionada à replicação e <geração de esquema>
é um número inteiro que aumenta a cada alteração de esquema na tabela de origem. Isso significa que uma tabela de origem que passa por alterações de esquema terá várias tabelas de diário.
Quando uma tabela é removida da replicação e depois adicionada novamente, o valor de <carimbo de data/hora>
será alterado e a <geração de esquema>
começará novamente a partir de 1
.
Importante
A Snowflake recomenda que você não altere as tabelas de diário ou os dados nelas contidos, de forma alguma. Eles são usados pelo conector para atualizar a tabela de destino como parte do processo de replicação.
O conector nunca descarta tabelas de diário, mas só faz uso ativo do último diário para cada tabela de origem replicada. Se quiser recuperar o armazenamento, você pode eliminar com segurança as tabelas de diário relacionadas às tabelas de origem que foram removidas da replicação e todas as tabelas replicadas ativamente, exceto as de última geração.
Por exemplo, se o conector estiver definido para replicar ativamente a tabela de origem orders
e você tiver removido anteriormente a tabela customers
da replicação, poderá ter as seguintes tabelas de diário. Nesse caso, você pode descartar todas elas exceto orders_5678_2
.
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
Configure o agendamento de tarefas de fusão¶
O conector usa um warehouse para mesclar informações de captura de dados de alteração (CDC) nas tabelas de destino. Essa operação é acionada pelo processador MergeSnowflakeJournalTable. Se não houver novas alterações ou se não houver novos FlowFiles aguardando na fila MergeSnowflakeJournalTable, nenhuma fusão será acionada e o warehouse será suspenso automaticamente.
Para limitar o custo do warehouse e limitar as mesclagens apenas ao horário programado, use a expressão CRON no parâmetro CRON Cronograma da tarefa de mesclagem. Ele controla o fluxo dos FlowFiles que chegam ao processador MergeSnowflakeJournalTable e as mesclagens são acionadas somente em um período de tempo específico. Para obter mais informações sobre agendamento, consulte Estratégia de agendamento.
Pare ou exclua o conector¶
Ao interromper ou remover o conector, você deve considerar o slot de replicação que o conector usa.
O conector cria seu próprio slot de replicação com um nome que começa com snowflake_connector_
seguido de um sufixo aleatório. À medida que o conector lê o fluxo de replicação, ele avança o slot, de modo que o PostgreSQL possa cortar o log WAL e liberar espaço em disco.
Quando o conector é pausado, o slot não é avançado e as alterações no banco de dados de origem continuam aumentando o tamanho do registro WAL. Você não deve manter o conector em pausa por longos períodos de tempo, especialmente em bancos de dados de alto tráfego.
Quando o conector é removido, seja excluindo-o da tela do Openflow ou por qualquer outro meio, como a exclusão de toda a instância do Openflow, o slot de replicação permanece no lugar e deve ser removido manualmente.
Se você tiver várias instâncias do conector replicando do mesmo banco de dados PostgreSQL, cada instância criará seu próprio slot de replicação com nome exclusivo. Ao descartar um slot de replicação manualmente, certifique-se de que seja o correto. Você pode ver qual slot de replicação é usado por uma determinada instância do conector verificando o estado do processador CaptureChangePostgreSQL
.
Execute o fluxo¶
Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.
Clique com o botão direito do mouse no grupo de processos importado e selecione Start. O conector inicia a ingestão de dados.