Paramétrez Openflow Connector for SQL Server

Note

This connector is subject to the Snowflake Connector Terms.

This topic describes how to set up the Openflow Connector for SQL Server.

Pour plus d’informations sur le processus de chargement incrémentiel, consultez Réplication incrémentielle.

Conditions préalables

Avant de configurer le connecteur, assurez-vous d’avoir rempli les conditions préalables suivantes :

  1. Assurez-vous d’avoir consulté À propos de Openflow Connector for SQL Server.

  2. Assurez-vous d’avoir consulté Versions du serveur SQL prises en charge.

  3. Assurez-vous d’avoir configuré votre déploiement d’exécution. Pour plus d’informations, consultez les rubriques suivantes :

  4. If you use Openflow - Snowflake Deployments, ensure that you have reviewed configuring required domains and have granted access to the required domains for the SQL Server connector.

Set up your SQL Server instance

Avant de configurer le connecteur, effectuez les tâches suivantes dans votre environnement SQL Server :

Note

Vous devez effectuer ces tâches en tant qu’administrateur de base de données.

  1. Enable change tracking on the databases and tables that you plan to replicate, as shown in the following SQL Server example:

    ALTER DATABASE <database>
      SET CHANGE_TRACKING = ON
      (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
    
    ALTER TABLE <schema>.<table>
      ENABLE CHANGE_TRACKING
      WITH (TRACK_COLUMNS_UPDATED = ON);
    
    Copy

    Note

    Exécutez ces commandes pour chaque base de données et chaque table que vous prévoyez de répliquer.

    Le connecteur exige que le suivi des modifications soit activé sur les bases de données et les tables avant le début de la réplication. Assurez-vous que le suivi des modifications est activé pour chaque table que vous prévoyez de répliquer. Vous pouvez également activer le suivi des modifications pour d’autres tables lorsque le connecteur est en cours d’exécution.

  2. Créez une connexion pour l’instance SQL Server :

    CREATE LOGIN <user_name> WITH PASSWORD = '<password>';
    
    Copy

    Cette connexion est utilisée pour créer des utilisateurs pour les bases de données que vous prévoyez de répliquer.

  3. Créez un utilisateur pour chaque base de données que vous répliquez en exécutant la commande SQL Server suivante dans chaque base de données :

    USE <source_database>;
    CREATE USER <user_name> FOR LOGIN <user_name>;
    
    Copy
  4. Accordez les autorisations SELECT et VIEW CHANGE TRACKING à l’utilisateur pour chaque base de données que vous répliquez :

    GRANT SELECT ON <database>.<schema>.<table> TO <user_name>;
    GRANT VIEW CHANGE TRACKING ON <database>.<schema>.<table> TO <user_name>;
    
    Copy

    Exécutez ces commandes dans chaque base de données pour chaque table que vous prévoyez de répliquer. Ces autorisations doivent être accordées à l’utilisateur de chaque base de données que vous avez créée précédemment.

  5. (Optional) Configure SSL connection.

    If you use an SSL connection to connect SQL Server, create the root certificate for your database server. This is required when configuring the connector.

Configurer votre environnement Snowflake

As a Snowflake administrator, perform the following tasks:

  1. Créez une base de données de destination dans Snowflake pour stocker les données répliquées :

    CREATE DATABASE <destination_database>;
    
    Copy
  2. Créez un utilisateur de service Snowflake :

    CREATE USER <openflow_user>
      TYPE = SERVICE
      COMMENT='Service user for automated access of Openflow';
    
    Copy
  3. Créez un rôle Snowflake pour le connecteur et accordez-lui les privilèges requis :

    CREATE ROLE <openflow_role>;
    GRANT ROLE <openflow_role> TO USER <openflow_user>;
    GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
    GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
    
    Copy

    Utilisez ce rôle pour gérer l’accès du connecteur à la base de données Snowflake.

    Pour créer des objets dans la base de données de destination, vous devez accorder les privilèges USAGE et CREATE SCHEMA sur la base de données au rôle utilisé pour gérer l’accès.

  4. Créez un entrepôt Snowflake pour le connecteur et accordez-lui les privilèges requis :

    CREATE WAREHOUSE <openflow_warehouse> WITH
      WAREHOUSE_SIZE = 'MEDIUM'
      AUTO_SUSPEND = 300
      AUTO_RESUME = TRUE;
    GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
    
    Copy

    Snowflake recommends starting with a MEDIUM warehouse size, then experimenting with size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses, rather than a larger warehouse size. For more information, see multi-cluster warehouses.

  5. Set up the public and private keys for key pair authentication:

    1. Créez une paire de clés sécurisées (publique et privée).

    2. Store the private key for the user in a file to supply to the connector’s configuration.

    3. Attribuez la clé publique à l’utilisateur du service Snowflake :

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      Pour plus d’informations, voir Authentification par paire de clés et rotation de paires de clés.

Configuration du connecteur

As a data engineer, install and configure the connector using the following sections.

Installer le connecteur

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. Sur la page des connecteurs Openflow, trouvez le connecteur et sélectionnez Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    Note

    Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données et un schéma dans Snowflake pour que le connecteur puisse stocker les données ingérées.

  4. Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Allow lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.

  5. Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.

Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.

Configuration du connecteur

To configure the connector, perform the following steps:

  1. Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Parameters.

  2. Populate the required parameter values as described in Paramètres de débit.

Paramètres de débit

Start by setting the parameters of the SQLServer Source Parameters context, then the SQLServer Destination Parameters context. After you complete this, enable the connector. The connector connects to both SQLServer and Snowflake and starts running. However, the connector does not replicate any data until any tables to be replicated are explicitly added to its configuration.

Pour configurer des tables spécifiques pour la réplication, modifiez le contexte des paramètres d’ingestion SQLServer. Une fois que vous avez appliqué les modifications au contexte des paramètres d’ingestion SQLServer, la configuration est reprise par le connecteur et le cycle de vie de la réplication démarre pour chaque table.

Contexte des paramètres source SQLServer

Paramètre

Description

URL de connexion au serveur SQL

L’adresse complète URL JDBC de la base de données source.

Exemple :

  • jdbc:sqlserver://example.com:1433;encrypt=false;

Pilote JDBC du serveur SQL

Select the Reference asset checkbox to upload the SQL Server JDBC driver.

Nom d’utilisateur du serveur SQL

The user name for the connector.

Mot de passe du serveur SQL

Le mot de passe du connecteur.

Contexte des paramètres de destination SQLServer

Paramètre

Description

Obligatoire

Base de données de destination

The database where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.

Oui

Stratégie d’authentification Snowflake

Lorsque vous utilisez :

  • Déploiement Snowflake Openflow ou BYOC : Utilisez SNOWFLAKE_SESSION_TOKEN. Ce jeton est géré automatiquement par Snowflake. Les déploiements BYOC doivent disposer de rôles d’exécution configurés au préalable pour utiliser SNOWFLAKE_SESSION_TOKEN.

  • BYOC: Alternatively BYOC can use KEY_PAIR as the value for authentication strategy.

Oui

Identificateur de compte Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR: Snowflake account name formatted as [organization-name]-[account-name] where data is persisted.

Oui

Snowflake Connection Strategy

Lorsque vous utilisez KEY_PAIR, spécifiez la stratégie de connexion à Snowflake :

  • STANDARD (par défaut) : Connectez-vous à l’aide du routage public standard aux services Snowflake.

  • PRIVATE_CONNECTIVITY : Connectez-vous en utilisant des adresses privées associées à la plateforme Cloud prise en charge, comme AWS PrivateLink.

Requis pour BYOC avec KEY_PAIR uniquement ; ignoré dans les autres cas.

Snowflake Object Identifier Resolution

Spécifie la manière dont les identificateurs d’objets sources tels que les schémas, les tables et les noms de colonnes sont stockés et interrogés dans Snowflake. Ce paramètre détermine si vous devrez utiliser des guillemets doubles dans les requêtes SQL.

Option 1 : Par défaut, insensible à la casse (recommandé).

  • Transformation : Tous les identificateurs sont convertis en majuscules. Par exemple, My_Table devient MY_TABLE.

  • Requêtes : les requêtes SQL ne sont pas sensibles à la casse et ne nécessitent pas de guillemets doubles SQL.

    Par exemple SELECT * FROM my_table; renvoie les mêmes résultats que SELECT * FROM MY_TABLE;.

Note

Snowflake recommande d’utiliser cette option si les objets de la base de données ne sont pas censés avoir des noms avec une casse mixte.

Important

Ne modifiez pas ce paramètre une fois que l’ingestion du connecteur a commencé. La modification de ce paramètre après le début de l‘ingestion interrompt l’ingestion existante. Si vous devez modifier ce paramètre, créez une nouvelle instance de connecteur.

Option 2 : sensible à la casse.

  • Transformation : La casse est préservée. Par exemple, My_Table reste My_Table.

  • Requêtes : les requêtes SQL doivent utiliser des guillemets doubles pour respecter la casse exacte des objets de base de données. Par exemple, SELECT * FROM "My_Table";.

Note

Snowflake recommande d’utiliser cette option si vous devez préserver la casse source pour des raisons héritées ou de compatibilité. Par exemple, si la base de données source comprend des noms de tables qui ne diffèrent que par la casse, comme MY_TABLE et my_table, il en résultera un conflit de noms lors de l’utilisation de comparaisons non sensibles à la casse.

Oui

Clé privée de Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR : Doit correspondre à la clé privée RSA utilisée pour l’authentification.

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

Non

Fichier de clé privée de Snowflake

Lorsque vous utilisez :

  • Session token authentication strategy: The private key file must be blank.

  • KEY_PAIR : Chargez le fichier qui contient la clé privée RSA utilisée pour l’authentification auprès de Snowflake, formatée conformément aux normes PKCS8 et possédant des en-têtes et des pieds de page PEM standards. La ligne d’en-tête commence par -----BEGIN PRIVATE. Pour charger le fichier de la clé privée, cochez la case Reference asset.

Non

Mot de passe de la clé privée de Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR : fournissez le mot de passe associé au fichier de la clé privée Snowflake.

Non

Rôle Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : Utilisez votre rôle d’exécution. Vous pouvez trouver votre rôle d’exécution dans l’UI d’Openflow, en naviguant jusqu’à View Details pour votre exécution.

  • Stratégie d’authentification KEY_PAIR : Utilisez un rôle valide configuré pour votre utilisateur de service.

Oui

Nom d’utilisateur Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR : indiquez le nom d’utilisateur utilisé pour vous connecter à l’instance Snowflake.

Oui

Entrepôt Snowflake

Entrepôt Snowflake utilisé pour exécuter des requêtes.

Oui

Contexte des paramètres d’ingestion SQLServer

Paramètre

Description

Noms des tables incluses

A comma-separated list of source table paths, including their databases and schemas, for example:

database_1.public.table_1, database_2.schema_2.table_2

Table incluse Regex

A regular expression to match against table paths, including database and schema names. Every path matching the expression is replicated, and new tables matching the pattern that are created later are also included automatically, for example:

database_name\.public\.auto_.*

Filtre JSON

A JSON containing a list of fully-qualified table names and a regex pattern for column names that should be included into replication.

L’exemple suivant inclut toutes les colonnes qui se terminent par name dans la table1 du schéma public dans la base de données my_db :

[ {"database":"my_db", "schema":"public", "table":"table1", "includedPattern":".*name"} ]

Fusionner la planification des tâches CRON

Expression CRON définissant les périodes au cours desquelles les opérations de fusion du journal vers la table de destination seront déclenchées. Paramétrez cet élément sur * * * * * ? si vous souhaitez une fusion continue ou une planification pour limiter la durée d’exécution de l’entrepôt.

Par exemple :

  • La chaîne * 0 * * * ? indique que vous souhaitez planifier des fusions à l’heure pleine pendant une minute

  • La chaîne * 20 14 ? * MON-FRI indique que vous souhaitez planifier les fusions à 14h20 tous les lundis et vendredis.

Pour plus d’informations et d’exemples, consultez le tutoriel sur les déclencheurs cron dans la documentation de Quartz

Supprimez et ajoutez à nouveau une table à la réplication

To remove a table from replication, remove it from the Included Table Names or Included Table Regex parameters in the Replication Parameters context.

To re-add the table to replication later, first delete the corresponding destination table in Snowflake. Afterward, add the table back to the Included Table Names or Included Table Regex parameters. This ensures that the replication process starts fresh for the table.

Cette approche peut également être utilisée pour récupérer un scénario de réplication de table qui a échoué.

Répliquer un sous-ensemble de colonnes dans une table

The connector filters the data replicated per table to a subset of configured columns.

Pour appliquer des filtres aux colonnes, modifiez la propriété Filtre de colonne dans le contexte Paramètres de réplication, en ajoutant un tableau de configurations, une entrée pour chaque table à laquelle vous souhaitez appliquer un filtre.

Include or exclude columns by name or pattern. You can apply a single condition per table, or combine multiple conditions, with exclusions always taking precedence over inclusions.

L’exemple suivant montre les champs disponibles. Les champs schema et table sont obligatoires. Une ou plusieurs des adresses suivantes : included, excluded, includedPattern, excludedPattern sont exigées.

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

Suivre les changements de données dans les tables

The connector replicates the current state of data from the source tables, as well as every state of every row from every changeset. This data is stored in journal tables created in the same schema as the destination table.

Les noms des tables de journal sont formatés comme suit : <source table name>_JOURNAL_<timestamp>_<schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> est un nombre entier qui augmente à chaque modification du schéma sur la table source. Par conséquent, les tables sources qui subissent des modifications de schéma auront plusieurs tables de journal.

When you remove a table from replication, then add it back, the <timestamp> value changes, and <schema generation> starts again from 1.

Important

Snowflake recommends not altering the structure of journal tables in any way. The connector uses them to update the destination table as part of the replication process.

The connector never drops journal tables, but uses the latest journal for every replicated source table, only reading append-only streams on top of journals. To reclaim the storage, you can:

  • Tronquer toutes les tables de journal à tout moment.

  • Supprimer les tables de journal liées aux tables sources qui ont été retirées de la réplication.

  • Supprimer toutes les tables de journal de la dernière génération, sauf les tables activement répliquées.

Par exemple, si votre connecteur est paramétré pour répliquer activement la table source orders, et que vous avez précédemment supprimé la table customers de la réplication, vous pouvez avoir les tables de journal suivantes. Dans ce cas, vous pouvez toutes les supprimer, à l’exception de orders_5678_2.

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

Configuration de la planification des tâches de fusion

Le connecteur utilise un entrepôt pour fusionner les données de capture des données de changement (CDC) dans les tables de destination. Cette opération est déclenchée par le processeur MergeSnowflakeJournalTable. S’il n’y a pas de nouvelles modifications ou si aucun nouveau fichier de flux n’est en attente dans la file d’attente MergeSnowflakeJournalTable, aucune fusion n’est déclenchée et l’entrepôt se suspend automatiquement.

Use the CRON expression in the Merge task Schedule CRON parameter to limit the warehouse cost and limit merges to only scheduled time. It throttles the flow files coming to the MergeSnowflakeJournalTable processor and merges are triggered only in a dedicated period of time. For more information about scheduling, see Scheduling strategy.

Exécutez le flux

  1. Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Enable all Controller Services.

  2. Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Start. Le connecteur démarre l’ingestion des données.