Set up the Openflow Connector for PostgreSQL¶
Note
The connector is subject to the Connector Terms.
This topic describes the steps to set up the Openflow Connector for PostgreSQL.
Prerequisites¶
Ensure that you have reviewed About Openflow Connector for PostgreSQL.
Ensure that you have reviewed the supported PostgreSQL versions.
Ensure that you have set up Openflow.
As a database administrator, perform the following tasks:
Ensure that there is enough disk space on your PostgreSQL server for the WAL. Once created, a replication slot causes PostgreSQL to retain WAL data from the position held by the slot until the connector confirms and advances that position.
Ensure that every table enabled for replication has a primary key. The key can be a single column or composite.
Set the REPLICA IDENTITY (https://www.postgresql.org/docs/current/sql-altertable.html#SQL-ALTERTABLE-REPLICA-IDENTITY) of tables to DEFAULT. This ensures that the primary keys are represented in the WAL, and the connector can read them.
Create a user for the connector. The connector requires a user with the REPLICATION attribute and permissions to SELECT from every replicated table. Create that user with a password to enter into the connector’s configuration. For more information on replication security, see Security.
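The following is a minimal sketch of these last two tasks; the schema, table, user name, and password placeholders are illustrative and not prescribed by the connector:
ALTER TABLE <schema>.<table> REPLICA IDENTITY DEFAULT;
CREATE USER <replication_user> WITH REPLICATION PASSWORD '<password>';
GRANT USAGE ON SCHEMA <schema> TO <replication_user>;
GRANT SELECT ON ALL TABLES IN SCHEMA <schema> TO <replication_user>;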
Configure wal_level¶
Openflow Connector for PostgreSQL requires wal_level to be set to logical.
Depending on where your PostgreSQL server is hosted, you can configure the wal_level as follows:
Hosting | Configuration |
---|---|
On premise | Execute ALTER SYSTEM SET wal_level = logical; as a superuser or a user with equivalent privileges, then restart the server. |
RDS | The user used by the agent needs to have the rds_replication role. You also need to set the rds.logical_replication parameter to 1 in the DB parameter group and reboot the instance. |
AWS Aurora | Set the rds.logical_replication parameter to 1 in the DB cluster parameter group. |
GCP | Set the following flag: cloudsql.logical_decoding to on. |
Azure | Set the replication support to LOGICAL. |
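For a self-managed server, the following sketch sets and verifies the value; ALTER SYSTEM requires superuser (or equivalent) privileges, and the change takes effect only after a server restart:
ALTER SYSTEM SET wal_level = logical;
-- restart the PostgreSQL server, then verify:
SHOW wal_level;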
Create a publication¶
Openflow Connector for PostgreSQL requires a publication to be created and configured in PostgreSQL before replication starts. You can create it for all tables, for a subset of tables, or for specific tables with only selected columns. Make sure that every table and column that you plan to replicate is included in the publication. You can also modify the publication later, while the connector is running. To create and configure a publication, do the following:
Log in as a user with the CREATE privilege in the database and execute the following query:
CREATE PUBLICATION <publication name>;
Define the tables that the database agent will be able to see:
ALTER PUBLICATION <publication name> ADD TABLE <table name>;
Important
For Postgres v15 and later
If a publication is created for a subset of a table’s columns, add tables for replication using the ADD_TABLE_WITH_COLUMNS procedure, specifying exactly the same set of columns.
If ADD_TABLES is used instead, the connector will still work, but the following might occur:
In the destination database, columns that are not included in the filter will be suffixed with _DELETED. All data replicated during the snapshot phase will still be there.
If more columns are later added to the publication, the table will end up in the Permanently Failed state, requiring the replication to be restarted.
For more information see ALTER PUBLICATION.
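For example, on PostgreSQL 15 and later a publication can be limited to specific columns of a table. The publication, schema, table, and column names below are placeholders:
CREATE PUBLICATION <publication name>;
-- PostgreSQL 15+: publish only selected columns of a table
ALTER PUBLICATION <publication name> ADD TABLE <schema>.<table> (<column 1>, <column 2>);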
As a Snowflake account administrator, perform the following tasks:
Create a Snowflake user of type SERVICE. Create a database to store the replicated data, and set up privileges for the Snowflake user to create objects in that database by granting the USAGE and CREATE SCHEMA privileges.
CREATE DATABASE <destination_database>;
CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
CREATE ROLE <openflow_role>;
GRANT ROLE <openflow_role> TO USER <openflow_user>;
GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
CREATE WAREHOUSE <openflow_warehouse>
  WITH WAREHOUSE_SIZE = 'MEDIUM'
  AUTO_SUSPEND = 300
  AUTO_RESUME = TRUE;
GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
Create a pair of secure keys (public and private). Store the private key for the user in a file to supply to the connector’s configuration. Assign the public key to the Snowflake service user:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = '<public key>';
For more information, see key-pair authentication.
Designate a warehouse for the connector to use. Start with the smallest warehouse size, then experiment with the size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses than with a larger warehouse size, as sketched below.
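For example, assuming you later need more merge concurrency, the warehouse created earlier could be converted into a multi-cluster warehouse (an illustrative sketch; multi-cluster warehouses require Enterprise Edition or higher):
ALTER WAREHOUSE <openflow_warehouse> SET
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 3;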
Import the connector definition into Openflow¶
Download the connector definition file.
Import the connector definition into Openflow:
Open the Snowflake Openflow canvas.
Add a process group. To do this, drag and drop the Process Group icon from the tool palette at the top of the page onto the canvas. Once you release your pointer, a Create Process Group dialog appears.
In the Create Process Group dialog, select the connector definition file to import.
Configure the connector¶
You can configure the connector for the following use cases:
Replicate a set of tables in real-time¶
Right-click on the imported process group and select Parameters.
Populate the required parameter values as described in Flow parameters.
Flow parameters¶
Start with setting the parameters of the Source Database Parameters context, then the Snowflake Parameters context. Once this is done, you can enable the connector; it should connect to both PostgreSQL and Snowflake and start running. However, it will not replicate any data until tables are explicitly added to its configuration.
To configure specific tables for replication, edit the Replication Parameters context. Shortly after you apply the changes to the Replication Parameters context, the configuration will be picked up by the connector, and the replication lifecycle will start for every table.
Source Database Parameters context¶
Parameter | Description |
---|---|
Postgres Connection URL | The full JDBC URL to the source database. Example: jdbc:postgresql://<host>:<port>/<database> |
Postgres JDBC Driver | The absolute path to the PostgreSQL JDBC driver jar. Example: /opt/drivers/postgresql-<version>.jar |
Postgres SSL Mode | Enable or disable SSL connections. |
Postgres Root SSL Certificate | The full contents of the root certificate for the database. Optional if SSL is disabled. |
Postgres Username | The username for the connector. |
Postgres Password | The password for the connector. |
Publication Name | The name of the publication you created earlier. |
Snowflake Parameters context¶
Parameter | Description |
---|---|
Snowflake Account | Name of the Snowflake account to replicate into. |
Snowflake Database | The name of the destination database to replicate into. Any unquoted name is automatically converted to uppercase, whereas enclosing the name in double quotes ("") preserves its exact case. |
Snowflake Private Key | The RSA private key used for authentication. The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either Snowflake Private Key File or Snowflake Private Key must be defined. |
Snowflake Private Key File | The file that contains the RSA private key used for authentication to Snowflake, formatted according to PKCS8 standards and having standard PEM headers and footers. The header line starts with -----BEGIN PRIVATE KEY-----. |
Snowflake Private Key Password | The password associated with the Snowflake Private Key File. |
Snowflake User Role | The Snowflake account role for the connector to use. |
Snowflake Username | The name of the Snowflake user for the connector. |
Snowflake Warehouse | The name of the warehouse used by the connector. |
Replication Parameters context¶
Parameter | Description |
---|---|
Included Table Names | A comma-separated list of table paths, including their schemas. Example: public.my_table, other_schema.other_table |
Included Table Regex | A regular expression to match against table paths. Every path matching the expression will be replicated, and new tables matching the pattern that are created later will also be included automatically. Example: public\.orders_.* |
Remove and re-add a table to replication¶
To remove a table from replication, ensure that it is removed from the Included Table Names or Included Table Regex parameters in the Replication Parameters context.
If you want to re-add the table to replication later, first delete the corresponding destination table in Snowflake. Afterward, add the table back to the Included Table Names or Included Table Regex parameters, as shown in the sketch below. This ensures that the replication process starts fresh for the table.
This approach can also be used to recover from a failed table replication scenario.
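A minimal sketch of the Snowflake side of this procedure; the database, schema, and table names are placeholders:
-- drop the previously replicated copy so that the table starts fresh when re-added
DROP TABLE IF EXISTS <destination_database>.<schema>.<table>;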
Replicate a subset of columns in a table¶
The connector can filter the data replicated per table to a subset of configured columns.
To apply filters to columns, modify the Column Filter property in the Replication Parameters context, adding an array of configurations, one entry for every table you wish to apply a filter to.
Columns can be included or excluded by name or pattern. You can apply a single condition per table, or combine multiple conditions, with exclusions always taking precedence over inclusions.
The following example shows the fields that are available. schema and table are mandatory, and one or more of included, excluded, includedPattern, and excludedPattern is required.
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
Track data changes in tables¶
The connector replicates not only the current state of data from the source tables, but also every state of every row from every changeset. This data is stored in journal tables created in the same schema as the destination table.
The journal table names are formatted as <source table name>_JOURNAL_<timestamp>_<schema generation>, where <timestamp> is the epoch-seconds value of when the source table was added to replication, and <schema generation> is an integer that increases with every schema change on the source table. This means that a source table which undergoes schema changes will have multiple journal tables.
When a table is removed from replication and then added back, the <timestamp> value will change, and <schema generation> will start again from 1.
Important
Snowflake recommends that you do not alter the journal tables or the data in them, in any way. They are used by the connector to update the destination table as part of the replication process.
The connector never drops journal tables, but it only makes active use of the latest journal for every replicated source table. If you wish to reclaim the storage, you can safely drop the journal tables related to source tables that were removed from replication, and all but the latest generation ones for actively replicated tables.
For example, if your connector is set to actively replicate the source table orders, and you have earlier removed the table customers from replication, you may have the following journal tables. In this case you can drop all of them except orders_5678_2.
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
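Continuing the example above, a sketch of the cleanup using the illustrative journal table names listed; run it in the destination schema:
-- journals of a table that is no longer replicated
DROP TABLE customers_1234_1;
DROP TABLE customers_1234_2;
-- an older generation of an actively replicated table
DROP TABLE orders_5678_1;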
Configure scheduling of merge tasks¶
The connector uses a warehouse to merge change data capture (CDC) data into destination tables. This operation is triggered by the MergeSnowflakeJournalTable processor. If there are no new changes or if no new flow files are waiting in the MergeSnowflakeJournalTable queue, no merge is triggered and the warehouse auto-suspends.
To limit warehouse cost and restrict merges to scheduled times only, use a CRON expression in the Merge task Schedule CRON parameter. It throttles the flow files coming to the MergeSnowflakeJournalTable processor so that merges are triggered only during the dedicated time window. For more information about scheduling, see Scheduling strategy.
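For example, assuming the parameter accepts a Quartz-style cron expression (the format used by the CRON scheduling strategy), a value such as the following would allow merges only during the 02:00–02:59 window each day; the exact expression is illustrative:
0 * 2 * * ?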
Stop or delete the connector¶
When stopping or removing the connector, you have to consider the replication slot that the connector uses.
The connector creates its own replication slot with a name starting with snowflake_connector_ followed by a random suffix. As the connector reads the replication stream, it advances the slot, so that PostgreSQL can trim its WAL and free up disk space.
When the connector is paused, the slot is not advanced, and changes to the source database keep increasing the WAL log size. You should not keep the connector paused for extended periods of time, especially on high-traffic databases.
When the connector is removed, whether by deleting it from the Openflow canvas, or any other means, such as deleting the whole Openflow instance, the replication slot remains in place, and must be dropped manually.
If you have multiple connector instances replicating from the same PostgreSQL database, each instance will create its own uniquely named replication slot. When dropping a replication slot manually, make sure it’s the right one. You can see which replication slot is used by a given connector instance by checking the state of the CaptureChangePostgreSQL processor.
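The following is a sketch of how to find and drop a leftover slot using standard PostgreSQL catalog views and functions; verify the slot name against the CaptureChangePostgreSQL processor state before dropping anything:
-- list replication slots created by connector instances
SELECT slot_name, active, restart_lsn
FROM pg_replication_slots
WHERE slot_name LIKE 'snowflake_connector_%';

-- drop the slot belonging to the removed connector instance
SELECT pg_drop_replication_slot('<slot name>');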
Run the flow¶
Right-click on the canvas and select Enable all Controller Services.
Right-click on the imported process group and select Start. The connector starts the data ingestion.