Openflow Connector for PostgreSQL Maintenance

Note

This connector is subject to the Snowflake Connector Terms.

This topic describes important maintenance considerations and best practices for maintaining the Openflow Connector for PostgreSQL when making changes to the source PostgreSQL database. In addition, this topic describes how to restart table replication and reinstall the connector.

Check the replication status of a table

Interim failures, such as connection errors, do not prevent table replication. However, permanent failures, such as unsupported data types, prevent table replication.

To troubleshoot replication issues or verify that a table has been successfully removed from the replication flow, check the Table State Store:

  1. In the Openflow runtime canvas, right-click a processor group and choose Controller Services. A table listing controller services displays.
  2. Locate the row labeled Table State Store, click the More Three vertical dots indicating more options button on the right side of the row, and then choose View State.

A list of tables and their current states displays. Type in the search box to filter the list by table name. The possible states are:

  • NEW: The table is scheduled for replication but replication hasn’t started.
  • SNAPSHOT_REPLICATION: The connector is copying existing data. This status displays until all records are stored in the destination table.
  • INCREMENTAL_REPLICATION: The connector is actively replicating changes. This status displays after snapshot replication ends and continues to display indefinitely until a table is either removed from replication or replication fails.
  • FAILED: Replication has permanently stopped due to an error.

Note

The Openflow runtime canvas doesn’t display table status changes — only the current table status. However, table status changes are recorded in logs when they occur. Look for the following log message:

Replication state for table <database_name>.<schema_name>.<table_name> changed from <old_state> to <new_state>

If a permanent failure prevents table replication, remove the table from replication. After you address the problem that caused the failure, you can add the table back to replication. For more information, see Restart table replication.

Restart table replication

A table in FAILED state — for example, due to a missing primary key or unsupported schema change — does not restart automatically. If a table enters a FAILED state or you need to restart replication from scratch, use the following procedure to remove and re-add the table to replication.

Note

If the failure was caused by an issue in the source table such as a missing primary key, resolve that issue in the source database before continuing.

  1. Remove the table from replication, using one of the following methods:

    • Add the table to the Re-snapshot Table Exclusions parameter to temporarily exclude it from replication. This is convenient when the table is matched by an Included Table Regex that you don’t want to change.
    • In the Ingestion Parameters context, either remove the table from Included Table Names or modify the Included Table Regex so the table is no longer matched.
  2. Verify the table has been removed:

    1. In the Openflow runtime canvas, right-click a processor group and choose Controller Services.
    2. In the table listing controller services, locate the Table State Store row, click the three vertical dots on the right side of the row, then choose View State.

    Important

    You must wait until the table’s state is fully removed from this list before proceeding. Do not continue until this configuration change has completed.

  3. Clean up the destination: Once the table’s state shows as fully removed, manually DROP the destination table in Snowflake. Note that the connector will not overwrite an existing destination table during the snapshot phase; if the table still exists, replication will fail again. Optionally, the journal table and stream can also be removed if they are no longer needed.

  4. Re-add the table by reversing the change you made in the first step: either remove the table from Re-snapshot Table Exclusions, or add it back to Included Table Names or Included Table Regex. The connector then re-snapshots the table.

  5. Verify the restart: Check the Table State Store using the instructions given previously. The state of the table should appear with the status NEW, then transition to SNAPSHOT_REPLICATION, and finally INCREMENTAL_REPLICATION.

Increase the oversized value limit

By default, the connector replicates individual values up to 16 MB and marks any table that contains a larger value as permanently failed. If your Snowflake account has the ENABLE_OPENFLOW_CDC_POSTGRES_SSV2 parameter set to true, the per-value limit can be raised from 16 MB to 128 MB.

Important

The 128 MB limit applies in two ways: it’s both the maximum size of a single value and the maximum total size of a row. The connector adds metadata columns to every replicated row (_SNOWFLAKE_UPDATED_AT, _SNOWFLAKE_INSERTED_AT, _SNOWFLAKE_DELETED) that count toward the per-row limit, along with all other columns in the row. As a result, a single value can’t reach the full 128 MB in practice: you must leave headroom of at least a few MB per row for this overhead.

The increased limit doesn’t apply equally to all column types.

Note

In Snowflake, the maximum size for BINARY is 64 MB (BINARY(67108864)), even when the increased size limits are enabled. Only VARCHAR, VARIANT, ARRAY and OBJECT columns can hold up to 128 MB.

Check the account parameter

To verify the value of the ENABLE_OPENFLOW_CDC_POSTGRES_SSV2 account parameter, run the following query.

SHOW PARAMETERS LIKE 'ENABLE_OPENFLOW_CDC_POSTGRES_SSV2' FOR ACCOUNT <account_locator>;

If the returned value is not true, the connector continues to enforce the 16 MB limit regardless of any processor configuration.

Configure the processors

Update the Oversized Value Limit property to 128 MB on both of the following processors:

  • Fetch Table Rows (in the Snapshot Load group)
  • Read PostgreSQL CDC Stream (in the Incremental Load group)

For each processor:

  1. Locate the processor in the flow.
  2. Right-click the processor and select Configure.
  3. Open the Properties tab.
  4. Set Oversized Value Limit to 128 MB.
  5. Apply the change.

For tables that are already being replicated and have destination columns narrower than VARCHAR(134217728) or BINARY(67108864), see Migrate existing tables.

Migrate existing tables

The steps in Increase the oversized value limit raise the limit for newly created destination tables. If a table is already being replicated and its destination column type is not VARCHAR(134217728) or BINARY(67108864), but you now want to load values larger than the original 16 MB limit, you must manually widen the column type on both the journal and destination tables.

Before you migrate, check the current destination column type, because it can vary depending on when the snapshot replication was performed.

Warning

You must stop replication for the affected table before altering its journal or destination tables. Altering these tables while replication is active can corrupt in-flight data.

To migrate a table:

  1. Stop replication for the affected table by stopping the topmost processors of the Snapshot Load and Incremental Load groups until all queues are empty. For the equivalent stop procedure, see the substeps in Reinstall the connector.
  2. Widen the column on both the journal table and the destination table, according to the column type:
    1. For VARCHAR columns, run a single ALTER TABLE ... ALTER COLUMN ... SET DATA TYPE VARCHAR(134217728) on both the journal and destination tables.
    2. For BINARY columns, Snowflake doesn’t allow widening BINARY in place, so do the following on both the journal and destination tables:
      1. Add a new column of type BINARY(67108864).
      2. Copy data from the original column into the new column.
      3. Drop the original column and rename the new column to the original name.
  3. Restart replication by re-enabling the processors.

Performance considerations

Raising the per-value limit increases the amount of data that the connector loads into memory and moves through the flow, which raises the load on both the runtime and the warehouse. Size the runtime and warehouse accordingly.

During both snapshot and incremental replication, the queue in front of the Upload Rows via Snowpipe Streaming 2 processor can fill with FlowFiles and trigger back pressure, which consumes a large amount of runtime disk space. For larger tables, use a Large runtime to provide additional storage. For guidance on choosing a size, see Runtime sizing.

Snapshot replication

During snapshot replication, the product of fetchSize * rowSize * concurrentQueries can’t exceed the heap size of the NiFi runtime, where:

  • fetchSize is the number of rows fetched per query, set on the Fetch Table Rows processor (default: 100).
  • rowSize is the size of a single row being fetched.
  • concurrentQueries is the number of concurrent queries, set on the Fetch Table Rows processor (default: 2).

This memory requirement applies even when Oversized Value Strategy is set to Set Null, because the connector must load each oversized value into memory before it can replace the value with NULL.

If the source database contains many densely packed oversized values, consider excluding the affected column from replication before you start the snapshot. For example, if a column contains 1 GB values, loading even nine rows (~9 GB) can exhaust the heap and cause an out-of-memory error on a Medium runtime.

To speed up snapshot replication, you can increase the number of channels that the Upload Rows via Snowpipe Streaming 2 processor uses. The number of channels is set by the processor’s Channel Group property, which defaults to ${chunk.index:isEmpty():ifElse('1', ${chunk.index:mod(8)})}.

To increase the number of channels:

  1. Locate the Upload Rows via Snowpipe Streaming 2 processor in the flow.
  2. Stop the processor. You must stop the processor before you can change its properties.
  3. Right-click the processor and select Configure.
  4. Open the Properties tab.
  5. In the Channel Group property, increase the value 8 in the expression. For example, change 8 to 16 to double the number of channels.
  6. Apply the change.
  7. Start the processor.

Warning

While a snapshot replication is in progress, only increase the number of channels. Decreasing the number of channels during an active snapshot can cause data loss.

Incremental replication

When the source produces frequent changes to rows that contain large values, you might need a Large warehouse. With smaller warehouses, replicating many 8 MB rows can cause an out-of-memory error. By contrast, replicating 128 MB rows with continuous merges completes without warehouse errors, because the connector streams the data file by file through the Upload Rows via Snowpipe Streaming 2 processor and the merge processes it gradually.

Upgrading PostgreSQL

Upgrading the connector requires a different approach depending on whether PostgreSQL is being upgraded to the next minor or major version.

Minor version upgrades

  • Are data safe.
  • Require no special treatment.
  • Require stopping the connector for the duration of the upgrade to avoid reporting connectivity issues.
  • Continue replicating, after the upgrade, with no data loss.

Major version upgrades

  • Require the PostgreSQL server to drop replication slots, including any used by the connector.
  • Cannot preserve, or migrate replication slots to the new version. See also PostgresSQL 17 and later versions upgrades.
  • Restart replicating all tables from the prior snapshot phase.

To perform a minor version upgrade, do the following:

  1. Stop the connector, including all Processors and Controller Services.
  2. Upgrade PostgreSQL.
  3. Restart the connector.

To perform a major version upgrade, do the following:

  1. Remove all tables from replication in the connector by clearing the Included Table Names and Included Table Regex parameters.
  2. Wait until all queues in the connector are empty.
  3. Remove the destination tables, by dropping them or renaming.
  4. Stop the connector, including all Processors and Controller Services.
  5. Open the Incremental Load group in the connector.
  6. Right-click the top Processor in the group, Read PostgreSQL CDC Stream, and select View state.
  7. Click Clear state.
  8. Click Close.
  9. Upgrade PostgreSQL.
  10. Restart the connector. A new replication slot will be created.
  11. Re-add all tables to begin replication.

PostgresSQL 17 and later versions upgrades

PostgreSQL 17 improved upgrading such that it no longer requires dropping replication slots when upgrading to later versions such as 17.1 » 18.0. Upgrading to PostgreSQL 17.0 or later from prior versions (16 and earlier) drops replications slots and should be treated as a major upgrade. Future versions of PostgreSQL may also improve the upgrade process further.

Reinstall the connector

This section describes how to reinstall the connector. It covers situations where the new connector is installed in the same runtime, or when it is moved to a new runtime. Reinstall is often used in conjunction with Incremental replication without snapshots.

Warning

For the connector to be able to continue replicating from the same CDC stream position where it stopped before reinstallation, the source database must retain the WAL long enough to cover the time since the old connector is stopped and the new connector is started. Ensure the max_wal_size parameter of the PostgreSQL server is high enough, depending on your traffic, and keep the reinstallation time to a minimum.

Prerequisites

Review and note connector parameter context values. If you’re reinstalling the connector in the same runtime, you can reuse the existing context. If the new instance will be located in a different runtime, you will have to re-enter all parameters.

To reinstall the connector:

  1. Finish processing all in-flight FlowFiles in the existing connector, and then stop the connector.

    1. Sign in to Snowsight.

    2. In the navigation menu, select Ingestion » Openflow.

    3. Select Launch Openflow.

    4. In the Openflow pane select the Runtimes tab.

    5. Select the runtime containing the connector.

    6. Select the connector.

    7. Stop the topmost processor Set Tables for Replication in the Snapshot Load group.

    8. Stop the topmost processor Read PostgreSQL CDC Stream in the Incremental Load group.

    9. If you changed the value of the Merge Task Schedule CRON parameter, return it to * * * * * ?, otherwise queues won’t be emptied until the next scheduled run.

      Wait until all FlowFiles in the connector have been processed, and all queues are empty. When all FlowFiles have been processed, the Queued value on the connector’s processor group becomes zero. If there are any items left in the original connector’s queues, there may be data gaps when the new connector starts.

    10. Stop all processors and controller services in the connector.

  2. Find and copy the name of the replication slot used by the original connector, by viewing the state of the topmost processor in the Incremental Load group with name Read PostgreSQL CDC Stream. The replication slot name is stored under the key replication.slot.name. Copy the value of the key to a text editor.

  3. Create a new instance of the connector. If you’re using the same runtime as the original connector, you can choose to keep the existing parameter contexts, and reuse the settings.

    Caution

    The existing connector can remain in the runtime and doesn’t interfere with the new instance, as long as it remains stopped.

  4. If you’re installing into a different runtime, or you deleted the previous parameter contexts, enter all the configuration settings into the new parameter contexts, including the table names and patterns as described in Set up the Openflow Connector for PostgreSQL.

  5. Open the PostgreSQL Ingestion Parameters context, and set Ingestion Type parameter to incremental. For more information on the concerns see Enable incremental replication without snapshots.

  6. Open the PostgreSQL Source Parameters context, and set the Replication Slot Name parameter to the value you copied earlier.

  7. Start the new connector.

Usage notes

The new connector will use the same, existing destination tables that created by the original connector, but will create new journal tables.