Set up the Openflow Connector for MySQL

Note

The connector is subject to the Connector Terms.

This topic describes the steps to set up the Openflow Connector for MySQL.

Prerequisites

  1. Ensure that you have reviewed About Openflow Connector for MySQL.

  2. Ensure that you have MySQL 8 or a later version to synchronize data with Snowflake.

  3. Ensure that you have set up Openflow.

  4. As a database administrator, perform the following tasks:

    1. Enable binary logging and configure the following settings, then save the configuration:

      log_bin

      Set to on.

      This enables the binary log that records structural and data changes.

      binlog_format

      Set to row.

      The connector supports only row-based replication. MySQL 8.x is expected to be the last release series that supports this setting; future versions will support only row-based replication.

      Not applicable in GCP Cloud SQL, where this setting is fixed at the required value.

      binlog_row_metadata

      Set to full.

      The connector requires all row metadata to operate, most importantly, column names and primary key information.

      binlog_row_image

      Set to full.

      The connector requires that all columns be written into the binary log.

      Not applicable in Amazon Aurora, where this setting is fixed at the required value.

      binlog_row_value_options

      Leave empty.

      This option only affects JSON columns, where it can be set to include only the modified parts of JSON documents for UPDATE statements. The connector requires that full documents are written into the binary log.

      binlog_expire_logs_seconds

      Set the binary log expiration period (binlog_expire_logs_seconds, see https://dev.mysql.com/doc/refman/8.4/en/replication-options-binary-log.html#sysvar_binlog_expire_logs_seconds) to at least a few hours, or longer, to ensure that the connector can continue incremental replication after extended pauses or downtime. After the expiration period ends, binary log files might be automatically removed. If the integration is paused for a long period, for example due to maintenance work, and the expired binary log files are deleted during this time, Openflow cannot replicate the data from these files.

      If you’re using scheduled replication, the value needs to be longer than the configured schedule.

      The following example shows these settings in a MySQL configuration file:

      log_bin = on
      binlog_format = row
      binlog_row_metadata = full
      binlog_row_image = full
      binlog_row_value_options =
      
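      To confirm that the settings are in effect, you can query the server variables. A minimal check, assuming you can run SQL statements against the source database:

      SHOW VARIABLES LIKE 'log_bin';
      SHOW VARIABLES LIKE 'binlog_format';
      SHOW VARIABLES LIKE 'binlog_row_metadata';
      SHOW VARIABLES LIKE 'binlog_row_image';
      SHOW VARIABLES LIKE 'binlog_row_value_options';
      SHOW VARIABLES LIKE 'binlog_expire_logs_seconds';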
    2. Connect via SSL. If you’re planning to use an SSL connection to MySQL, prepare the root certificate for your database server. It is required during configuration.

    3. Create a user for the connector. The connector requires a user with the REPLICATION SLAVE and REPLICATION CLIENT privileges for reading the binary logs. Grant these privileges:

      GRANT REPLICATION SLAVE ON *.* TO '<username>'@'%';
      GRANT REPLICATION CLIENT ON *.* TO '<username>'@'%';
      
    4. Grant the SELECT privilege on every replicated table, either on an entire schema or on individual tables:

      GRANT SELECT ON <schema>.* TO '<username>'@'%';
      GRANT SELECT ON <schema>.<table> TO '<username>'@'%';
      

      For more information on replication security, see Binary log.
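
      The following is a minimal end-to-end sketch of the database user setup, assuming a hypothetical user named openflow_connector and a source schema named mydb; adjust the names and choose a strong password:

      CREATE USER 'openflow_connector'@'%' IDENTIFIED BY '<strong_password>';
      GRANT REPLICATION SLAVE ON *.* TO 'openflow_connector'@'%';
      GRANT REPLICATION CLIENT ON *.* TO 'openflow_connector'@'%';
      GRANT SELECT ON mydb.* TO 'openflow_connector'@'%';
      -- Verify that the grants are in place
      SHOW GRANTS FOR 'openflow_connector'@'%';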

  5. As a Snowflake account administrator, perform the following tasks:

    1. Create a Snowflake user of type SERVICE. Create a database to store the replicated data, and set up privileges for the Snowflake user to create objects in that database by granting the USAGE and CREATE SCHEMA privileges.

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
           WITH
               WAREHOUSE_SIZE = 'MEDIUM'
               AUTO_SUSPEND = 300
               AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
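      Optionally, you can make the role and warehouse the defaults for the service user so that sessions opened by the connector do not need to specify them explicitly. A sketch, using the placeholders above:

      -- Optional: set defaults for the service user
      ALTER USER <openflow_user> SET
          DEFAULT_ROLE = <openflow_role>
          DEFAULT_WAREHOUSE = <openflow_warehouse>;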
    2. Create a secure key pair (public and private). Store the private key for the user in a file to supply to the connector’s configuration. Assign the public key to the Snowflake service user:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = '<public_key>';
      

      For more information, see key-pair authentication.
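
      To verify that the public key was registered, you can describe the service user and check that the RSA_PUBLIC_KEY_FP (fingerprint) property is populated. A quick check, using the placeholder above:

      DESC USER <openflow_user>;
      -- Confirm that RSA_PUBLIC_KEY_FP shows a non-null fingerprint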

    3. Designate a warehouse for the connector to use. Start with the smallest warehouse size, then experiment with the size depending on the number of tables being replicated and the amount of data transferred. Replicating a large number of tables typically scales better with multi-cluster warehouses than with a larger warehouse size.
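
      If you replicate many tables and your Snowflake edition supports multi-cluster warehouses, you can enable multiple clusters on the warehouse created earlier. A sketch with hypothetical cluster counts; tune them to your workload:

      ALTER WAREHOUSE <openflow_warehouse> SET
          MIN_CLUSTER_COUNT = 1
          MAX_CLUSTER_COUNT = 3;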

Import the connector definition into Openflow

  1. Download the connector definition file.

  2. Import the connector definition into Openflow:

    1. Open the Snowflake Openflow canvas.

    2. Add a process group. To do this, drag and drop the Process Group icon from the tool palette at the top of the page onto the canvas. Once you release your pointer, a Create Process Group pop-up appears.

    3. On the Create Process Group pop-up, select the connector definition file to import.

Configure the connector

You can configure the connector for the following use cases:

Replicate a set of tables in real-time

  1. Right-click on the imported process group and select Parameters.

  2. Populate the required parameter values as described in Flow parameters.

Flow parameters

Start by setting the parameters of the Source Database Parameters context, then the Snowflake Parameters context. After this is done, you can enable the connector. The connector should connect to both MySQL and Snowflake and start running. However, the connector does not replicate any data until tables to be replicated are explicitly added to its configuration.

To configure specific tables for replication, edit the Replication Parameters context. After you apply the changes to the Replication Parameters context, the configuration is picked up by the connector, and the replication lifecycle starts for every table.

Source Database Parameters context

Parameter

Description

MySQL Connection URL

The full JDBC URL to the source database. The connector uses the MariaDB driver, which is compatible with MySQL and requires the jdbc:mariadb prefix in the URL. If SSL is disabled, the connection URL must include the allowPublicKeyRetrieval parameter set to true.

Examples:

  • With SSL enabled: jdbc:mariadb://example.com:3306

  • With SSL disabled: jdbc:mariadb://example.com:3306?allowPublicKeyRetrieval=true

MariaDB JDBC Driver

The absolute path to the MariaDB JDBC driver jar. The connector uses the MariaDB driver, which is compatible with MySQL. Select the Reference asset checkbox to upload the MariaDB JDBC driver.

Example: /opt/resources/drivers/mariadb-java-client-3.5.2.jar

MySQL SSL Mode

Enable or disable SSL connections.

MySQL Root SSL Certificate

The full contents of the root certificate for the database. Optional if SSL is disabled.

MySQL Username

The username for the connector.

MySQL Password

The password for the connector.

Snowflake Parameters context

Parameter

Description

Snowflake Account

The name of the Snowflake account to replicate data into.

Snowflake Database

The name of the destination database to replicate data into. Any unquoted name is automatically converted to uppercase, whereas enclosing the name in double quotes ("") preserves its exact case.

Snowflake Private Key

The RSA private key used for authentication. The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either Snowflake Private Key File or Snowflake Private Key must be defined.

Snowflake Private Key File

The file that contains the RSA Private Key used for authentication to Snowflake, formatted according to PKCS8 standards and having standard PEM headers and footers. The header line starts with -----BEGIN PRIVATE. Select the Reference asset checkbox to upload the private key file.

Snowflake Private Key Password

The password associated with the Snowflake Private Key File.

Snowflake User Role

The Snowflake account role for the connector to use.

Snowflake Username

The name of the Snowflake user for the connector.

Snowflake Warehouse

The name of the warehouse used by the connector.

Replication Parameters context

Parameter

Description

Included Table Names

A comma-separated list of table paths, including their schemas. Example: public.my_table, other_schema.other_table

Included Table Regex

A regular expression to match against table paths. Every path matching the expression is replicated. Any new tables that match the pattern are included automatically. Example: public\.auto_.*

Remove and re-add a table to replication

To remove a table from replication, ensure that it is removed from the Included Table Names or Included Table Regex parameters in the Replication Parameters context.

If you want to re-add the table to replication later, first delete the corresponding destination table in Snowflake. Afterward, add the table back to the Included Table Names or Included Table Regex parameters. This ensures that the replication process starts fresh for the table.
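
For example, dropping the destination table before re-adding its source table to replication could look like the following; the database, schema, and table names are placeholders:

DROP TABLE <destination_database>.<schema>.<table>;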

This approach can also be used to recover from a failed table replication scenario.

Replicate a subset of columns in a table

The connector can filter the data replicated per table to a subset of configured columns.

To apply filters to columns, modify the Column Filter property in the Replication Parameters context, adding an array of configurations, one entry for every table to which you want to apply a filter.

Columns can be included or excluded by name or pattern. You can apply a single condition per table, or combine multiple conditions, with exclusions always taking precedence over inclusions.

The following example shows the fields that are available. The schema and table fields are mandatory. At least one of included, excluded, includedPattern, or excludedPattern is required.

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>"
    }
]

Track data changes in tables

The connector replicates not only the current state of data from the source tables, but also every state of every row from every changeset. This data is stored in journal tables created in the same schema as the destination table.

The journal table names are formatted as: <source table name>_JOURNAL_<timestamp>_<schema generation>

where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> is an integer increasing with every schema change on the source table. This means a source table that undergoes schema changes will have multiple journal tables.

When a table is removed from replication, then added back, the <timestamp> value will change, and <schema generation> will start again from 1.

Important

Snowflake recommends that you do not alter the journal tables or the data in them, in any way. They are used by the connector to update the destination table as part of the replication process.

The connector never drops journal tables, but it only makes active use of the latest journal for every replicated source table. If you wish to reclaim the storage, you can safely drop the journal tables related to source tables that were removed from replication, and all but the latest generation ones for actively replicated tables.

For example, if your connector is set to actively replicate the source table orders, and you have earlier removed the table customers from replication, you may have the following journal tables. In this case you can drop all of them except orders_JOURNAL_5678_2.

customers_JOURNAL_1234_1
customers_JOURNAL_1234_2
orders_JOURNAL_5678_1
orders_JOURNAL_5678_2
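
A cleanup for the situation above could look like the following sketch, assuming the journal tables live in the destination database and schema used for replication:

-- Journals for a table that is no longer replicated
DROP TABLE <destination_database>.<schema>.customers_JOURNAL_1234_1;
DROP TABLE <destination_database>.<schema>.customers_JOURNAL_1234_2;
-- Older generation for an actively replicated table; keep only the latest
DROP TABLE <destination_database>.<schema>.orders_JOURNAL_5678_1;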

Configure scheduling of merge tasks

The connector uses a warehouse to merge change data capture (CDC) data into destination tables. This operation is triggered by the MergeSnowflakeJournalTable processor. If there are no new changes or if no new flow files are waiting in the MergeSnowflakeJournalTable queue, no merge is triggered and the warehouse auto-suspends.

To limit warehouse costs and restrict merges to scheduled times only, use a CRON expression in the Merge task Schedule CRON parameter. It throttles the flow files coming to the MergeSnowflakeJournalTable processor so that merges are triggered only during the dedicated time period. For more information about scheduling, see Scheduling strategy.

Run the flow

  1. Right-click on the canvas and select Enable all Controller Services.

  2. Right-click on the imported process group and select Start. The connector starts the data ingestion.
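
     Once the connector is running, you can confirm that data is landing by listing the objects created in the destination database. A simple check, using the placeholders from the setup steps:

     SHOW SCHEMAS IN DATABASE <destination_database>;
     SHOW TABLES IN DATABASE <destination_database>;
     SELECT COUNT(*) FROM <destination_database>.<schema>.<table>;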