Paramétrez Openflow Connector for MySQL

Note

Le connecteur est soumis aux conditions d’utilisation du connecteur.

Cette rubrique décrit les étapes pour paramétrer Openflow Connector for MySQL.

Conditions préalables

  1. Assurez-vous d’avoir consulté À propos de Openflow Connector for MySQL.

  2. Assurez-vous que vous disposez de MySQL 8 ou d’une version ultérieure pour synchroniser les données avec Snowflake.

  3. Assurez-vous que vous avez paramétré Openflow.

  4. En tant qu’administrateur de la base de données, effectuez les tâches suivantes :

    1. Activez les logs binaires, puis enregistrez et configurez leur format comme suit :

      log_bin

      Définissez sur : on.

      Cela permet d’activer le journal binaire qui enregistre les modifications structurelles et les modifications apportées aux données.

      binlog_format

      Définissez sur : row.

      Le connecteur ne prend en charge que la réplication basée sur les lignes. Il se peut que les versions 8.x de MySQL soient les dernières à prendre en charge ce paramètre, et les versions futures ne prendront en charge que la réplication basée sur les lignes.

      Ne s’applique pas dans GCP Cloud SQL, où il est défini sur la bonne valeur.

      binlog_row_metadata

      Définissez sur : full.

      Le connecteur a besoin de toutes les métadonnées des lignes pour fonctionner, en particulier des noms des colonnes et des informations sur les clés primaires.

      binlog_row_image

      Définissez sur : full.

      Le connecteur nécessite que toutes les colonnes soient écrites dans le journal binaire.

      Ne s’applique pas dans Amazon Aurora, où cela est défini sur la bonne valeur.

      binlog_row_value_options

      Laissez vide.

      Cette option n’affecte que les colonnes JSON, où elle peut être définie de sorte à n’inclure que les parties modifiées des documents JSON pour les instructions UPDATE. Le connecteur nécessite que les documents complets soient écrits dans le journal binaire.

      binlog_expire_logs_seconds

      Définissez cette option au minimum sur quelques heures ou sur une durée supérieure pour garantir que l’agent de base de données puisse poursuivre la réplication incrémentielle après des pauses prolongées ou des temps d’arrêt. Snowflake vous recommande de fixer la période d’expiration des logs binaires (binlog_expire_logs_seconds) à au moins quelques heures afin de garantir un fonctionnement stable du connecteur. À l’issue de la période d’expiration du journal binaire, les fichiers du journal binaire peuvent être automatiquement supprimés. Si l’intégration est interrompue pendant une longue période, par exemple en raison de travaux de maintenance, et que les journaux binaires expirés sont supprimés pendant cette période, Openflow ne sera pas en mesure de répliquer les données de ces fichiers.

      Si vous utilisez la réplication planifiée, la valeur doit être supérieure à la planification configurée.

      Voyez le code suivant à titre d’exemple :

      log_bin = on
      binlog_format = row
      binlog_row_metadata = full
      binlog_row_image = full
      binlog_row_value_options =
      
      Copy
    2. Augmentez la valeur de sort_buffer_size.

      sort_buffer_size = 4194304
      
      Copy

      sort_buffer_size définit la quantité de mémoire (en octets) allouée par fil de requête pour les opérations de tri en mémoire, telles que ORDER BY. Si la valeur est trop petite, le connecteur peut échouer avec le message d’erreur suivant :

      Out of sort memory, consider increasing server sort buffer size. Cela indique que sort_buffer_size devrait être augmenté.

    3. Si vous utilisez les bases de données Amazon RDS, augmentez la période de conservation pertinente pour binlog_expire_logs_seconds en utilisant rds_set_configuration. Par exemple, si vous souhaitez stocker le binlog pendant 24 heures, appelez mysql.rds_set_configuration('binlog retention hours', 24).

    4. Connexion via SSL. Si vous planifiez d’utiliser une connexion SSL vers MySQL, préparez le certificat racine de votre serveur de base de données. Elle est exigée lors de la configuration.

    5. Créez un utilisateur pour le connecteur. Le connecteur nécessite un utilisateur disposant des privilèges REPLICATION_SLAVE et REPLICATION_CLIENT pour la lecture des logs binaires. Accordez ces privilèges :

      GRANT REPLICATION SLAVE ON *.* TO '<username>'@'%'
      GRANT REPLICATION CLIENT ON *.* TO '<username>'@'%'
      
      Copy
    6. Accordez le privilège SELECT sur chaque table répliquée :

      GRANT SELECT ON <schema>.* TO '<username>'@'%'
      GRANT SELECT ON <schema>.<table> TO '<username>'@'%'
      
      Copy

      Pour plus d’informations sur la sécurité de la réplication, voir journal binaire.

  5. En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :

    1. Créez un utilisateur Snowflake avec le type SERVICE. Créez une base de données pour stocker les données répliquées et définissez les privilèges permettant à l’utilisateur de Snowflake de créer des objets dans cette base de données en lui accordant les privilèges USAGE et CREATE SCHEMA.

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
           WITH
               WAREHOUSE_SIZE = 'MEDIUM'
               AUTO_SUSPEND = 300
               AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
      Copy
    2. Créez une paire de clés sécurisées (publique et privée). Stockez la clé privée de l’utilisateur dans un fichier à fournir à la configuration du connecteur. Attribuez la clé publique à l’utilisateur du service Snowflake :

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      Pour plus d’informations, voir paire de clés.

    3. Désignez un entrepôt à utiliser par le connecteur. Commencez par la taille d’entrepôt MEDIUM, puis faites des essais en fonction du nombre de tables répliquées et de la quantité de données transférées. Les tables de grande taille s’adaptent généralement mieux aux entrepôts multi-clusters, plutôt qu’à la taille de l’entrepôt.

Définir le connecteur

En tant qu’ingénieur des données, effectuez les tâches suivantes pour installer et configurer le connecteur :

Installer le connecteur

  1. Naviguez jusqu’à la page d’aperçu d’Openflow. Dans la section Featured connectors, sélectionnez View more connectors.

  2. Sur la page des connecteurs Openflow, trouvez le connecteur et sélectionnez Add to runtime.

  3. Dans la boîte de dialogue Select runtime, sélectionnez votre environnement d’exécution dans la liste déroulante Available runtimes.

  4. Sélectionnez Add.

    Note

    Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données et un schéma dans Snowflake pour que le connecteur puisse stocker les données ingérées.

  5. Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Allow lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.

  6. Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.

Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.

Configuration du connecteur

Vous pouvez configurer le connecteur pour les cas d’utilisation suivants :

Répliquer un ensemble de tables en temps réel

  1. Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Parameters.

  2. Renseignez les valeurs des paramètres requis comme décrit dans Paramètres de débit.

Paramètres de débit

Commencez par définir les paramètres du contexte des paramètres source MySQL, puis du contexte des paramètres de destination MySQL. Une fois cette opération effectuée, vous pouvez activer le connecteur. Le connecteur devrait se connecter à la fois à MySQL et à Snowflake et commencer à fonctionner. Toutefois, le connecteur ne réplique aucune donnée tant que les tables à répliquer n’ont pas été explicitement ajoutées à sa configuration.

Pour configurer des tables spécifiques pour la réplication, modifiez le contexte des paramètres d’ingestion MySQL. Une fois que vous avez appliqué les modifications au contexte Paramètres de réplication, la configuration est reprise par le connecteur et le cycle de vie de la réplication démarre pour chaque table.

Contexte des paramètres source MySQL

Paramètre

Description

MySQL Connexion URL

L’adresse complète URL JDBC de la base de données source. Le connecteur utilise le pilote MariaDB, qui est compatible avec MySQL et nécessite le préfixe jdbc:mariadb dans l’URL. Si le SSL est désactivé, le paramètre allowPublicKeyRetrieval de l’URL de connexion doit être réglée sur true.

Exemples :

  • SSL activé : jdbc:mariadb://example.com:3306

  • SSL désactivé : jdbc:mariadb://example.com:3306?allowPublicKeyRetrieval=true

Pilote MySQL JDBC

Le chemin absolu vers le fichier jar du pilote MariaDB JDBC. Le connecteur utilise le pilote MariaDB, qui est compatible avec MySQL. Cochez la case Reference asset pour télécharger le pilote MariaDB JDBC.

Exemple : /opt/resources/drivers/mariadb-java-client-3.5.2.jar

Nom d’utilisateur MySQL

Le nom d’utilisateur du connecteur.

Mot de passe MySQL

Le mot de passe du connecteur.

Contexte des paramètres de destination MySQL

Paramètre

Description

Base de données de destination

La base de données dans laquelle les données seront conservées. Doit déjà exister dans Snowflake.

Identificateur de compte Snowflake

Nom du compte Snowflake formaté comme suit : [[nom de l’organisation] -[[nom du compte] où les données seront conservées

Stratégie d’authentification Snowflake

Stratégie d’authentification auprès de Snowflake. Valeurs possibles : SNOWFLAKE_SESSION_TOKEN, lorsque vous exécutez le flux sur SPCS et KEY_PAIR lorsque vous souhaitez établir l’accès à l’aide d’une clé privée.

Clé privée de Snowflake

La clé privée RSA utilisée pour l’authentification. La clé RSA doit être formatée selon les normes PKCS8 et comporter les en-têtes et pieds de page standard PEM. Notez que vous devez définir soit le fichier de clé privée de Snowflake, soit la clé privée de Snowflake.

Fichier de clé privée de Snowflake

Le fichier qui contient la clé privée RSA utilisée pour l’authentification à Snowflake, formaté selon les normes PKCS8 et comportant les en-têtes et pieds de page standard PEM. La ligne d’en-tête commence par -----BEGIN PRIVATE. Cochez la case Reference asset pour télécharger le fichier de la clé privée.

Mot de passe de la clé privée de Snowflake

Le mot de passe associé au fichier de la clé privée de Snowflake

Rôle Snowflake

Rôle Snowflake utilisé lors de l’exécution de la requête

Nom d’utilisateur Snowflake

Nom d’utilisateur utilisé pour se connecter à l’instance de Snowflake

Entrepôt Snowflake

L’entrepôt de Snowflake est utilisé pour exécuter des requêtes

Contexte des paramètres d’ingestion MySQL

Paramètre

Description

Noms des tables incluses

Une liste de chemins de tables séparés par des virgules, y compris leurs schémas. Exemple : public.my_table, other_schema.other_table

Table incluse Regex

Une expression régulière à associer aux chemins de la table. Chaque chemin correspondant à l’expression sera répliqué, et les nouvelles tables correspondant au modèle qui seront créées ultérieurement seront également incluses automatiquement. Exemple : public\.auto_.*

Filtre JSON

Une adresse JSON contenant une liste de noms de tables entièrement qualifiés et un modèle de regex pour les noms de colonnes à inclure dans la réplication. Exemple : [ {"schema":"public", "table":"table1", "includedPattern":".*name"} ] inclura toutes les colonnes qui se terminent par name dans table1 du schéma public.

Fusionner la planification des tâches CRON

Expression CRON définissant les périodes au cours desquelles les opérations de fusion du journal vers la table de destination seront déclenchées. Paramétrez cet élément sur * * * * * ? si vous souhaitez une fusion continue ou une planification pour limiter la durée d’exécution de l’entrepôt. Par exemple, la chaîne * 0 * * * ? indique que vous souhaitez planifier des fusions à l’heure pleine pendant une minute. La chaîne * 20 14 ? * MON-FRI indique que vous souhaitez planifier les fusions à 14h20 tous les lundis et vendredis. Pour plus d’informations et d’exemples, consultez le CronTrigger tutoriel.

Supprimez et ajoutez à nouveau une table à la réplication

Pour supprimer une table de la réplication, assurez-vous qu’elle est supprimée des paramètres Noms des tables incluses ou Table incluse Regex dans le contexte des paramètres de réplication.

Si vous souhaitez réajuster la table à la réplication ultérieurement, supprimez d’abord la table de destination correspondante dans Snowflake. Ensuite, ajoutez à nouveau la table aux paramètres Noms des tables incluses ou Table incluse Regex. Cela permet de garantir que le processus de réplication démarre à nouveau pour la table.

Cette approche peut également être utilisée pour récupérer un scénario de réplication de table qui a échoué.

Répliquer un sous-ensemble de colonnes dans une table

Le connecteur peut filtrer les données répliquées par table sur un sous-ensemble de colonnes configurées.

Pour appliquer des filtres aux colonnes, modifiez la propriété Filtre de colonne dans le contexte Paramètres de réplication, en ajoutant un tableau de configurations, une entrée pour chaque table à laquelle vous souhaitez appliquer un filtre.

Les colonnes peuvent être incluses ou exclues par nom ou par modèle. Vous pouvez appliquer une seule condition par table ou combiner plusieurs conditions, les exclusions ayant toujours la priorité sur les inclusions.

L’exemple suivant montre les champs disponibles. Les champs schema et table sont obligatoires. Une ou plusieurs des adresses suivantes : included, excluded, includedPattern, excludedPattern sont exigées.

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

Suivre les changements de données dans les tables

Le connecteur réplique non seulement l’état actuel des données des tables sources, mais aussi l’état de chaque ligne de chaque jeu de modifications. Ces données sont stockées dans des tables de journal créées dans le même schéma que la table de destination.

Les noms des tables du journal sont formatés comme suit : <nom de la table source>_JOURNAL_<horodatage>_<génération du schéma>

<horodatage> est la valeur en secondes de l’époque à laquelle la table source a été ajoutée à la réplication, et <génération de schéma> est un nombre entier augmentant à chaque changement de schéma sur la table source. Cela signifie qu’une table source qui subit des modifications de schéma aura plusieurs tables de journal.

Lorsqu’une table est retirée de la réplication, puis réintroduite, la valeur de l’horodatage <> change et la génération du schéma <> reprend à partir de 1.

Important

Snowflake vous recommande de ne pas modifier les tables du journal ou les données qu’elles contiennent, de quelque manière que ce soit. Elles sont utilisées par le connecteur pour mettre à jour la table de destination dans le cadre du processus de réplication.

Le connecteur n’abandonne jamais les tables de logs, mais il n’utilise activement que le dernier journal pour chaque table source répliquée. Si vous souhaitez récupérer le stockage, vous pouvez en toute sécurité supprimer les tables de journal liées aux tables sources qui ont été retirées de la réplication, ainsi que toutes les tables activement répliquées, à l’exception de celles de la dernière génération.

Par exemple, si votre connecteur est paramétré pour répliquer activement la table source orders, et que vous avez précédemment supprimé la table customers de la réplication, vous pouvez avoir les tables de journal suivantes. Dans ce cas, vous pouvez toutes les supprimer, à l’exception de orders_5678_2.

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

Configuration de la planification des tâches de fusion

Le connecteur utilise un entrepôt pour fusionner les données de capture des données de changement (CDC) dans les tables de destination. Cette opération est déclenchée par le processeur MergeSnowflakeJournalTable. S’il n’y a pas de nouvelles modifications ou si aucun nouveau fichier de flux n’est en attente dans la file d’attente MergeSnowflakeJournalTable, aucune fusion n’est déclenchée et l’entrepôt se suspend automatiquement.

Pour limiter le coût de l’entrepôt et restreindre les fusions aux seules heures planifiées, utilisez l’expression CRON dans le paramètre de planification CRON de la tâche de fusion. Elle limite les fichiers de flux arrivant au processeur MergeSnowflakeJournalTable et les fusions ne sont déclenchées qu’au cours d’une période donnée. Pour plus d’informations sur la planification, voir Stratégie de planification.

Exécutez le flux

  1. Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Enable all Controller Services.

  2. Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Start. Le connecteur démarre l’ingestion des données.