Configure o Openflow Connector for PostgreSQL

Nota

O conector está sujeito aos Termos do conector Snowflake.

Este tópico descreve as etapas para configurar o Openflow Connector for PostgreSQL.

Nota

This connector can be configured to immediately start replicating incremental changes for newly added tables, bypassing the snapshot load phase. This option is often useful when reinstalling the connector in an account where previously replicated data exists and you want to continue replication without having to re-snapshot tables.

For details on the incremental load process, see Incremental replication.

Pré-requisitos

  1. Certifique-se de ter revisado Sobre a Openflow Connector for PostgreSQL.

  2. Certifique-se de ter revisado as versões compatíveis do PostgreSQL.

  3. Recomendado: certifique-se de adicionar apenas uma instância de conector por tempo de execução.

  4. Ensure that you have Configuração do Openflow - BYOC or Set up Openflow - Snowflake Deployments.

  5. If using Openflow - Snowflake Deployments, ensure that you’ve reviewed configuring required domains and have granted access to the required domains for the PostgreSQL connector.

  6. Como administrador de banco de dados, execute as seguintes tarefas:

    1. Configuração de wal_level

    2. Crie uma publicação

    3. Certifique-se de que haja espaço suficiente em disco no servidor PostgreSQL para o WAL. Isso ocorre porque, uma vez criado, um slot de replicação faz com que o PostgreSQL retenha os dados WAL da posição mantida pelo slot de replicação, até que o conector confirme e avance essa posição.

    4. Certifique-se de que todas as tabelas habilitadas para replicação tenham uma chave primária. A chave pode ser uma coluna única ou composta.

    5. Defina o REPLICA IDENTITY das tabelas como DEFAULT. Isso garante que as chaves primárias sejam representadas no WAL, e que o conector possa lê-las.

    6. Crie um usuário para o conector. O conector requer um usuário com o atributo REPLICATION e permissões para SELECT de cada tabela replicada. Crie esse usuário com uma senha para entrar na configuração do conector. Para obter mais informações sobre segurança de replicação, consulte Segurança.

  7. Como administrador de conta Snowflake, execute as seguintes tarefas:

    1. Crie um usuário Snowflake com o tipo SERVICE. Crie um banco de dados para armazenar os dados replicados e configure privilégios para que o usuário Snowflake crie objetos nesse banco de dados, concedendo os privilégios USAGE e CREATE SCHEMA.

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
        WITH
          WAREHOUSE_SIZE = 'MEDIUM'
          AUTO_SUSPEND = 300
          AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
      Copy
    2. Crie um par de chaves seguras (pública e privada). Armazene a chave privada do usuário em um arquivo para usá-la durante a configuração do conector. Atribua a chave pública ao usuário do serviço Snowflake:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      Para obter mais informações, consulte autenticação de par de chaves.

    3. Designe um warehouse para o conector usar. Comece com o tamanho MEDIUM de warehouse e, em seguida, experimente os tamanhos dependendo da quantidade de tabelas que estão sendo replicadas e da quantidade de dados transferidos. Um grande número de tabelas normalmente escalona melhor com warehouses multicluster, em vez do tamanho do warehouse.

Configuração de wal_level

Openflow Connector for PostgreSQL requer que wal_level seja definido como logical.

Dependendo de onde o servidor PostgreSQL estiver hospedado, você pode configurar o wal_level da seguinte forma:

No local

Execute a seguinte consulta com superusuário ou usuário com privilégio ALTER SYSTEM:

ALTER SYSTEM SET wal_level = logical;
Copy

RDS

O usuário usado pelo agente precisa ter a função rds_superuser ou rds_replication atribuída.

Você também precisa definir:

  • Parâmetro estático rds.logical_replication como 1.

  • Parâmetros max_replication_slots, max_connections e max_wal_senders de acordo com seu banco de dados e configuração de replicação.

AWS Aurora

Defina o parâmetro estático rds.logical_replication como 1.

GCP

Defina os seguintes sinalizadores:

  • cloudsql.logical_decoding=on.

  • cloudsql.enable_pglogical=on.

Para obter mais informações, consulte a documentação do Google Cloud.

Azure

Defina o suporte de replicação como Logical. Para obter mais informações, consulte a documentação do Azure.

Crie uma publicação

O Openflow Connector for PostgreSQL requer que uma publicação seja criada e configurada em PostgreSQL antes do início da replicação. Você pode criá-la para todas as tabelas ou para um subconjunto delas, bem como para tabelas específicas apenas com colunas especificadas. Certifique-se de que todas as tabelas e colunas que você planeja replicar estejam incluídas na publicação. Você também pode modificar a publicação posteriormente, enquanto o conector estiver em execução. Para criar e configurar uma publicação, faça o seguinte:

  1. Efetue login como um usuário com o privilégio CREATE no banco de dados e execute a seguinte consulta:

    • Para PostgreSQL 13 e superior:

      CREATE PUBLICATION <publication name> WITH (publish_via_partition_root = true);
      
      Copy

      O publish_via_partition_root adicional é necessário para replicar tabelas particionadas corretamente. Para saber mais sobre a ingestão de tabelas particionadas, consulte Replicação de uma tabela particionada.

    • Para versões anteriores ao PostgreSQL 13:

      CREATE PUBLICATION <publication name>;
      
      Copy
  2. Defina as tabelas que o agente do banco de dados poderá ver usando:

ALTER PUBLICATION <publication name> ADD TABLE <table name>;
Copy

Para tabelas particionadas, basta adicionar a tabela de partição raiz à publicação. Consulte Replicação de uma tabela particionada para obter mais detalhes.

Importante

PostgreSQL 15 e posteriores oferecem suporte à configuração de publicações para um subconjunto específico de colunas da tabela. Para que o conector seja corretamente compatível com isso, você deve usar as configurações de filtragem de coluna para incluir as mesmas colunas definidas na publicação.

Sem essa configuração, o conector se comportará da seguinte forma:

  • Na tabela de destino, as colunas que não estiverem incluídas no filtro serão sufixadas com __DELETED. Todos os dados replicados durante a fase de instantâneo serão mantidos.

  • Após adicionar novas colunas à publicação, a tabela falhará permanentemente e você precisará reiniciar a replicação.

Para obter mais informações, consulte ALTER PUBLICATION.

Instalação do conector

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. Na página de conectores do Openflow, localize o conector e selecione Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    Nota

    Antes de instalar o conector, verifique se você criou um banco de dados e um esquema no Snowflake para que o conector armazene os dados ingeridos.

  4. Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.

  5. Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.

A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.

Configuração do conector

Você pode configurar o conector para os seguintes casos de uso:

Replique um conjunto de tabelas em tempo real

  1. Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.

  2. Preencha os valores dos parâmetros necessários conforme descrito em Parâmetros de fluxo.

Parâmetros de fluxo

Comece definindo os parâmetros do contexto PostgreSQL Source Parameters, depois os parâmetros do PostgreSQL Destination Parameters. Em seguida, você pode ativar o conector, que deverá se conectar ao PostgreSQL e ao Snowflake para começar a execução. Entretanto, ele não replicará nenhum dado até que uma tabela seja explicitamente adicionada à configuração.

Para configurar tabelas específicas para replicação, edite o contexto Parâmetros de ingestão do PostgreSQL. Logo depois que você aplicar as alterações ao contexto Parâmetros de replicação, a configuração será captada pelo conector, e o ciclo de vida da replicação será iniciado para cada tabela.

Contexto dos parâmetros de origem do PostgreSQL

Parâmetro

Descrição

URL da conexão do PostgreSQL

O URL completo do JDBC para o banco de dados de origem. Exemplo: jdbc:postgresql://example.com:5432/public

Se você estiver se conectando ao servidor de replicação PostgreSQL, consulte Replicação de tabelas de um servidor de replicação PostgreSQL.

Driver PostgreSQL JDBC

O caminho para o jar do driver PostgreSQL JDBC. Faça o download do jar em seu site e, em seguida, marque a caixa de seleção Reference asset para fazer o upload e anexá-lo.

Nome de usuário do PostgreSQL

O nome de usuário do conector.

Senha do PostgreSQL

A senha do conector.

Nome da publicação

O nome da publicação que você criou anteriormente.

Replication Slot Name

Optional. When no value is provided, the connector will create a new, uniquely-named slot. When given a value, the connector will use the existing slot, or create a new one with the provided name.

Changing the value for a running connector will restart reading the incremental change data capture (CDC) stream from the updated slot’s position.

Contexto dos parâmetros de destino do PostgreSQL

Parâmetro

Descrição

Obrigatório

Banco de dados de destino

O banco de dados onde os dados serão persistidos. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas.

Sim

Estratégia de autenticação Snowflake

Ao utilizar:

  • Snowflake Openflow Deployment or BYOC: Use SNOWFLAKE_SESSION_TOKEN. This token is managed automatically by Snowflake. BYOC deployments must have previously configured runtime roles to use SNOWFLAKE_SESSION_TOKEN.

  • BYOC: Alternatively BYOC can use KEY_PAIR as the value for authentication strategy.

Sim

Identificador de conta Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: nome da conta Snowflake formatado como [nome-da-organização]-[nome-da-conta], onde os dados serão persistentes.

Sim

Chave privada Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: deve ser a chave privada RSA utilizada para a autenticação.

    A chave RSA deve ser formatada de acordo com os padrões PKCS8 e têm os cabeçalhos e rodapés PEM padrão. Observe que é necessário definir o arquivo de chave privada do Snowflake ou a chave privada do Snowflake.

Não

Arquivo de chave privada Snowflake

Ao utilizar:

  • Estratégia de autenticação de token de sessão: o arquivo de chave privada deve estar em branco.

  • KEY_PAIR: carregue o arquivo que contém a chave privada RSA usada para autenticação no Snowflake, formatada de acordo com as normas PKCS8 e incluindo cabeçalhos e rodapés PEM padrão. A linha do cabeçalho começa com -----BEGIN PRIVATE. Para carregar o arquivo de chave privada, marque a caixa de seleção Reference asset.

Não

Senha de chave privada Snowflake

Ao usar

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: forneça a senha associada ao arquivo de chave privada do Snowflake.

Não

Função Snowflake

Ao usar

  • Estratégia de autenticação de tokens de sessão: use sua função de tempo de execução. Você pode encontrar sua função de tempo de execução na UI do Openflow, navegando para View Details no seu tempo de execução.

  • Estratégia de autenticação de KEY_PAIR: use uma função válida configurada para o usuário do seu serviço.

Sim

Nome de usuário do Snowflake

Ao usar

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: forneça o nome de usuário usado para se conectar à instância do Snowflake.

Sim

Warehouse Snowflake

Warehouse Snowflake usado para executar consultas.

Sim

Contexto dos parâmetros de ingestão do PostgreSQL

Parâmetro

Descrição

Nomes de tabela inclusos

Uma lista separada por vírgulas de caminhos de tabela, incluindo os esquemas. Exemplo: public.my_table, other_schema.other_table.

Selecione tabelas por nome ou por Regex. Se você usar ambas, todas as tabelas correspondentes de qualquer uma das opções serão incluídas.

As tabelas que são subpartições são sempre excluídas da ingestão. Consulte Replicação de uma tabela particionada para obter mais informações.

Regex de tabela inclusa

Uma expressão regular para comparar com os caminhos da tabela. Todos os caminhos que corresponderem à expressão serão replicados, e as novas tabelas que corresponderem ao padrão e forem criadas posteriormente também serão incluídas automaticamente. Exemplo: public\.auto_.*

Selecione tabelas por nome ou por Regex. Se você usar ambas, todas as tabelas correspondentes de qualquer uma das opções serão incluídas.

As tabelas que são subpartições são sempre excluídas da ingestão. Consulte Replicação de uma tabela particionada para obter mais informações.

Filtro de coluna JSON

Opcional. Um JSON contendo uma lista de nomes de tabelas totalmente qualificados e um padrão regex para nomes de colunas que devem ser incluídos na replicação. Exemplo: [ {"schema":"public", "table":"table1", "includedPattern":".*name"} ] incluirá todas as colunas que terminam com name em table1 do esquema public.

CRON do cronograma de tarefas de fusão

A expressão CRON que define os períodos em que as operações de fusão do diário para a Tabela de destino serão acionadas. Defina-o como * * * * * ? se quiser ter uma fusão contínua ou um cronograma de tempo para limitar o tempo de execução do warehouse.

Por exemplo:

  • A cadeia de caracteres * 0 * * * ? indica que você deseja agendar fusões de hora em hora por um minuto

  • A cadeia de caracteres * 20 14 ? * MON-FRI indica que você deseja agendar fusões às 2:20 PM de segunda a sexta-feira.

Para obter mais informações e exemplos, consulte o tutorial de acionadores do cron na documentação do Quartz

Resolução do identificador do objeto

Specifies how source object identifiers such as the names of schemas, tables, and columns are stored and queried in Snowflake. This setting specifies that you must use double quotes in SQL queries.

Opção 1: Padrão, com distinção entre maiúsculas e minúsculas. Para compatibilidade reversa.

  • Transformação: As letras maiúsculas/minúsculas são preservadas. Por exemplo, My_Table continua sendo My_Table.

  • Consultas: consultas SQL devem usar aspas duplas para corresponder exatamente ao uso de maiúsculas/minúsculas dos objetos de banco de dados. Por exemplo, SELECT * FROM "My_Table";.

Nota

Snowflake recommends using this option if you must preserve source casing for legacy or compatibility reasons. For example, if the source database includes table names that differ in case only–such as MY_TABLE and my_table–that would result in a name collision when using when using case-insensitive comparisons.

Opção 2: Recomendado, sem distinção entre maiúsculas e minúsculas

  • Transformação: Todos os identificadores são convertidos em maiúsculas. Por exemplo, My_Table torna-se MY_TABLE.

  • Queries: SQL queries are case-insensitive and don’t require SQL double quotes. For example, SELECT * FROM my_table; returns the same results as SELECT * FROM MY_TABLE;.

Nota

A Snowflake recomenda usar esta opção se não for esperado que os objetos de banco de dados tenham nomes que misturem letras maiúsculas e minúsculas.

Importante

Do not change this setting after the connector has begun ingesting data. Changing this setting after ingestion has begun breaks the existing ingestion. If you must change this setting, create a new connector instance.

Replicação de tabelas de um servidor de replicação PostgreSQL

O conector pode ingerir dados de um servidor primário, uma réplica de espera ativa ou servidor de assinante usando replicação lógica. Antes de configurar o conector para se conectar a uma réplica do PostgreSQL, certifique-se de que a replicação entre nós primários e de réplica funcione corretamente. Ao investigar problemas de dados ausentes no conector, primeiro certifique-se de que as linhas ausentes estejam presentes no servidor de réplica usado pelo conector.

Considerações adicionais ao conectar-se a uma réplica em espera:

  • Somente a conexão a réplicas em espera ativa é compatível. Observe que as réplicas em espera dinâmica não podem aceitar conexões de clientes até serem promovidas a uma instância primária.

  • A versão do PostgreSQL do servidor deve ser >= 16.

  • A publicação necessária para o conector deve ser criada no servidor primário, não no servidor em espera. O servidor em espera é somente leitura e não permite criar publicação.

Se você se conectar a uma instância de espera ativa e consulte o erro Tentativa de criar o slot de replicação “<replication slot> esgotou o tempo limite. Se estiver conectando a uma instância em espera, certifique-se de que haja algum tráfego na instância PostgreSQL primária, caso contrário a chamada para criar um slot de replicação nunca retornará. no código do Openflow ou o processador Read PostgreSQL CDC Stream não está iniciando, faça login na instância PostgreSQL primária e execute a seguinte consulta:

SELECT pg_log_standby_snapshot();
Copy

O erro ocorre quando não há alterações de dados no servidor primário. Como tal, o conector pode travar enquanto cria um slot de replicação no servidor de réplica. Isso resulta no servidor de réplica que exige informações sobre a execução de transações do servidor primário para poder criar um slot de replicação. Os servidores primários não enviarão as informações enquanto estiverem ociosos. A função pg_log_standby_snapshot() força o servidor primário a enviar informações sobre transações em execução para o servidor de réplica.

Remova e adicione novamente uma tabela à replicação

Para remover uma tabela da replicação, certifique-se de que ela seja removida dos parâmetros Nomes de tabela inclusos ou Regex de tabela inclusa no contexto Parâmetros de replicação.

Se quiser adicionar novamente a tabela à replicação mais tarde, primeiro exclua a tabela de destino correspondente no Snowflake. Após isso, adicione a tabela novamente aos parâmetros Nomes de tabela inclusos ou Regex de tabela inclusa. Isso garante que o processo de replicação comece do zero para a tabela.

Essa abordagem também pode ser usada para se recuperar de um cenário de replicação de tabela com falha.

Replique um subconjunto de colunas em uma tabela

O conector pode filtrar os dados replicados por tabela para um subconjunto de colunas configuradas.

Para aplicar filtros a colunas, modifique a propriedade Filtro de coluna no contexto Parâmetros de replicação, adicionando uma matriz de configurações – uma entrada para cada tabela à qual deseja aplicar um filtro.

As colunas podem ser incluídas ou excluídas por nome ou padrão. Você pode aplicar uma única condição por tabela ou combinar várias condições, com as exclusões sempre tendo precedência sobre as inclusões.

O exemplo a seguir mostra os campos disponíveis. schema e table são obrigatórios e, em seguida, um ou mais dos campos included, excluded, includedPattern, excludedPattern são obrigatórios.

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

Replicação de uma tabela particionada

O conector oferece suporte à replicação de tabelas particionadas para servidores PostgreSQL da versão 15 ou superior. Uma tabela particionada PostgreSQL será replicada no Snowflake como uma única tabela de destino.

Por exemplo, se você tiver uma tabela particionada orders, com subpartições orders_2023, orders_2024 e configurou o conector para ingerir todas as tabelas padrão orders.* correspondentes, então somente a tabela orders será replicada para o Snowflake e incluirá dados de todas as subpartições.

Para oferecer suporte à replicação de tabelas particionadas, certifique-se de que a publicação criada no PostgreSQL tenha a opção publish_via_partition_root definida como true.

No momento, a ingestão de tabelas particionadas tem as seguintes limitações:

  • Quando uma tabela é anexada como uma partição a uma tabela particionada após o início da ingestão, o conector não buscará os dados que existiam na tabela de partição antes de ela ser anexada.

  • Quando uma tabela de subpartição é desanexada da tabela particionada após o início da ingestão, o conector não marcará os dados dessa subpartição como excluídos na tabela de partição raiz.

  • A operação de truncamento em subpartições não marcará os registros afetados como excluídos.

Monitore as alterações de dados em tabelas

O conector replica não apenas o estado atual dos dados das tabelas de origem, mas também cada estado de cada linha de cada conjunto de alterações. Esses dados são armazenados em tabelas de diário criadas no mesmo esquema da tabela de destino.

Os nomes das tabelas de diário são formatados como: é um número inteiro que aumenta a cada alteração de esquema na tabela de origem. Como resultado, as tabelas de origem que passam por alterações de esquema terão vários tabelas de diário.

Quando uma tabela é removida da replicação e depois adicionada novamente, o valor de <carimbo de data/hora> será alterado e a <geração de esquema> começará novamente a partir de 1.

Importante

O Snowflake recomenda que você não altere a estrutura das tabelas de diário de forma alguma. Elas são usadas pelo conector para atualizar a tabela de destino como parte do processo de replicação.

O conector nunca descarta tabelas de diário, mas faz uso do diário mais recente para cada tabela de origem replicada, lendo apenas fluxos apenas para anexação sobre os diários. Para recuperar o armazenamento, você pode:

  • Truncar todas as tabelas de diário a qualquer momento.

  • Descartar as tabelas de diário relacionadas às tabelas de origem que foram removidas da replicação.

  • Descartar todas as tabelas de diário, exceto as de última geração, para tabelas replicadas ativamente.

Por exemplo, se o conector estiver definido para replicar ativamente a tabela de origem orders e você tiver removido anteriormente a tabela customers da replicação, poderá ter as seguintes tabelas de diário. Nesse caso, você pode descartar todas elas exceto orders_5678_2.

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

Configure o agendamento de tarefas de fusão

O conector usa um warehouse para mesclar informações de captura de dados de alteração (CDC) nas tabelas de destino. Essa operação é acionada pelo processador MergeSnowflakeJournalTable. Se não houver novas alterações ou se não houver novos FlowFiles aguardando na fila MergeSnowflakeJournalTable, nenhuma fusão será acionada e o warehouse será suspenso automaticamente.

Para limitar o custo do warehouse e limitar as mesclagens apenas ao horário programado, use a expressão CRON no parâmetro CRON Cronograma da tarefa de mesclagem. Ele controla o fluxo dos FlowFiles que chegam ao processador MergeSnowflakeJournalTable e as mesclagens são acionadas somente em um período de tempo específico. Para obter mais informações sobre agendamento, consulte Estratégia de agendamento.

Pare ou exclua o conector

Ao interromper ou remover o conector, você deve considerar o slot de replicação que o conector usa.

O conector cria seu próprio slot de replicação com um nome que começa com snowflake_connector_ seguido de um sufixo aleatório. À medida que o conector lê o fluxo de replicação, ele avança o slot, de modo que o PostgreSQL possa cortar o log WAL e liberar espaço em disco.

Quando o conector é pausado, o slot não é avançado e as alterações no banco de dados de origem continuam aumentando o tamanho do registro WAL. Você não deve manter o conector em pausa por longos períodos de tempo, especialmente em bancos de dados de alto tráfego.

Quando o conector é removido, seja excluindo-o da tela do Openflow ou por qualquer outro meio, como a exclusão de toda a instância do Openflow, o slot de replicação permanece no lugar e deve ser removido manualmente.

Se você tiver várias instâncias do conector replicando do mesmo banco de dados PostgreSQL, cada instância criará seu próprio slot de replicação com nome exclusivo. Ao descartar um slot de replicação manualmente, certifique-se de que seja o correto. Você pode ver qual slot de replicação é usado por uma determinada instância do conector verificando o estado do processador CaptureChangePostgreSQL.

Execute o fluxo

  1. Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.

  2. Clique com o botão direito do mouse no grupo de processos importado e selecione Start. O conector inicia a ingestão de dados.