Set up the Openflow Connector for PostgreSQL

Note

The connector is subject to the Connector Terms.

This topic describes the steps to set up the Openflow Connector for PostgreSQL.

Prerequisites

  1. Ensure that you have reviewed About Openflow Connector for PostgreSQL.

  2. Ensure that you have reviewed the supported PostgreSQL versions.

  3. Ensure that you have set up Openflow.

  4. As a database administrator, perform the following tasks:

    1. Configure wal_level

    2. Create a publication

    3. Ensure that there is enough disk space on your PostgreSQL server for the WAL. Once created, a replication slot causes PostgreSQL to retain WAL data from the position held by the slot until the connector confirms and advances that position.

    4. Ensure that every table enabled for replication has a primary key. The key can be a single column or composite.

    5. Set the REPLICA IDENTITY <https://www.postgresql.org/docs/current/sql-altertable.html#SQL-ALTERTABLE-REPLICA-IDENTITY> of tables to DEFAULT. This ensures that the primary keys are represented in the WAL, and the connector can read them.

    6. Create a user for the connector. The connector requires a user with the REPLICATION attribute and permissions to SELECT from every replicated table. Create that user with a password to enter into the connector’s configuration. For more information on replication security, see Security. A minimal SQL sketch of steps 5 and 6 appears after this list.
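
The following is a minimal sketch of the REPLICA IDENTITY and connector user setup (steps 5 and 6 above). The schema, table, user, and password names are placeholders; adjust them to your environment. DEFAULT is already the default replica identity, so the ALTER TABLE statement is only needed if the identity was changed previously.

-- Ensure primary keys are written to the WAL
ALTER TABLE <schema>.<table_name> REPLICA IDENTITY DEFAULT;

-- Create the connector user with the REPLICATION attribute
CREATE USER <postgres_user> WITH REPLICATION PASSWORD '<password>';

-- Grant SELECT on every replicated table, for example per schema
GRANT SELECT ON ALL TABLES IN SCHEMA <schema> TO <postgres_user>;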

Configure wal_level

Openflow Connector for PostgreSQL requires wal_level to be set to logical.

Depending on where your PostgreSQL server is hosted, you can configure the wal_level as follows:

On premise

Execute the following statement as a superuser or as a user with the ALTER SYSTEM privilege:

ALTER SYSTEM SET wal_level = logical;
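
wal_level is a server-level parameter, so the new value takes effect only after the PostgreSQL server is restarted. After the restart, you can verify the active setting, for example:

SHOW wal_level;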

RDS

The user used by the agent must be assigned the rds_superuser or rds_replication role.

You also need to set:

  • rds.logical_replication static parameter to 1.

  • max_replication_slots, max_connections and max_wal_senders parameters according to your database and replication setup.
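
The rds.logical_replication and max_* parameters are static parameters and are set through the DB instance's parameter group rather than with SQL. The role requirement above can be satisfied with a statement like the following, assuming the connector user already exists:

GRANT rds_replication TO <postgres_user>;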

AWS Aurora

Set the rds.logical_replication static parameter to 1.

GCP

Set the following flags:

  • cloudsql.logical_decoding=on.

  • cloudsql.enable_pglogical=on.

For more information, see Google Cloud documentation.

Azure

Set the replication support to Logical. For more information, see Azure documentation.

Create a publication

Openflow Connector for PostgreSQL requires a publication to be created and configured in PostgreSQL before replication starts. You can create it for all tables, for a subset of tables, or for specific tables with only selected columns. Make sure that every table and column that you plan to have replicated is included in the publication. You can also modify the publication later, while the connector is running. To create and configure a publication, do the following:

  1. Log in as a user with the CREATE privilege in the database and execute the following query:

CREATE PUBLICATION <publication name>;

  2. Define the tables that the database agent will be able to see by executing:

ALTER PUBLICATION <publication name> ADD TABLE <table name>;
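
If you want every table in the database to be published, PostgreSQL also supports creating the publication for all tables in one statement (this form requires superuser privileges), for example:

CREATE PUBLICATION <publication name> FOR ALL TABLES;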

Important

For Postgres v15 and later

For publications created for a subset of a table’s columns, add tables for replication using the ADD_TABLE_WITH_COLUMNS procedure, specifying exactly the same set of columns.

If ADD_TABLES is used, the connector will work, but the following might occur:

  • In the destination database, columns that are not included in the filter will be suffixed with _DELETED. All data replicated during the snapshot phase will still be present.

  • If more columns are later added to the publication, the table will enter the Permanently Failed state, and replication for that table will need to be restarted.

For more information, see ALTER PUBLICATION.
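
For reference, on PostgreSQL 15 and later a column list can also be specified directly in SQL when a table is added to the publication. The column set listed here must match the connector's column filter exactly, as described above. A sketch with placeholder names:

ALTER PUBLICATION <publication name> ADD TABLE <schema>.<table name> (<column 1>, <column 2>);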

As a Snowflake account administrator, perform the following tasks:

    1. Create a Snowflake user with the type as SERVICE. Create a database to store the replicated data, and set up privileges for the Snowflake user to create objects in that database by granting the USAGE and CREATE SCHEMA privileges.

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
           WITH
                WAREHOUSE_SIZE = 'MEDIUM'
                AUTO_SUSPEND = 300
                AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
    2. Create a pair of secure keys (public and private). Store the private key for the user in a file to supply to the connector’s configuration. Assign the public key to the Snowflake service user:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = '<public key>';
      

      For more information, see key-pair authentication.

    3. Designate a warehouse for the connector to use. Start with the smallest warehouse size, then experiment with the size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses than with a larger warehouse size, as in the sketch below.
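
      For example, if you later replicate a large number of tables, a multi-cluster configuration is one option to consider. The following is a sketch only; multi-cluster warehouses require Snowflake Enterprise Edition or higher, and the cluster counts shown are illustrative:

      ALTER WAREHOUSE <openflow_warehouse> SET
           MIN_CLUSTER_COUNT = 1
           MAX_CLUSTER_COUNT = 3;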

Import the connector definition into Openflow

  1. Download the connector definition file.

  2. Import the connector definition into Openflow:

    1. Open the Snowflake Openflow canvas.

    2. Add a process group. To do this, drag and drop the Process Group icon from the tool palette at the top of the page onto the canvas. Once you release your pointer, a Create Process Group dialog appears.

    3. In the Create Process Group dialog, select the connector definition file to import.

Configure the connector

You can configure the connector for the following use cases:

Replicate a set of tables in real-time

  1. Right-click on the imported process group and select Parameters.

  2. Populate the required parameter values as described in Flow parameters.

Flow parameters

Start with setting the parameters of the Source Database Parameters context, then the Snowflake Parameters context. Once this is done, you can enable the connector; it should connect to both PostgreSQL and Snowflake and start running. However, it will not replicate any data until tables are explicitly added to its configuration.

To configure specific tables for replication, edit the Replication Parameters context. Shortly after you apply the changes to the Replication Parameters context, the configuration will be picked up by the connector, and the replication lifecycle will start for every table.

Source Database Parameters context

  • Postgres Connection URL: The full JDBC URL to the source database. Example: jdbc:postgresql://example.com:5432/public

  • Postgres JDBC Driver: The absolute path to the PostgreSQL JDBC driver jar. Example: /opt/resources/drivers/postgresql-42.7.5.jar

  • Postgres SSL Mode: Enable or disable SSL connections.

  • Postgres Root SSL Certificate: The full contents of the root certificate for the database. Optional if SSL is disabled.

  • Postgres Username: The username for the connector.

  • Postgres Password: The password for the connector.

  • Publication Name: The name of the publication you created earlier.

Snowflake Parameters context

  • Snowflake Account: The name of the Snowflake account to replicate into.

  • Snowflake Database: The name of the destination database to replicate into. Any unquoted name is automatically converted to uppercase, whereas enclosing the name in double quotes ("") preserves its exact case.

  • Snowflake Private Key: The RSA private key used for authentication. The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Either Snowflake Private Key or Snowflake Private Key File must be defined.

  • Snowflake Private Key File: The file that contains the RSA private key used for authentication to Snowflake, formatted according to PKCS8 standards and with standard PEM headers and footers. The header line starts with -----BEGIN PRIVATE.

  • Snowflake Private Key Password: The password associated with the Snowflake Private Key File.

  • Snowflake User Role: The Snowflake account role for the connector to use.

  • Snowflake Username: The name of the Snowflake user for the connector.

  • Snowflake Warehouse: The name of the warehouse used by the connector.

Replication Parameters context

  • Included Table Names: A comma-separated list of table paths, including their schemas. Example: public.my_table, other_schema.other_table

  • Included Table Regex: A regular expression to match against table paths. Every path matching the expression will be replicated, and new tables matching the pattern that get created later will also be included automatically. Example: public\.auto_.*

Remove and re-add a table to replication

To remove a table from replication, ensure that it is removed from the Included Table Names or Included Table Regex parameters in the Replication Parameters context.

If you want to re-add the table to replication later, first delete the corresponding destination table in Snowflake. Afterward, add the table back to the Included Table Names or Included Table Regex parameters. This ensures that the replication process starts fresh for the table.

This approach can also be used to recover from a failed table replication scenario.
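
As a sketch, starting fresh for a single table might involve dropping its destination table in Snowflake before re-adding the source table to the parameters. The names below are placeholders:

DROP TABLE <destination_database>.<schema>.<table_name>;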

Replicate a subset of columns in a table

The connector can filter the data replicated per table to a subset of configured columns.

To apply filters to columns, modify the Column Filter property in the Replication Parameters context, adding an array of configurations, one entry for every table you wish to apply a filter to.

Columns can be included or excluded by name or pattern. You can apply a single condition per table, or combine multiple conditions, with exclusions always taking precedence over inclusions.

The following example shows the fields that are available. schema and table are mandatory, and at least one of included, excluded, includedPattern, or excludedPattern is required.

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
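
For illustration, the following hypothetical filter replicates only two named columns of public.orders and excludes columns matching a pattern from public.customers. The table and column names are examples only:

[
    {
        "schema": "public",
        "table" : "orders",
        "included": ["order_id", "status"]
    },
    {
        "schema": "public",
        "table" : "customers",
        "excludedPattern": "audit_.*"
    }
]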

Track data changes in tables

The connector replicates not only the current state of data from the source tables, but also every state of every row from every changeset. This data is stored in journal tables created in the same schema as the destination table.

The journal table names are formatted as: <source table name>_JOURNAL_<timestamp>_<schema generation>

where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> is an integer increasing with every schema change on the source table. This means a source table that undergoes schema changes will have multiple journal tables.

When a table is removed from replication, then added back, the <timestamp> value will change, and <schema generation> will start again from 1.

Important

Snowflake recommends that you do not alter the journal tables or the data in them, in any way. They are used by the connector to update the destination table as part of the replication process.

The connector never drops journal tables, but it only makes active use of the latest journal for every replicated source table. If you wish to reclaim the storage, you can safely drop the journal tables related to source tables that were removed from replication, and all but the latest generation ones for actively replicated tables.

For example, if your connector is set to actively replicate source table orders, and you have earlier removed table customers from replication, you may have the following journal tables. In this case you can drop all of them except orders_JOURNAL_5678_2.

customers_JOURNAL_1234_1
customers_JOURNAL_1234_2
orders_JOURNAL_5678_1
orders_JOURNAL_5678_2

Configure scheduling of merge tasks

The connector uses a warehouse to merge change data capture (CDC) data into destination tables. This operation is triggered by the MergeSnowflakeJournalTable processor. If there are no new changes or if no new flow files are waiting in the MergeSnowflakeJournalTable queue, no merge is triggered and the warehouse auto-suspends.

To limit warehouse costs and restrict merges to scheduled times only, use a CRON expression in the Merge task Schedule CRON parameter. It throttles the flow files coming into the MergeSnowflakeJournalTable processor so that merges are triggered only during the dedicated time period. For more information about scheduling, see Scheduling strategy.
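
For example, assuming the Quartz-style cron syntax used by CRON-driven scheduling, an expression such as the following would allow merges only between 02:00 and 04:59, keeping the warehouse suspended for the rest of the day. Treat it as a sketch and adapt it to your own maintenance window:

* * 2-4 * * ?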

Stop or delete the connector

When stopping or removing the connector, you have to consider the replication slot that the connector uses.

The connector creates its own replication slot with a name starting with snowflake_connector_ followed by a random suffix. As the connector reads the replication stream, it advances the slot, so that PostgreSQL can trim its WAL log and free up disk space.

When the connector is paused, the slot is not advanced, and changes to the source database keep increasing the WAL log size. You should not keep the connector paused for extended periods of time, especially on high-traffic databases.

When the connector is removed, whether by deleting it from the Openflow canvas, or any other means, such as deleting the whole Openflow instance, the replication slot remains in place, and must be dropped manually.

If you have multiple connector instances replicating from the same PostgreSQL database, each instance will create its own uniquely-named replication slot. When dropping a replication slot manually, make sure it’s the right one. You can see which replication slot is used by a given connector instance by checking the state of the CaptureChangePostgreSQL processor.
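
To find and drop the replication slot manually, you can query pg_replication_slots and drop the slot by name. The slot must be inactive, that is, the connector must already be stopped or removed:

SELECT slot_name, active, restart_lsn
FROM pg_replication_slots
WHERE slot_name LIKE 'snowflake_connector_%';

SELECT pg_drop_replication_slot('<slot name>');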

Run the flow

  1. Right-click on the canvas and select Enable all Controller Services.

  2. Right-click on the imported process group and select Start. The connector starts the data ingestion.