Set up the Openflow Connector for SQL Server

Note

This connector is subject to the Snowflake Connector Terms.

This topic describes how to set up the Openflow Connector for SQL Server.

For information on the incremental load process, see Incremental replication.

Prerequisites

Before setting up the connector, ensure that you have completed the following prerequisites:

  1. Ensure that you have reviewed About Openflow Connector for SQL Server.

  2. Ensure that you have reviewed Supported SQL Server versions.

  3. Ensure that you have set up your runtime deployment. For more information, see the Openflow deployment setup topics.

  4. If you use Openflow - Snowflake Deployments, ensure that you have reviewed configuring required domains and have granted access to the required domains for the SQL Server connector.

Set up your SQL Server instance

Before setting up the connector, perform the following tasks in your SQL Server environment:

Note

You must perform these tasks as a database administrator.

  1. Enable change tracking on the databases and tables that you plan to replicate, as shown in the following SQL Server example:

    ALTER DATABASE <database>
      SET CHANGE_TRACKING = ON
      (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
    
    ALTER TABLE <schema>.<table>
      ENABLE CHANGE_TRACKING
      WITH (TRACK_COLUMNS_UPDATED = ON);
    

    Note

    Run these commands for every database and table that you plan to replicate.

    The connector requires that change tracking be enabled on the databases and tables before replication starts. Ensure that every table that you plan to replicate has change tracking enabled; you can verify this with the catalog query sketch shown after this list. You can also enable change tracking on additional tables while the connector is running.

  2. Create a login for the SQL Server instance:

    CREATE LOGIN <user_name> WITH PASSWORD = '<password>';
    

    This login is used to create users for the databases you plan to replicate.

  3. Create a user for each database you are replicating by running the following SQL Server command in each database:

    USE <source_database>;
    CREATE USER <user_name> FOR LOGIN <user_name>;
    
  4. Grant the SELECT and VIEW CHANGE TRACKING permissions to the user for each database that you are replicating:

    GRANT SELECT ON <database>.<schema>.<table> TO <user_name>;
    GRANT VIEW CHANGE TRACKING ON <database>.<schema>.<table> TO <user_name>;
    

    Run these commands in each database for every table that you plan to replicate. Grant these permissions to the user that you created in each database in the previous step.

  5. (Optional) Configure SSL connection.

    If you use an SSL connection to connect to SQL Server, obtain the root certificate for your database server. You need this certificate when you configure the connector.
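
Before you start the connector, you can confirm that change tracking is active by querying the SQL Server change tracking catalog views. The following is a minimal sketch; run the second query while connected to each source database:

-- Databases that have change tracking enabled
SELECT DB_NAME(database_id) AS database_name
FROM sys.change_tracking_databases;

-- Tables that have change tracking enabled in the current database
SELECT OBJECT_SCHEMA_NAME(object_id) AS schema_name,
       OBJECT_NAME(object_id) AS table_name
FROM sys.change_tracking_tables;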

Set up your Snowflake environment

As a Snowflake administrator, perform the following tasks:

  1. Create a destination database in Snowflake to store the replicated data:

    CREATE DATABASE <destination_database>;
    
  2. Create a Snowflake service user:

    CREATE USER <openflow_user>
      TYPE = SERVICE
      COMMENT='Service user for automated access of Openflow';
    
  3. Create a Snowflake role for the connector and grant the required privileges:

    CREATE ROLE <openflow_role>;
    GRANT ROLE <openflow_role> TO USER <openflow_user>;
    GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
    GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
    

    Use this role to manage the connector’s access to the Snowflake database.

    To create objects in the destination database, you must grant the USAGE and CREATE SCHEMA privileges on the database to the role used to manage access.

  4. Create a Snowflake warehouse for the connector and grant the required privileges:

    CREATE WAREHOUSE <openflow_warehouse> WITH
      WAREHOUSE_SIZE = 'MEDIUM'
      AUTO_SUSPEND = 300
      AUTO_RESUME = TRUE;
    GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
    

    Snowflake recommends starting with a MEDIUM warehouse size, then experimenting with size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses, rather than a larger warehouse size. For more information, see multi-cluster warehouses.

  5. Set up the public and private keys for key pair authentication:

    1. Create a pair of secure keys (public and private).

    2. Store the private key for the user in a file to supply to the connector’s configuration.

    3. Assign the public key to the Snowflake service user:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = '<public_key>';
      

      For more information, see Key-pair authentication and key-pair rotation.
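
To confirm that the public key was assigned, you can optionally describe the service user and check the RSA_PUBLIC_KEY_FP property, which shows the fingerprint of the key currently set on the user. A minimal sketch, using the placeholder user name from the previous steps:

DESC USER <openflow_user>;
SELECT "property", "value"
  FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
  WHERE "property" = 'RSA_PUBLIC_KEY_FP';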

Configure the connector

As a data engineer, install and configure the connector using the following sections.

Install the connector

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. On the Openflow connectors page, find the connector and select Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    Note

    Before you install the connector, ensure that you have created a database and schema in Snowflake for the connector to store ingested data.

  4. Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.

  5. Authenticate to the runtime with your Snowflake account credentials.

The Openflow canvas appears with the connector process group added to it.

Configure the connector

To configure the connector, perform the following steps:

  1. Right-click on the imported process group and select Parameters.

  2. Populate the required parameter values as described in Flow parameters.

Flow parameters

Start by setting the parameters of the SQLServer Source Parameters context, then the SQLServer Destination Parameters context. After you complete this, enable the connector. The connector connects to both SQL Server and Snowflake and starts running. However, the connector does not replicate any data until the tables to be replicated are explicitly added to its configuration.

To configure specific tables for replication, edit the SQLServer Ingestion Parameters context. After you apply the changes to the SQLServer Ingestion Parameters context, the configuration is picked up by the connector, and the replication lifecycle starts for every table.

SQLServer Source Parameters context

Parameter

Description

SQL Server Connection URL

The full JDBC URL to the source database.

Example:

  • jdbc:sqlserver://example.com:1433;encrypt=false;

SQL Server JDBC Driver

Select the Reference asset checkbox to upload the SQL Server JDBC driver.

SQL Server Username

The user name for the connector.

SQL Server Password

The password for the connector.
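
For the SQL Server Connection URL parameter, an SSL-encrypted connection typically adds TLS-related driver properties to the URL. The following values are sketches; example.com is a placeholder, and encrypt and trustServerCertificate are standard Microsoft JDBC driver properties, so confirm the exact properties required by your driver version and certificate setup:

  • Unencrypted: jdbc:sqlserver://example.com:1433;encrypt=false;

  • SSL-encrypted: jdbc:sqlserver://example.com:1433;encrypt=true;trustServerCertificate=false;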

SQLServer Destination Parameters context

Parameter

Description

Required

Destination Database

The database where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.

Required: Yes

Snowflake Authentication Strategy

When using:

  • Snowflake Openflow Deployment or BYOC: Use SNOWFLAKE_SESSION_TOKEN. This token is managed automatically by Snowflake. BYOC deployments must have previously configured runtime roles to use SNOWFLAKE_SESSION_TOKEN.

  • BYOC: Alternatively, BYOC deployments can use KEY_PAIR as the authentication strategy.

Required: Yes

Snowflake Account Identifier

When using:

  • Session Token Authentication Strategy: Must be blank.

  • KEY_PAIR: The Snowflake account where data is persisted, formatted as [organization-name]-[account-name].

Required: Yes

Snowflake Object Identifier Resolution

Specifies how source object identifiers, such as schema, table, and column names, are stored and queried in Snowflake. This setting determines whether you must use double quotes in SQL queries.

Option 1: Default, case-insensitive (recommended).

  • Transformation: All identifiers are converted to uppercase. For example, My_Table becomes MY_TABLE.

  • Queries: SQL queries are case-insensitive and don’t require SQL double quotes.

    For example SELECT * FROM my_table; returns the same results as SELECT * FROM MY_TABLE;.

Note

Snowflake recommends using this option if database objects are not expected to have mixed case names.

Important

Do not change this setting after connector ingestion has begun. Changing this setting after ingestion has begun breaks the existing ingestion. If you must change this setting, create a new connector instance.

Option 2: case-sensitive.

  • Transformation: Case is preserved. For example, My_Table remains My_Table.

  • Queries: SQL queries must use double quotes to match the exact case for database objects. For example, SELECT * FROM "My_Table";.

Note

Snowflake recommends using this option if you must preserve source casing for legacy or compatibility reasons; for example, if the source database includes table names that differ only in case, such as MY_TABLE and my_table, which would cause a name collision under case-insensitive comparisons.

Required: Yes

Snowflake Private Key

When using:

  • Session Token Authentication Strategy: Must be blank.

  • KEY_PAIR: Must be the RSA private key used for authentication.

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

Required: No

Snowflake Private Key File

When using:

  • Session Token Authentication Strategy: The private key file must be blank.

  • KEY_PAIR: Upload the file that contains the RSA private key used for authentication to Snowflake, formatted according to PKCS8 standards and including standard PEM headers and footers. The header line begins with -----BEGIN PRIVATE. To upload the private key file, select the Reference asset checkbox.

Required: No

Snowflake Private Key Password

When using:

  • Session Token Authentication Strategy: Must be blank.

  • KEY_PAIR: Provide the password associated with the Snowflake Private Key File.

Required: No

Snowflake Role

When using:

  • Session Token Authentication Strategy: Use your Runtime Role. You can find your Runtime Role in the Openflow UI by navigating to View Details for your runtime.

  • KEY_PAIR Authentication Strategy: Use a valid role configured for your service user.

Required: Yes

Snowflake Username

When using:

  • Session Token Authentication Strategy: Must be blank.

  • KEY_PAIR: Provide the user name used to connect to the Snowflake instance.

Required: Yes

Snowflake Warehouse

Snowflake warehouse used to run queries.

Required: Yes

SQLServer Ingestion Parameters context

Parameter

Description

Included Table Names

A comma-separated list of source table paths, including their databases and schemas, for example:

database_1.public.table_1, database_2.schema_2.table_2

Included Table Regex

A regular expression to match against table paths, including database and schema names. Every path matching the expression is replicated, and new tables matching the pattern that are created later are also included automatically, for example:

database_name\.public\.auto_.*

Filter JSON

A JSON document containing a list of fully qualified table names and a regex pattern for the column names that should be included in replication.

The following example includes all columns that end with name in table1 from the public schema in the my_db database:

[ {"database":"my_db", "schema":"public", "table":"table1", "includedPattern":".*name"} ]

Merge Task Schedule CRON

CRON expression defining the periods when merge operations from the journal to the destination table are triggered. Set it to * * * * * ? for continuous merges, or use a time-based schedule to limit warehouse run time.

For example:

  • The string * 0 * * * ? schedules merges during the first minute of every hour.

  • The string * 20 14 ? * MON-FRI schedules merges during the minute starting at 2:20 PM every Monday through Friday.

For additional information and examples, see the cron triggers tutorial in the Quartz documentation.

Remove and re-add a table to replication

To remove a table from replication, remove it from the Included Table Names or Included Table Regex parameters in the SQLServer Ingestion Parameters context.

To re-add the table to replication later, first delete the corresponding destination table in Snowflake. Afterward, add the table back to the Included Table Names or Included Table Regex parameters. This ensures that the replication process starts fresh for the table.

This approach can also be used to recover from a failed table replication scenario.
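
For example, before re-adding a table, you might drop its destination table in Snowflake; you can also drop that table's old journal tables to reclaim storage. A minimal sketch with placeholder identifiers:

-- Drop the destination table so that replication starts fresh when the table is re-added
DROP TABLE <destination_database>.<schema>.<table>;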

Replicate a subset of columns in a table

The connector can filter the data replicated for each table to a configured subset of columns.

To apply filters to columns, modify the Filter JSON parameter in the SQLServer Ingestion Parameters context, adding an array of configurations with one entry for every table to which you want to apply a filter.

Include or exclude columns by name or pattern. You can apply a single condition per table, or combine multiple conditions, with exclusions always taking precedence over inclusions.

The following example shows the fields that are available. The schema and table fields are mandatory. One or more of included, excluded, includedPattern, excludedPattern is required.

[
    {
        "schema": "<source table schema>",
        "table": "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>"
    }
]

Track data changes in tables

The connector replicates the current state of data from the source tables, as well as every state of every row from every changeset. This data is stored in journal tables created in the same schema as the destination table.

The journal table names are formatted as: <source table name>_JOURNAL_<timestamp>_<schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> is an integer increasing with every schema change on the source table. As a result, source tables that undergo schema changes will have multiple journal tables.

When you remove a table from replication, then add it back, the <timestamp> value changes, and <schema generation> starts again from 1.

Important

Snowflake recommends not altering the structure of journal tables in any way. The connector uses them to update the destination table as part of the replication process.

The connector never drops journal tables. It uses only the latest journal table for each replicated source table and reads only append-only streams on top of the journals. To reclaim storage, you can do any of the following:

  • Truncate all journal tables at any time.

  • Drop the journal tables related to source tables that were removed from replication.

  • Drop all but the latest generation journal tables for actively replicated tables.

For example, if your connector is set to actively replicate the source table orders, and you have earlier removed the table customers from replication, you may have the following journal tables. In this case you can drop all of them except orders_JOURNAL_5678_2.

customers_JOURNAL_1234_1
customers_JOURNAL_1234_2
orders_JOURNAL_5678_1
orders_JOURNAL_5678_2
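
Continuing this example, the following is a sketch of the cleanup statements. Run them in the destination database and schema where the journal tables were created; the names are illustrative:

-- customers was removed from replication, so all of its journal tables can be dropped
DROP TABLE customers_JOURNAL_1234_1;
DROP TABLE customers_JOURNAL_1234_2;

-- orders is still replicated, so keep only its latest generation journal table
DROP TABLE orders_JOURNAL_5678_1;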

Configure scheduling of merge tasks

The connector uses a warehouse to merge change data capture (CDC) data into destination tables. This operation is triggered by the MergeSnowflakeJournalTable processor. If there are no new changes or if no new flow files are waiting in the MergeSnowflakeJournalTable queue, no merge is triggered and the warehouse auto-suspends.

Use the CRON expression in the Merge Task Schedule CRON parameter to limit warehouse cost and restrict merges to scheduled periods. The expression throttles the flow files coming into the MergeSnowflakeJournalTable processor so that merges are triggered only during the dedicated time period. For more information about scheduling, see Scheduling strategy.
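
For example, to allow merges only during weekday business hours, you might use a Quartz CRON expression such as the following sketch; adjust the hours and days to your own schedule:

* * 8-18 ? * MON-FRI

With this expression, flow files reach the MergeSnowflakeJournalTable processor, and merges can run, only between 08:00 and 18:59 from Monday through Friday. Outside that window, no merges are triggered and the warehouse can auto-suspend.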

Run the flow

  1. Right-click on the canvas and select Enable all Controller Services.

  2. Right-click on the imported process group and select Start. The connector starts the data ingestion.
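
After the flow is running, you can optionally verify in Snowflake that data is arriving in the destination database. A minimal sketch, assuming placeholder identifiers and the default case-insensitive identifier resolution:

USE DATABASE <destination_database>;

-- List the tables created by the connector; destination tables and their journal tables share a schema
SHOW TABLES IN SCHEMA <schema>;

-- Spot-check replicated rows in a destination table
SELECT * FROM <schema>.<table> LIMIT 10;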