Set up the Openflow Connector for SQL Server (CDC)

Note

This connector is subject to the Snowflake Connector Terms.

This topic describes how to set up the Openflow Connector for SQL Server (CDC).

For information on the incremental load process, see Incremental replication.

Prerequisites

Before setting up the connector, ensure that you have completed the following prerequisites:

  1. Ensure that you have reviewed About Openflow Connector for SQL Server (CDC).

  2. Ensure that you have reviewed Supported SQL Server versions.

  3. Ensure that you have set up your runtime deployment. For more information, see the following topics:

  4. If you use Openflow - Snowflake Deployments, ensure that you have reviewed configuring required domains and have granted access to the required domains for the SQL Server connector.

Set up your SQL Server instance

Before setting up the connector, perform the following tasks in your SQL Server environment:

Note

You must perform these tasks as a database administrator.

  1. Enable Change Data Capture on the databases and tables that you plan to replicate:

    USE <database>;
    EXEC sys.sp_cdc_enable_db;
    
    EXEC sys.sp_cdc_enable_table
      @source_schema = N'<schema>',
      @source_name = N'<table>',
      @role_name = NULL;
    

    Note

    Run the sp_cdc_enable_table procedure for every table that you plan to replicate. Run sp_cdc_enable_db once per database.

    The connector requires that CDC is enabled on the databases and tables before replication starts. You can also enable CDC on additional tables while the connector is running.

    Note

    Platform-specific variants for enabling CDC at the database level. The sp_cdc_enable_table call shown above is the same on every platform; only the database-level enable procedure differs.

    • AWS RDS for SQL Server. You can’t call sys.sp_cdc_enable_db directly on RDS because RDS doesn’t expose the sysadmin server role. Use the RDS-provided wrapper instead:

      EXEC msdb.dbo.rds_cdc_enable_db '<database>';
      

      See Using change data capture for Amazon RDS for SQL Server. CDC isn’t supported on the Web edition of RDS for SQL Server.

    • Google Cloud SQL for SQL Server. You can’t call sys.sp_cdc_enable_db directly. Use the Cloud SQL-provided wrapper instead:

      EXEC msdb.dbo.gcloudsql_cdc_enable_db '<database>';
      

      See Enable change data capture (CDC) on Cloud SQL for SQL Server. Cloud SQL for SQL Server currently offers SQL Server 2017, 2019, and 2022 only.

    • Azure SQL Database (single database). Use the standard sys.sp_cdc_enable_db procedure. On the DTU-based purchasing model, CDC requires the S3 service tier or higher (CDC isn’t supported on Basic, S0, S1, or S2). On the vCore-based purchasing model, CDC is supported on any tier. See Change data capture with Azure SQL Database.

    • Azure SQL Managed Instance. Use the standard sys.sp_cdc_enable_db procedure. Enabling CDC requires membership in the sysadmin server role.

  2. Create a login for the SQL Server instance:

    CREATE LOGIN <user_name> WITH PASSWORD = '<password>';
    

    This login is used to create users for the databases you plan to replicate.

  3. Create a user for each database you are replicating by running the following SQL Server command in each database:

    USE <source_database>;
    CREATE USER <user_name> FOR LOGIN <user_name>;
    
  4. Grant the required permissions to the user for each database that you are replicating.

    Add the user to the db_datareader role and grant SELECT on the cdc schema so the connector can read both the source tables and the CDC change tables:

    ALTER ROLE db_datareader ADD MEMBER <user_name>;
    GRANT SELECT ON SCHEMA::cdc TO <user_name>;
    

    Run these commands in each database that you plan to replicate.

    Note

    These permissions give the connector read access to every user table in the database. To scope access more tightly, grant SELECT only on the specific tables being replicated and on SCHEMA::cdc instead of adding the user to the db_datareader role.

  5. Deploy the Openflow CDC wrapper procedures so the connector can manage capture instances and apply source schema changes autonomously. For more information, see Deploy the Openflow CDC wrapper procedures.

  6. (Optional) Grant the VIEW DEFINITION privilege on the User Defined Data Types (UDDT).

    If your tables contain columns that use User Defined Data Types (UDDT), and the UDDT is owned by a different user than the connector user, you must grant the VIEW DEFINITION permission to the connector user as shown in the following SQL Server example:

    GRANT VIEW DEFINITION TO <user_name>;
    

    Without this permission, columns using UDDT are silently excluded from replication.

  7. (Optional) Configure SSL connection.

    If you use an SSL connection to connect SQL Server, create the root certificate for your database server. This is required when configuring the connector.

Deploy the Openflow CDC wrapper procedures

The connector applies supported source table schema changes (DDL) without stopping replication or requiring a manual re-snapshot. To do this, the connector manages SQL Server capture instances autonomously: when a tracked table’s schema changes, the connector creates a new capture instance for the updated schema and drops the old one after it finishes the transition. For an overview of the process, see Schema changes.

Creating and dropping capture instances normally requires db_owner. Rather than granting the connector that level of access, deploy a small set of wrapper procedures that perform these operations on the connector’s behalf and grant the connector permission to run only those two procedures.

This design has the following properties:

  • The connector can run only the two wrapper procedures. For capture-instance management, the connector is granted EXECUTE on only dbo.sf_openflow_cdc_enable_table and dbo.sf_openflow_cdc_disable_table. It doesn’t hold db_owner and can’t call the underlying sys.sp_cdc_enable_table or sys.sp_cdc_disable_table procedures directly. The wrapper procedures run with EXECUTE AS OWNER, so they supply the elevated privileges only for the specific, audited operation.
  • Every operation is recorded in an audit table. Each invocation of a wrapper procedure writes an attempt row to the dbo.openflow_cdc_audit table before it calls the engine, then a success or failure row (including the SQL Server error number and message on failure) after the call. The rows are append-only: the wrappers never update or delete audit rows.

Deploy the procedures as a database administrator. Run the following scripts, in order, in each CDC-enabled database being replicated. Run them as a principal that already holds db_owner.

Note

Perform these tasks as a database administrator, after creating the connector’s database user as described in Set up your SQL Server instance.

  1. openflow_cdc_audit_setup.sql: Creates the append-only dbo.openflow_cdc_audit table that the wrapper procedures write to.

    SET NOCOUNT ON;
    
    -- Audit log. Each wrapper invocation writes one 'attempt' row before delegating to
    -- sys.sp_cdc_*, then either a 'success' or 'failure' row sharing the same attempt_id.
    -- Rows are append-only.
    IF OBJECT_ID(N'dbo.openflow_cdc_audit', N'U') IS NULL
    BEGIN
        CREATE TABLE dbo.openflow_cdc_audit (
            audit_id         bigint           IDENTITY(1,1) NOT NULL CONSTRAINT pk_openflow_cdc_audit PRIMARY KEY,
            attempt_id       uniqueidentifier NOT NULL,
            event_time       datetime2(3)     NOT NULL,
            event_kind       varchar(16)      NOT NULL,
            action           varchar(16)      NOT NULL,
            source_schema    sysname          NOT NULL,
            source_name      sysname          NOT NULL,
            capture_instance sysname          NOT NULL,
            caller           sysname          NOT NULL,
            error_number     int              NULL,
            error_message    nvarchar(4000)   NULL,
            CONSTRAINT ck_openflow_cdc_audit_event_kind CHECK (event_kind IN ('attempt', 'success', 'failure')),
            CONSTRAINT ck_openflow_cdc_audit_action     CHECK (action     IN ('enable',  'disable'))
        );
    
        CREATE INDEX ix_openflow_cdc_audit_attempt    ON dbo.openflow_cdc_audit(attempt_id);
        CREATE INDEX ix_openflow_cdc_audit_event_time ON dbo.openflow_cdc_audit(event_time);
    END;
    GO
    
  2. sf_openflow_cdc_enable_table.sql: Creates the wrapper procedure that the connector calls to add a new capture instance during a schema transition.

    CREATE OR ALTER PROCEDURE dbo.sf_openflow_cdc_enable_table
        @source_schema    sysname,
        @source_name      sysname,
        @capture_instance sysname
    WITH EXECUTE AS OWNER
    AS
    BEGIN
        SET NOCOUNT ON;
    
        -- 1. Table must exist and not be a system table.
        IF OBJECT_ID(QUOTENAME(@source_schema) + N'.' + QUOTENAME(@source_name), N'U') IS NULL
            THROW 50001, 'Source table does not exist or is not a user table.', 1;
    
        -- 2. The connector supplies the full capture instance name. Validate that
        --    it is present and fits within the 100-character limit imposed by CDC.
        IF @capture_instance IS NULL OR LEN(@capture_instance) = 0
            THROW 50002, 'Capture instance name must be provided.', 1;
    
        IF LEN(@capture_instance) > 100
            THROW 50003, 'Capture instance name exceeds the 100-character limit.', 1;
    
        -- 3. Record the attempt.
        DECLARE @attempt_id uniqueidentifier = NEWID();
        DECLARE @caller     sysname          = ORIGINAL_LOGIN();
    
        INSERT INTO dbo.openflow_cdc_audit
            (attempt_id, event_time, event_kind, action, source_schema, source_name, capture_instance, caller)
        VALUES
            (@attempt_id, SYSUTCDATETIME(), 'attempt', 'enable',
             @source_schema, @source_name, @capture_instance, @caller);
    
        -- 4. Delegate to the engine procedure and record the outcome.
        BEGIN TRY
            EXEC sys.sp_cdc_enable_table
                @source_schema    = @source_schema,
                @source_name      = @source_name,
                @capture_instance = @capture_instance,
                @role_name        = NULL;
    
            INSERT INTO dbo.openflow_cdc_audit
                (attempt_id, event_time, event_kind, action, source_schema, source_name, capture_instance, caller)
            VALUES
                (@attempt_id, SYSUTCDATETIME(), 'success', 'enable',
                 @source_schema, @source_name, @capture_instance, @caller);
        END TRY
        BEGIN CATCH
            DECLARE @err_num int            = ERROR_NUMBER();
            DECLARE @err_msg nvarchar(4000) = ERROR_MESSAGE();
    
            INSERT INTO dbo.openflow_cdc_audit
                (attempt_id, event_time, event_kind, action, source_schema, source_name,
                 capture_instance, caller, error_number, error_message)
            VALUES
                (@attempt_id, SYSUTCDATETIME(), 'failure', 'enable',
                 @source_schema, @source_name, @capture_instance, @caller,
                 @err_num, @err_msg);
    
            ;THROW;
        END CATCH
    END;
    GO
    
  3. sf_openflow_cdc_disable_table.sql: Creates the wrapper procedure that the connector calls to drop the old capture instance after a schema transition completes.

    CREATE OR ALTER PROCEDURE dbo.sf_openflow_cdc_disable_table
        @source_schema    sysname,
        @source_name      sysname,
        @capture_instance sysname
    WITH EXECUTE AS OWNER
    AS
    BEGIN
        SET NOCOUNT ON;
    
        -- 1. The connector supplies the full capture instance name. Validate that it is present.
        IF @capture_instance IS NULL OR LEN(@capture_instance) = 0
            THROW 50002, 'Capture instance name must be provided.', 1;
    
        -- 2. Record the attempt.
        DECLARE @attempt_id uniqueidentifier = NEWID();
        DECLARE @caller     sysname          = ORIGINAL_LOGIN();
    
        INSERT INTO dbo.openflow_cdc_audit
            (attempt_id, event_time, event_kind, action, source_schema, source_name, capture_instance, caller)
        VALUES
            (@attempt_id, SYSUTCDATETIME(), 'attempt', 'disable',
             @source_schema, @source_name, @capture_instance, @caller);
    
        -- 3. Delegate to the engine procedure and record the outcome.
        BEGIN TRY
            EXEC sys.sp_cdc_disable_table
                @source_schema    = @source_schema,
                @source_name      = @source_name,
                @capture_instance = @capture_instance;
    
            INSERT INTO dbo.openflow_cdc_audit
                (attempt_id, event_time, event_kind, action, source_schema, source_name, capture_instance, caller)
            VALUES
                (@attempt_id, SYSUTCDATETIME(), 'success', 'disable',
                 @source_schema, @source_name, @capture_instance, @caller);
        END TRY
        BEGIN CATCH
            DECLARE @err_num int            = ERROR_NUMBER();
            DECLARE @err_msg nvarchar(4000) = ERROR_MESSAGE();
    
            INSERT INTO dbo.openflow_cdc_audit
                (attempt_id, event_time, event_kind, action, source_schema, source_name,
                 capture_instance, caller, error_number, error_message)
            VALUES
                (@attempt_id, SYSUTCDATETIME(), 'failure', 'disable',
                 @source_schema, @source_name, @capture_instance, @caller,
                 @err_num, @err_msg);
    
            ;THROW;
        END CATCH
    END;
    GO
    
  4. openflow_cdc_grants.sql: Grants the connector’s database user permission to run the two wrapper procedures and to read the CDC metadata, change tables, and audit trail. Replace <user_name> with the connector’s database user created in Set up your SQL Server instance, then run the script.

    SET NOCOUNT ON;
    
    -- Replace <user_name> with the connector's SQL Server database user.
    DECLARE @connector sysname = N'<user_name>';
    
    -- The principal must already exist in this database.
    IF DATABASE_PRINCIPAL_ID(@connector) IS NULL
        THROW 50100,
              'Connector principal does not exist in this database. Create the user first, then re-run this script.',
              1;
    
    DECLARE @sql nvarchar(max);
    
    -- 1. Allow the connector to call the two wrapper procedures. All CDC management
    --    goes through these wrappers, so no elevated role is required.
    SET @sql = N'GRANT EXECUTE ON dbo.sf_openflow_cdc_enable_table  TO ' + QUOTENAME(@connector);
    EXEC sys.sp_executesql @sql;
    
    SET @sql = N'GRANT EXECUTE ON dbo.sf_openflow_cdc_disable_table TO ' + QUOTENAME(@connector);
    EXEC sys.sp_executesql @sql;
    
    -- 2. Allow the connector to read its own audit trail.
    SET @sql = N'GRANT SELECT ON dbo.openflow_cdc_audit TO ' + QUOTENAME(@connector);
    EXEC sys.sp_executesql @sql;
    
    -- 3. Allow the connector to enumerate capture instances and read CDC change tables.
    SET @sql = N'GRANT SELECT ON SCHEMA::cdc TO ' + QUOTENAME(@connector);
    EXEC sys.sp_executesql @sql;
    GO
    

Note

The connector also needs SELECT on each replicated source table. SQL Server applies row-level filtering to cdc.change_tables for callers that don’t hold db_owner, returning only rows for source tables that the caller can read. The db_datareader role granted in Set up your SQL Server instance satisfies this requirement. If access was scoped more tightly instead of using db_datareader, make sure the connector has SELECT on every source table that it replicates.

  1. Verify the deployment. Confirm that the two wrapper procedures exist and that the audit table is queryable:

    SELECT name FROM sys.procedures WHERE name LIKE 'sf_openflow%';
    SELECT TOP 1 1 FROM dbo.openflow_cdc_audit;
    

    The first query returns both sf_openflow_cdc_enable_table and sf_openflow_cdc_disable_table. The second query confirms that the audit table exists and is readable.

Set up your Snowflake environment

As a Snowflake administrator, perform the following tasks:

  1. Create a destination database in Snowflake to store the replicated data:

    CREATE DATABASE <destination_database>;
    
  2. Create a Snowflake service user:

    CREATE USER <openflow_user>
      TYPE = SERVICE
      COMMENT='Service user for automated access of Openflow';
    
  3. Create a Snowflake role for the connector and grant the required privileges:

    CREATE ROLE <openflow_role>;
    GRANT ROLE <openflow_role> TO USER <openflow_user>;
    GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
    GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
    

    Use this role to manage the connector’s access to the Snowflake database.

    To create objects in the destination database, you must grant the USAGE and CREATE SCHEMA privileges on the database to the role used to manage access.

  4. Create a Snowflake warehouse for the connector and grant the required privileges:

    CREATE WAREHOUSE <openflow_warehouse> WITH
      WAREHOUSE_SIZE = 'XSMALL'
      AUTO_SUSPEND = 300
      AUTO_RESUME = TRUE;
    GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
    

    Snowflake recommends starting with a XSMALL warehouse size, then experimenting with size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses, rather than a larger warehouse size. For more information, see multi-cluster warehouses.

  5. Set up the public and private keys for key pair authentication:

    1. Create a pair of secure keys (public and private).

    2. Store the private key for the user in a file to supply to the connector’s configuration.

    3. Assign the public key to the Snowflake service user:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      

      For more information, see Key-pair authentication and key-pair rotation.

Install the connector

To install the connector, do the following as a data engineer:

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. On the Openflow connectors page, find the connector and select Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    Note

    Before you install the connector, ensure that you have created a database and schema in Snowflake for the connector to store ingested data.

  4. Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.

  5. Authenticate to the runtime with your Snowflake account credentials.

The Openflow canvas appears with the connector process group added to it.

Runtime sizing

The runtime size determines the CPU and memory available to the connector. The available sizes are Small, Medium, and Large. The connector requires Medium or Large. Choose the size when you create the runtime: you can’t change the size of an existing runtime in place.

Choose Large if you expect high replication throughput or if source tables contain wide rows.

Resize a runtime

Runtime size is fixed at creation, so to change size you run the connector on a different runtime. You have two options depending on whether you want to preserve the current replication progress.

If you don’t need to keep the progress of the current connector, the simplest path is to create a new runtime at the size you need and install a new connector instance on it. The new connector starts from scratch: it snapshots all configured tables and then captures ongoing changes from that point. The replication progress of the existing connector is discarded.

To keep the progress of the current connector, for example to avoid re-snapshotting tables that took a long time to snapshot initially, migrate the connector to the new runtime. This reuses the existing destination tables and resumes incremental replication from where it left off.

For migration instructions, see Reinstall the connector.

Configure the connector

To configure the connector, do the following as a data engineer:

  1. Right-click on the imported process group and select Parameters.

  2. Populate the required parameter values.

    For more information on the required parameter values, see the following sections:

Start by setting the parameters of the SQLServer Source Parameters context, then the SQLServer Destination Parameters context. After you complete this, enable the connector. The connector connects to both SQLServer and Snowflake and starts running. However, the connector doesn’t replicate any data until any tables to be replicated are explicitly added to its configuration.

To configure specific tables for replication, edit the SQLServer Ingestion Parameters context. After you apply the changes to the SQLServer Ingestion Parameters context, the configuration is picked up by the connector, and the replication lifecycle starts for every table.

SQLServer Source Parameters

ParameterDescription
SQLServer Connection URL

The full JDBC URL used to connect to the source.

For a standalone SQL Server instance or Azure SQL Managed Instance, point the URL at the instance. The connector discovers the databases to replicate from that instance.

  • jdbc:sqlserver://example.com:1433;encrypt=false

For Azure SQL Database, point the URL at a specific database using the databaseName property. Use one connector instance per database you want to replicate.

  • jdbc:sqlserver://your-server.database.windows.net:1433;encrypt=true;databaseName=your_database
SQLServer JDBC DriverSelect the Reference asset checkbox to upload the SQL Server JDBC driver.
SQLServer UsernameThe user name for the connector.
SQLServer PasswordThe password for the connector.

Note

Azure SQL Database refers to the single-database PaaS offering, not Azure SQL Managed Instance.

SQLServer Destination Parameters

ParameterDescriptionRequired
Destination Database

The database where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.

Yes
Destination Schema Pattern

A pattern for the names of destination schemas where data is persisted. The connector creates the schemas if they don’t exist.

You can customize the pattern per ingested table using these optional variables:

  • ${source.database.name}: a source table’s database.
  • ${source.schema.name}: a source table’s schema.
  • ${source.table.name}: a source table’s name.

For example, for a table with the qualified name source_db.tenant_a.data, the pattern prefix_${source.database.name}_${source.schema.name} evaluates to prefix_source_db_tenant_a.

To ingest all tables into a single schema, provide a schema name without any variables, like destination_schema.

Important

Don’t change this setting after the connector has begun ingesting data. Changing this setting after ingestion has begun breaks the existing ingestion. If you must change this setting, create a new connector instance.

Yes
Snowflake Authentication Strategy

When using:

  • Snowflake Openflow Deployment or BYOC: Use SNOWFLAKE_MANAGED_TOKEN. This token is managed automatically by Snowflake. BYOC deployments must have previously configured runtime roles to use SNOWFLAKE_MANAGED_TOKEN.
  • BYOC: Alternatively BYOC can use KEY_PAIR as the value for authentication strategy.
Yes
Snowflake Account Identifier

When using:

  • Session Token Authentication Strategy: Must be blank.
  • KEY_PAIR: Snowflake account name formatted as [organization-name]-[account-name] where data is persisted.
Yes
Snowflake Connection Strategy

When using KEY_PAIR, specify the strategy for connecting to Snowflake:

  • STANDARD (default): Connect using standard public routing to Snowflake services.
  • PRIVATE_CONNECTIVITY: Connect using private addresses associated with the supporting cloud platform such as AWS PrivateLink.
Required for BYOC with KEY_PAIR only, otherwise ignored.
Snowflake Object Identifier Resolution

Specifies how source object identifiers such as schemas, tables, and columns names are stored and queried in Snowflake. This setting dictates whether you must use double quotes in SQL queries.

Option 1: Default, case-insensitive (recommended).

  • Transformation: All identifiers are converted to uppercase. For example, My_Table becomes MY_TABLE.
  • Queries: SQL queries are case-insensitive and don’t require SQL double quotes.

For example SELECT * FROM my_table; returns the same results as SELECT * FROM MY_TABLE;.

Note

Snowflake recommends using this option if database objects are not expected to have mixed case names.

Important

Do not change this setting after connector ingestion has begun. Changing this setting after ingestion has begun breaks the existing ingestion. If you must change this setting, create a new connector instance.

Option 2: case-sensitive.

  • Transformation: Case is preserved. For example, My_Table remains My_Table.
  • Queries: SQL queries must use double quotes to match the exact case for database objects. For example, SELECT * FROM "My_Table";.

Note

Snowflake recommends using this option if you must preserve source casing for legacy or compatibility reasons. For example, if the source database includes table names that differ in case only, such as MY_TABLE and my_table, that result in a name collision when using case-insensitive comparisons.

Yes
Snowflake Private Key

When using:

  • Session Token Authentication Strategy: Must be blank.
KEY_PAIR: Must be the RSA private key used for authentication.

The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

No
Snowflake Private Key File

When using:

  • Session token authentication strategy: The private key file must be blank.
  • KEY_PAIR: Upload the file that contains the RSA private key used for authentication to Snowflake, formatted according to PKCS8 standards and including standard PEM headers and footers. The header line begins with -----BEGIN PRIVATE. To upload the private key file, select the Reference asset checkbox.
No
Snowflake Private Key Password

When using:

  • Session Token Authentication Strategy: Must be blank.
  • KEY_PAIR: Provide the password associated with the Snowflake Private Key File.
No
Snowflake Role

When using:

  • Session Token Authentication Strategy: Use Snowflake Role assigned to the runtime or child role granted to this Snowflake Role. You can find your runtime Snowflake Role in the Openflow UI, by expanding the More Options [⋮] button for your runtime and selecting Set Snowflake role.
  • KEY_PAIR Authentication Strategy: Use a valid role configured for your service user.
Yes
Snowflake Username

When using:

  • Session Token Authentication Strategy: Must be blank.
  • KEY_PAIR: Provide the user name used to connect to the Snowflake instance.
Yes
Oversized Value Strategy

Determines how the connector handles values that exceed its internal size limits (16 MB) during replication. Possible values are:

  • Fail Table (default): The table is marked as permanently failed, and replication stops for that table.
  • Set Null: The value is replaced with NULL in the destination table. Use this to prevent table failures when it is acceptable to lose data in tables beyond the oversized value.
No
Snowflake WarehouseSnowflake warehouse used to run queries.Yes

SQLServer Ingestion Parameters

ParameterDescription
Included Table Names

A comma-separated list of source table paths, including their databases and schemas, for example:

database_1.public.table_1, database_2.schema_2.table_2

Included Table Regex

A regular expression to match against table paths, including database and schema names. Every path matching the expression is replicated, and new tables matching the pattern that are created later are also included automatically, for example:

database_name\.public\.auto_.*

Column Filter JSON

Optional. A JSON array of filter objects specifying which columns to include or exclude per table. For syntax details and examples, see Replicate a subset of columns in a table.

Merge Task Schedule CRON

CRON expression defining periods when merge operations from Journal to Destination Table will be triggered. Set it to * * * * * ? if you want to have continuous merge or time schedule to limit warehouse run time.

For example:

  • The string * 0 * * * ? indicates that you want to schedule merges at full hour for one minute
  • The string * 20 14 ? * MON-FRI indicates that you want to schedule merges at 2:20 PM every Monday through Friday.

For additional information and examples, see the cron triggers tutorial in the Quartz Documentation

Restart table replication

A table in FAILED state — for example, due to a missing primary key or unsupported schema change — does not restart automatically. If a table enters a FAILED state or you need to restart replication from scratch, use the following procedure to remove and re-add the table to replication.

Note

If the failure was caused by an issue in the source table such as a missing primary key, resolve that issue in the source database before continuing.

  1. Remove the table from replication, using one of the following methods:

    • Add the table to the Re-snapshot Table Exclusions parameter to temporarily exclude it from replication. This is convenient when the table is matched by an Included Table Regex that you don’t want to change.
    • In the Ingestion Parameters context, either remove the table from Included Table Names or modify the Included Table Regex so the table is no longer matched.
  2. Verify the table has been removed:

    1. In the Openflow runtime canvas, right-click a processor group and choose Controller Services.
    2. In the table listing controller services, locate the Table State Store row, click the three vertical dots on the right side of the row, then choose View State.

    Important

    You must wait until the table’s state is fully removed from this list before proceeding. Do not continue until this configuration change has completed.

  3. Clean up the destination: Once the table’s state shows as fully removed, manually DROP the destination table in Snowflake. Note that the connector will not overwrite an existing destination table during the snapshot phase; if the table still exists, replication will fail again. Optionally, the journal table and stream can also be removed if they are no longer needed.

  4. Re-add the table by reversing the change you made in the first step: either remove the table from Re-snapshot Table Exclusions, or add it back to Included Table Names or Included Table Regex. The connector then re-snapshots the table.

  5. Verify the restart: Check the Table State Store using the instructions given previously. The state of the table should appear with the status NEW, then transition to SNAPSHOT_REPLICATION, and finally INCREMENTAL_REPLICATION.

Replicate a subset of columns in a table

The connector can filter the data replicated per table to a subset of configured columns. Primary key columns are always included regardless of exclusions.

To apply column filters, set the Column Filter JSON parameter in the Ingestion Parameters context to a JSON array of filter objects, one per table you want to filter.

Columns can be included or excluded by name or by regular expression pattern. You can apply a single condition per table, or combine multiple conditions, with exclusions always taking precedence over inclusions.

Syntax

Each object in the array identifies a table and specifies which columns to include or exclude. Because this connector uses three-part fully qualified names (database, schema, and table), each object can include a database or databasePattern field in addition to the schema and table fields.

[
    {
        "database": "<database>" | "databasePattern": "<regex>",
        "schema": "<schema>" | "schemaPattern": "<regex>",
        "table": "<table>" | "tablePattern": "<regex>",
        "included": ["<column>", "<column>"],
        "excluded": ["<column>", "<column>"],
        "includedPattern": "<regex>",
        "excludedPattern": "<regex>"
    }
]

The following rules apply:

  • Use database, schema, and table for exact name matching, or databasePattern, schemaPattern, and tablePattern for regex matching. You can’t use both a field and its pattern variant in the same object (for example, schema and schemaPattern can’t both appear).
  • At least one of included, excluded, includedPattern, or excludedPattern must be provided.
  • When both included and excluded filters are specified, exclusions take precedence.
  • When multiple filters match the same table, the last matching filter is used, with exact matches taking precedence over pattern-based filters.
  • The value can be an array of objects to apply different filters to different tables.

Examples

Include specific columns by name:

[
    {
        "database": "my_db",
        "schema": "dbo",
        "table": "orders",
        "included": ["account_id", "status", "created_at"]
    }
]

Exclude specific columns by name:

[
    {
        "database": "my_db",
        "schema": "dbo",
        "table": "orders",
        "excluded": ["internal_note", "debug_flag"]
    }
]

Combine an include pattern with a specific exclusion (for example, include all email columns except admin_email):

[
    {
        "database": "my_db",
        "schema": "dbo",
        "table": "contacts",
        "includedPattern": ".*_email",
        "excluded": ["admin_email"]
    }
]

Mix a database pattern with an exact schema and table name to apply a filter across databases:

[
    {
        "databasePattern": "prod_.*",
        "schema": "dbo",
        "table": "customers",
        "excluded": ["internal_note"]
    }
]

Pass multiple filter objects to apply different rules to different tables:

[
    {"database": "my_db", "schema": "dbo", "table": "orders", "included": ["account_id", "status"]},
    {"database": "my_db", "schema": "dbo", "table": "customers", "excludedPattern": ".*_internal"}
]

Including and excluding the same column

Removing a column from a table’s replicated set (by excluding it or by removing it from the included list) has the same effect on the destination as dropping the column at the source: the connector soft-deletes the column on the destination by renaming it with a suffix (by default, __SNOWFLAKE_DELETED). If you then add the column back to the replicated set and later remove it a second time, replication for the affected table fails because the soft-deleted column name is already taken. To recover, restart replication for the affected table.

Replicate a partitioned table

The connector supports replication of partitioned tables. A SQL Server partitioned table is replicated into Snowflake as a single destination table, containing data from all partitions.

To replicate a partitioned table, ensure that CDC is enabled on the partitioned table, as described in Set up your SQL Server instance.

Track data changes in tables

The connector replicates the current state of data from the source tables, as well as detected changes from each polling interval. This data is stored in journal tables created in the same schema as the destination table.

The journal table names are formatted as: <source_table_name>_JOURNAL_<timestamp>_<schema_generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema_generation> is an integer increasing with every schema change on the source table. As a result, source tables that undergo schema changes will have multiple journal tables.

When you remove a table from replication, then add it back, the <timestamp> value changes, and <schema_generation> starts again from 1.

Important

Snowflake recommends not altering the structure of journal tables in any way. The connector uses them to update the destination table as part of the replication process.

The connector never drops journal tables, but uses the latest journal for every replicated source table, only reading append-only streams on top of journals. To reclaim the storage, you can:

  • Truncate all journal tables at any time.
  • Drop the journal tables related to source tables that were removed from replication.
  • Drop all but the latest generation journal tables for actively replicated tables.

For example, if your connector is set to actively replicate source table orders, and you have earlier removed table customers from replication, you may have the following journal tables. In this case you can drop all of them except orders_5678_2.

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

Configure scheduling of merge tasks

The connector uses a warehouse to merge CDC data into destination tables. This operation is triggered by the MergeSnowflakeJournalTable processor. If there are no new changes or if no new flow files are waiting in the MergeSnowflakeJournalTable queue, no merge is triggered and the warehouse auto-suspends.

Use the CRON expression in the Merge task Schedule CRON parameter to limit the warehouse cost and limit merges to only scheduled time. It throttles the flow files coming to the MergeSnowflakeJournalTable processor and merges are triggered only in a dedicated period of time. For more information about scheduling, see Scheduling strategy.

Run the flow

  1. Right-click on the plane and select Enable all Controller Services.
  2. Right-click on the imported process group and select Start. The connector starts the data ingestion.