Paramétrez Openflow Connector for MySQL¶
Note
Le connecteur est soumis aux conditions d’utilisation du connecteur.
Cette rubrique décrit les étapes pour paramétrer Openflow Connector for MySQL.
Conditions préalables¶
Assurez-vous d’avoir consulté À propos de Openflow Connector for MySQL.
Assurez-vous que vous disposez de MySQL 8 ou d’une version ultérieure pour synchroniser les données avec Snowflake.
Assurez-vous que vous avez paramétré Openflow.
En tant qu’administrateur de la base de données, effectuez les tâches suivantes :
Activez les logs binaires, puis enregistrez et configurez leur format comme suit :
log_bin
Définissez sur :
on
.Cela permet d’activer le journal binaire qui enregistre les modifications structurelles et les modifications apportées aux données.
binlog_format
Définissez sur :
row
.Le connecteur ne prend en charge que la réplication basée sur les lignes. Il se peut que les versions 8.x de MySQL soient les dernières à prendre en charge ce paramètre, et les versions futures ne prendront en charge que la réplication basée sur les lignes.
Ne s’applique pas dans GCP Cloud SQL, où il est défini sur la bonne valeur.
binlog_row_metadata
Définissez sur :
full
.Le connecteur a besoin de toutes les métadonnées des lignes pour fonctionner, en particulier des noms des colonnes et des informations sur les clés primaires.
binlog_row_image
Définissez sur :
full
.Le connecteur nécessite que toutes les colonnes soient écrites dans le journal binaire.
Ne s’applique pas dans Amazon Aurora, où cela est défini sur la bonne valeur.
binlog_row_value_options
Laissez vide.
Cette option n’affecte que les colonnes JSON, où elle peut être définie de sorte à n’inclure que les parties modifiées des documents JSON pour les instructions
UPDATE
. Le connecteur nécessite que les documents complets soient écrits dans le journal binaire.binlog_expire_logs_seconds
Définissez cette option au minimum sur quelques heures ou sur une durée supérieure pour garantir que l’agent de base de données puisse poursuivre la réplication incrémentielle après des pauses prolongées ou des temps d’arrêt. Snowflake vous recommande de fixer la période d’expiration des logs binaires (binlog_expire_logs_seconds) à au moins quelques heures afin de garantir un fonctionnement stable du connecteur. À l’issue de la période d’expiration du journal binaire, les fichiers du journal binaire peuvent être automatiquement supprimés. Si l’intégration est interrompue pendant une longue période, par exemple en raison de travaux de maintenance, et que les journaux binaires expirés sont supprimés pendant cette période, Openflow ne sera pas en mesure de répliquer les données de ces fichiers.
Si vous utilisez la réplication planifiée, la valeur doit être supérieure à la planification configurée.
Voyez le code suivant à titre d’exemple :
log_bin = on binlog_format = row binlog_row_metadata = full binlog_row_image = full binlog_row_value_options =
Augmentez la valeur de
sort_buffer_size
.sort_buffer_size = 4194304
sort_buffer_size
définit la quantité de mémoire (en octets) allouée par fil de requête pour les opérations de tri en mémoire, telles que ORDER BY. Si la valeur est trop petite, le connecteur peut échouer avec le message d’erreur suivant :Out of sort memory, consider increasing server sort buffer size
. Cela indique quesort_buffer_size
devrait être augmenté.Si vous utilisez les bases de données Amazon RDS, augmentez la période de conservation pertinente pour
binlog_expire_logs_seconds
en utilisantrds_set_configuration
. Par exemple, si vous souhaitez stocker le binlog pendant 24 heures, appelezmysql.rds_set_configuration('binlog retention hours', 24)
.Connexion via SSL. Si vous planifiez d’utiliser une connexion SSL vers MySQL, préparez le certificat racine de votre serveur de base de données. Elle est exigée lors de la configuration.
Créez un utilisateur pour le connecteur. Le connecteur nécessite un utilisateur disposant des privilèges REPLICATION_SLAVE et REPLICATION_CLIENT pour la lecture des logs binaires. Accordez ces privilèges :
GRANT REPLICATION SLAVE ON *.* TO '<username>'@'%' GRANT REPLICATION CLIENT ON *.* TO '<username>'@'%'
Accordez le privilège SELECT sur chaque table répliquée :
GRANT SELECT ON <schema>.* TO '<username>'@'%' GRANT SELECT ON <schema>.<table> TO '<username>'@'%'
Pour plus d’informations sur la sécurité de la réplication, voir journal binaire.
En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :
Créez un utilisateur Snowflake avec le type SERVICE. Créez une base de données pour stocker les données répliquées et définissez les privilèges permettant à l’utilisateur de Snowflake de créer des objets dans cette base de données en lui accordant les privilèges USAGE et CREATE SCHEMA.
CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
Créez une paire de clés sécurisées (publique et privée). Stockez la clé privée de l’utilisateur dans un fichier à fournir à la configuration du connecteur. Attribuez la clé publique à l’utilisateur du service Snowflake :
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
Pour plus d’informations, voir paire de clés.
Désignez un entrepôt à utiliser par le connecteur. Commencez par la taille d’entrepôt
MEDIUM
, puis faites des essais en fonction du nombre de tables répliquées et de la quantité de données transférées. Les tables de grande taille s’adaptent généralement mieux aux entrepôts multi-clusters, plutôt qu’à la taille de l’entrepôt.
Définir le connecteur¶
En tant qu’ingénieur des données, effectuez les tâches suivantes pour installer et configurer le connecteur :
Installer le connecteur¶
Naviguez jusqu’à la page d’aperçu d’Openflow. Dans la section Featured connectors, sélectionnez View more connectors.
Sur la page des connecteurs Openflow, trouvez le connecteur et sélectionnez Add to runtime.
Dans la boîte de dialogue Select runtime, sélectionnez votre environnement d’exécution dans la liste déroulante Available runtimes.
Sélectionnez Add.
Note
Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données et un schéma dans Snowflake pour que le connecteur puisse stocker les données ingérées.
Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Allow lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.
Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.
Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.
Configuration du connecteur¶
Vous pouvez configurer le connecteur pour les cas d’utilisation suivants :
Répliquer un ensemble de tables en temps réel¶
Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Parameters.
Renseignez les valeurs des paramètres requis comme décrit dans Paramètres de débit.
Paramètres de débit¶
Commencez par définir les paramètres du contexte des paramètres source MySQL, puis du contexte des paramètres de destination MySQL. Une fois cette opération effectuée, vous pouvez activer le connecteur. Le connecteur devrait se connecter à la fois à MySQL et à Snowflake et commencer à fonctionner. Toutefois, le connecteur ne réplique aucune donnée tant que les tables à répliquer n’ont pas été explicitement ajoutées à sa configuration.
Pour configurer des tables spécifiques pour la réplication, modifiez le contexte des paramètres d’ingestion MySQL. Une fois que vous avez appliqué les modifications au contexte Paramètres de réplication, la configuration est reprise par le connecteur et le cycle de vie de la réplication démarre pour chaque table.
Contexte des paramètres source MySQL¶
Paramètre |
Description |
---|---|
MySQL Connexion URL |
L’adresse complète URL JDBC de la base de données source. Le connecteur utilise le pilote MariaDB, qui est compatible avec MySQL et nécessite le préfixe Exemples :
|
Pilote MySQL JDBC |
Le chemin absolu vers le fichier jar du pilote MariaDB JDBC. Le connecteur utilise le pilote MariaDB, qui est compatible avec MySQL. Cochez la case Reference asset pour télécharger le pilote MariaDB JDBC. Exemple : |
Nom d’utilisateur MySQL |
Le nom d’utilisateur du connecteur. |
Mot de passe MySQL |
Le mot de passe du connecteur. |
Contexte des paramètres de destination MySQL¶
Paramètre |
Description |
---|---|
Base de données de destination |
La base de données dans laquelle les données seront conservées. Doit déjà exister dans Snowflake. |
Identificateur de compte Snowflake |
Nom du compte Snowflake formaté comme suit : [[nom de l’organisation] -[[nom du compte] où les données seront conservées |
Stratégie d’authentification Snowflake |
Stratégie d’authentification auprès de Snowflake. Valeurs possibles : |
Clé privée de Snowflake |
La clé privée RSA utilisée pour l’authentification. La clé RSA doit être formatée selon les normes PKCS8 et comporter les en-têtes et pieds de page standard PEM. Notez que vous devez définir soit le fichier de clé privée de Snowflake, soit la clé privée de Snowflake. |
Fichier de clé privée de Snowflake |
Le fichier qui contient la clé privée RSA utilisée pour l’authentification à Snowflake, formaté selon les normes PKCS8 et comportant les en-têtes et pieds de page standard PEM. La ligne d’en-tête commence par |
Mot de passe de la clé privée de Snowflake |
Le mot de passe associé au fichier de la clé privée de Snowflake |
Rôle Snowflake |
Rôle Snowflake utilisé lors de l’exécution de la requête |
Nom d’utilisateur Snowflake |
Nom d’utilisateur utilisé pour se connecter à l’instance de Snowflake |
Entrepôt Snowflake |
L’entrepôt de Snowflake est utilisé pour exécuter des requêtes |
Contexte des paramètres d’ingestion MySQL¶
Paramètre |
Description |
---|---|
Noms des tables incluses |
Une liste de chemins de tables séparés par des virgules, y compris leurs schémas. Exemple : |
Table incluse Regex |
Une expression régulière à associer aux chemins de la table. Chaque chemin correspondant à l’expression sera répliqué, et les nouvelles tables correspondant au modèle qui seront créées ultérieurement seront également incluses automatiquement. Exemple : |
Filtre JSON |
Une adresse JSON contenant une liste de noms de tables entièrement qualifiés et un modèle de regex pour les noms de colonnes à inclure dans la réplication. Exemple : |
Fusionner la planification des tâches CRON |
Expression CRON définissant les périodes au cours desquelles les opérations de fusion du journal vers la table de destination seront déclenchées. Paramétrez cet élément sur |
Supprimez et ajoutez à nouveau une table à la réplication¶
Pour supprimer une table de la réplication, assurez-vous qu’elle est supprimée des paramètres Noms des tables incluses
ou Table incluse Regex
dans le contexte des paramètres de réplication.
Si vous souhaitez réajuster la table à la réplication ultérieurement, supprimez d’abord la table de destination correspondante dans Snowflake. Ensuite, ajoutez à nouveau la table aux paramètres Noms des tables incluses
ou Table incluse Regex
. Cela permet de garantir que le processus de réplication démarre à nouveau pour la table.
Cette approche peut également être utilisée pour récupérer un scénario de réplication de table qui a échoué.
Répliquer un sous-ensemble de colonnes dans une table¶
Le connecteur peut filtrer les données répliquées par table sur un sous-ensemble de colonnes configurées.
Pour appliquer des filtres aux colonnes, modifiez la propriété Filtre de colonne dans le contexte Paramètres de réplication, en ajoutant un tableau de configurations, une entrée pour chaque table à laquelle vous souhaitez appliquer un filtre.
Les colonnes peuvent être incluses ou exclues par nom ou par modèle. Vous pouvez appliquer une seule condition par table ou combiner plusieurs conditions, les exclusions ayant toujours la priorité sur les inclusions.
L’exemple suivant montre les champs disponibles. Les champs schema
et table
sont obligatoires. Une ou plusieurs des adresses suivantes : included
, excluded
, includedPattern
, excludedPattern
sont exigées.
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
Suivre les changements de données dans les tables¶
Le connecteur réplique non seulement l’état actuel des données des tables sources, mais aussi l’état de chaque ligne de chaque jeu de modifications. Ces données sont stockées dans des tables de journal créées dans le même schéma que la table de destination.
Les noms des tables du journal sont formatés comme suit : <nom de la table source>_JOURNAL_<horodatage>_<génération du schéma>
où <horodatage>
est la valeur en secondes de l’époque à laquelle la table source a été ajoutée à la réplication, et <génération de schéma>
est un nombre entier augmentant à chaque changement de schéma sur la table source. Cela signifie qu’une table source qui subit des modifications de schéma aura plusieurs tables de journal.
Lorsqu’une table est retirée de la réplication, puis réintroduite, la valeur de l’horodatage <>
change et la génération du schéma <>
reprend à partir de 1
.
Important
Snowflake vous recommande de ne pas modifier les tables du journal ou les données qu’elles contiennent, de quelque manière que ce soit. Elles sont utilisées par le connecteur pour mettre à jour la table de destination dans le cadre du processus de réplication.
Le connecteur n’abandonne jamais les tables de logs, mais il n’utilise activement que le dernier journal pour chaque table source répliquée. Si vous souhaitez récupérer le stockage, vous pouvez en toute sécurité supprimer les tables de journal liées aux tables sources qui ont été retirées de la réplication, ainsi que toutes les tables activement répliquées, à l’exception de celles de la dernière génération.
Par exemple, si votre connecteur est paramétré pour répliquer activement la table source orders
, et que vous avez précédemment supprimé la table customers
de la réplication, vous pouvez avoir les tables de journal suivantes. Dans ce cas, vous pouvez toutes les supprimer, à l’exception de orders_5678_2
.
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
Configuration de la planification des tâches de fusion¶
Le connecteur utilise un entrepôt pour fusionner les données de capture des données de changement (CDC) dans les tables de destination. Cette opération est déclenchée par le processeur MergeSnowflakeJournalTable. S’il n’y a pas de nouvelles modifications ou si aucun nouveau fichier de flux n’est en attente dans la file d’attente MergeSnowflakeJournalTable, aucune fusion n’est déclenchée et l’entrepôt se suspend automatiquement.
Pour limiter le coût de l’entrepôt et restreindre les fusions aux seules heures planifiées, utilisez l’expression CRON dans le paramètre de planification CRON de la tâche de fusion. Elle limite les fichiers de flux arrivant au processeur MergeSnowflakeJournalTable et les fusions ne sont déclenchées qu’au cours d’une période donnée. Pour plus d’informations sur la planification, voir Stratégie de planification.
Exécutez le flux¶
Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Enable all Controller Services.
Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Start. Le connecteur démarre l’ingestion des données.