Paramétrez Openflow Connector for PostgreSQL

Note

Le connecteur est soumis aux conditions d’utilisation du connecteur.

Cette rubrique décrit les étapes pour paramétrer Openflow Connector for PostgreSQL.

Conditions préalables

  1. Assurez-vous d’avoir consulté À propos de Openflow Connector for PostgreSQL.

  2. Assurez-vous d’avoir pris connaissance des versions PostgreSQL prises en charge.

  3. Assurez-vous que vous avez paramétré Openflow.

  4. En tant qu’administrateur de la base de données, effectuez les tâches suivantes :

    1. Configurer wal_level

    2. Créer une publication

    3. Assurez-vous qu’il y a suffisamment d’espace disque sur votre serveur PostgreSQL pour le fichier WAL. En effet, une fois créé, un emplacement de réplication fait en sorte que PostgreSQL conserve les données WAL de la position détenue par l’emplacement de réplication, jusqu’à ce que le connecteur confirme et avance cette position.

    4. Assurez-vous que chaque table activée pour la réplication possède une clé primaire. La clé peut être une colonne unique ou composite.

    5. Paramétrez l”IDENTITY REPLICA des tables sur DEFAULT. Cela garantit que les clés primaires sont représentées dans WAL, et que le connecteur peut les lire.

    6. Créez un utilisateur pour le connecteur. Le connecteur nécessite un utilisateur possédant l’attribut REPLICATION et les autorisations nécessaires pour SELECT à partir de chaque table répliquée. Créez cet utilisateur avec un mot de passe pour entrer dans la configuration du connecteur. Pour plus d’informations sur la sécurité de la réplication, voir Sécurité.

Configurer wal_level

Openflow Connector for PostgreSQL exige que wal_level soit défini sur logical.

En fonction de l’endroit où votre serveur PostgreSQL est hébergé, vous pouvez configurer wal_level comme suit :

Sur place

Exécutez la requête suivante avec le superutilisateur ou l’utilisateur titulaire du privilège ALTER SYSTEM :

ALTER SYSTEM SET wal_level = logical;
Copy

RDS

L’utilisateur utilisé par l’agent doit disposer du rôle rds_superuser ou rds_replication.

Vous devez également définir les éléments suivants :

  • Le paramètre statique rds.logical_replication sur 1.

  • Les paramètres max_replication_slots, max_connections et max_wal_senders en fonction de la configuration de votre base de données et de votre réplication.

AWS Aurora

Définissez le paramètre statique rds.logical_replication sur 1.

GCP

Définissez les indicateurs suivants :

  • cloudsql.logical_decoding=on.

  • cloudsql.enable_pglogical=on.

Pour plus d’informations, voir la documentation Google Cloud.

Azure

Définissez la prise en charge de la réplication sur Logical. Pour plus d’informations, voir la documentation Azure.

Créer une publication

Openflow Connector for PostgreSQL exige qu’une publication https://www.postgresql.org/docs/current/logical-replication-publication.html#LOGICAL-REPLICATION-PUBLICATION soit créée et configurée dans PostgreSQL avant le début de la réplication. Vous pouvez la créer pour toutes les tables ou un sous-ensemble de tables, ainsi que pour des tables spécifiques comportant uniquement des colonnes déterminées. Assurez-vous que toutes les tables et colonnes que vous prévoyez de faire répliquer sont incluses dans la publication. Vous pouvez également modifier la publication ultérieurement, pendant que le connecteur fonctionne. Pour créer et configurer une publication, procédez comme suit :

  1. Connectez-vous en tant qu’utilisateur disposant du privilège CREATE dans la base de données et exécutez la requête suivante :

CREATE PUBLICATION <publication name>;
Copy
  1. Définissez les tables que l’agent de la base de données pourra voir en utilisant :

ALTER PUBLICATION <publication name> ADD TABLE <table name>;
Copy

Important

PostgreSQL 15 et les versions ultérieures prennent en charge la configuration des publications pour un sous-ensemble spécifié de colonnes de table. Pour que le connecteur prenne cela en charge correctement, vous devez utiliser les paramètres de filtrage des colonnes afin d’inclure les mêmes colonnes que celles définies dans la publication.

Sans ce paramètre, le connecteur aura le comportement suivant :

  • Dans la table de destination, les colonnes qui ne sont pas incluses dans le filtre seront suffixées par __DELETED. Toutes les données répliquées lors de la phase instantanée seront encore là.

  • Après l’ajout de nouvelles colonnes à la publication, la table sera en échec permanent et nécessitera le redémarrage de sa réplication.

Pour plus d’informations, voir ALTER PUBLICATION.

  1. En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :

    1. Créez un utilisateur Snowflake avec le type SERVICE. Créez une base de données pour stocker les données répliquées et définissez les privilèges permettant à l’utilisateur de Snowflake de créer des objets dans cette base de données en lui accordant les privilèges USAGE et CREATE SCHEMA.

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
           WITH
                WAREHOUSE_SIZE = 'MEDIUM'
                AUTO_SUSPEND = 300
                AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
      Copy
    2. Créez une paire de clés sécurisées (publique et privée). Stockez la clé privée de l’utilisateur dans un fichier à fournir à la configuration du connecteur. Attribuez la clé publique à l’utilisateur du service Snowflake :

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      Pour plus d’informations, voir paire de clés.

    3. Désignez un entrepôt à utiliser par le connecteur. Commencez par la taille d’entrepôt MEDIUM, puis faites des essais en fonction du nombre de tables répliquées et de la quantité de données transférées. Les tables de grande taille s’adaptent généralement mieux aux entrepôts multi-clusters, plutôt qu’à la taille de l’entrepôt.

Importez la définition du connecteur dans Openflow

  1. Naviguez jusqu’à la page d’aperçu d’Openflow. Dans la section Featured connectors, sélectionnez View more connectors.

  2. Sur la page des connecteurs Openflow, trouvez le connecteur et sélectionnez Add to runtime.

  3. Dans la boîte de dialogue Select runtime, sélectionnez votre environnement d’exécution dans la liste déroulante Available runtimes.

  4. Sélectionnez Add.

    Note

    Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données et un schéma dans Snowflake pour que le connecteur puisse stocker les données ingérées.

  5. Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Allow lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.

  6. Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.

Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.

Configuration du connecteur

Vous pouvez configurer le connecteur pour les cas d’utilisation suivants :

Répliquer un ensemble de tables en temps réel

  1. Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Parameters.

  2. Renseignez les valeurs des paramètres requis comme décrit dans Paramètres de débit.

Paramètres de débit

Commencez par définir les paramètres du contexte des paramètres source PostgreSQL, puis du contexte des paramètres de destination PostgreSQL. Une fois cela fait, vous pouvez activer le connecteur, qui devrait se connecter à la fois à PostgreSQL et à Snowflake et commencer à fonctionner. Toutefois, il ne répliquera aucune donnée tant que des tables n’auront pas été explicitement ajoutées à sa configuration.

Pour configurer des tables spécifiques pour la réplication, modifiez le contexte des paramètres d’ingestion PostgreSQL. Peu de temps après avoir appliqué les modifications au contexte des paramètres de réplication, la configuration sera reprise par le connecteur et le cycle de vie de la réplication commencera pour chaque table.

Contexte des paramètres source PostgreSQL

Paramètre

Description

URL de connexion Postgres

L’adresse complète URL JDBC de la base de données source. Exemple : jdbc:postgresql://example.com:5432/public

Pilote Postgres JDBC

Le chemin vers les fichiers jar du pilote PostgreSQL JDBC. Téléchargez le jar depuis son site web, puis cochez la case Reference asset pour le télécharger et le joindre.

Mode Postgres SSL

Activez ou désactivez les connexions SSL.

Certificat racine Postgres SSL

Le contenu complet du certificat racine de la base de données. Facultatif si SSL désactivé.

Nom d’utilisateur Postgres

Le nom d’utilisateur du connecteur.

Mot de passe Postgres

Le mot de passe du connecteur.

Publication Name

Le nom de la publication que vous avez créée précédemment.

Contexte des paramètres de destination PostgreSQL

Paramètre

Description

Base de données de destination

La base de données dans laquelle les données seront conservées. Doit déjà exister dans Snowflake.

Identificateur de compte Snowflake

Nom du compte Snowflake formaté comme suit : [[nom de l’organisation] -[[nom du compte] où les données seront conservées

Stratégie d’authentification Snowflake

Stratégie d’authentification auprès de Snowflake. Valeurs possibles : SNOWFLAKE_SESSION_TOKEN, lorsque vous exécutez le flux sur SPCS et KEY_PAIR lorsque vous souhaitez établir l’accès à l’aide d’une clé privée.

Clé privée de Snowflake

La clé privée RSA utilisée pour l’authentification. La clé RSA doit être formatée selon les normes PKCS8 et comporter les en-têtes et pieds de page standard PEM. Notez que vous devez définir soit le fichier de clé privée de Snowflake, soit la clé privée de Snowflake.

Fichier de clé privée de Snowflake

Le fichier qui contient la clé privée RSA utilisée pour l’authentification à Snowflake, formaté selon les normes PKCS8 et comportant les en-têtes et pieds de page standard PEM. La ligne d’en-tête commence par -----BEGIN PRIVATE. Cochez la case Reference asset pour télécharger le fichier de la clé privée.

Mot de passe de la clé privée de Snowflake

Le mot de passe associé au fichier de la clé privée de Snowflake

Rôle Snowflake

Rôle Snowflake utilisé lors de l’exécution de la requête

Nom d’utilisateur Snowflake

Nom d’utilisateur utilisé pour se connecter à l’instance de Snowflake

Entrepôt Snowflake

L’entrepôt de Snowflake est utilisé pour exécuter des requêtes

Contexte des paramètres d’ingestion PostgreSQL

Paramètre

Description

Noms des tables incluses

Une liste de chemins de tables séparés par des virgules, y compris leurs schémas. Exemple : public.my_table, other_schema.other_table

Table incluse Regex

Une expression régulière à associer aux chemins de la table. Chaque chemin correspondant à l’expression sera répliqué, et les nouvelles tables correspondant au modèle qui seront créées ultérieurement seront également incluses automatiquement. Exemple : public\.auto_.*

Filtre JSON

Une adresse JSON contenant une liste de noms de tables entièrement qualifiés et un modèle de regex pour les noms de colonnes à inclure dans la réplication. Exemple : [ {"schema":"public", "table":"table1", "includedPattern":".*name"} ] inclura toutes les colonnes qui se terminent par name dans table1 du schéma public.

Fusionner la planification des tâches CRON

Expression CRON définissant les périodes au cours desquelles les opérations de fusion du journal vers la table de destination seront déclenchées. Paramétrez cet élément sur * * * * * ? si vous souhaitez une fusion continue ou une planification pour limiter la durée d’exécution de l’entrepôt.

Par exemple :

  • La chaîne * 0 * * * ? indique que vous souhaitez planifier des fusions à l’heure pleine pendant une minute

  • La chaîne * 20 14 ? * MON-FRI indique que vous souhaitez planifier les fusions à 14h20 tous les lundis et vendredis.

Pour plus d’informations et d’exemples, consultez le tutoriel sur les déclencheurs cron dans la documentation de Quartz

Supprimez et ajoutez à nouveau une table à la réplication

Pour supprimer une table de la réplication, assurez-vous qu’elle est supprimée des paramètres Noms des tables incluses ou Table incluse Regex dans le contexte des paramètres de réplication.

Si vous souhaitez réajuster la table à la réplication ultérieurement, supprimez d’abord la table de destination correspondante dans Snowflake. Ensuite, ajoutez à nouveau la table aux paramètres Noms des tables incluses ou Table incluse Regex. Cela permet de garantir que le processus de réplication démarre à nouveau pour la table.

Cette approche peut également être utilisée pour récupérer un scénario de réplication de table qui a échoué.

Répliquer un sous-ensemble de colonnes dans une table

Le connecteur peut filtrer les données répliquées par table sur un sous-ensemble de colonnes configurées.

Pour appliquer des filtres aux colonnes, modifiez la propriété Filtre de colonne dans le contexte Paramètres de réplication, en ajoutant un tableau de configurations, une entrée pour chaque table à laquelle vous souhaitez appliquer un filtre.

Les colonnes peuvent être incluses ou exclues par nom ou par modèle. Vous pouvez appliquer une seule condition par table ou combiner plusieurs conditions, les exclusions ayant toujours la priorité sur les inclusions.

L’exemple suivant montre les champs disponibles. schema et table sont obligatoires, puis un ou plusieurs des champs included, excluded, includedPattern, excludedPattern sont requis.

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

Suivre les changements de données dans les tables

Le connecteur réplique non seulement l’état actuel des données des tables sources, mais aussi l’état de chaque ligne de chaque jeu de modifications. Ces données sont stockées dans des tables de journal créées dans le même schéma que la table de destination.

Les noms des tables du journal sont formatés comme suit : <nom de la table source>_JOURNAL_<horodatage>_<génération du schéma>

<horodatage> est la valeur en secondes de l’époque à laquelle la table source a été ajoutée à la réplication, et <génération de schéma> est un nombre entier augmentant à chaque changement de schéma sur la table source. Cela signifie qu’une table source qui subit des modifications de schéma aura plusieurs tables de journal.

Lorsqu’une table est retirée de la réplication, puis réintroduite, la valeur de l’horodatage <> change et la génération du schéma <> reprend à partir de 1.

Important

Snowflake vous recommande de ne pas modifier les tables du journal ou les données qu’elles contiennent, de quelque manière que ce soit. Elles sont utilisées par le connecteur pour mettre à jour la table de destination dans le cadre du processus de réplication.

Le connecteur n’abandonne jamais les tables de logs, mais il n’utilise activement que le dernier journal pour chaque table source répliquée. Si vous souhaitez récupérer le stockage, vous pouvez en toute sécurité supprimer les tables de journal liées aux tables sources qui ont été retirées de la réplication, ainsi que toutes les tables activement répliquées, à l’exception de celles de la dernière génération.

Par exemple, si votre connecteur est paramétré pour répliquer activement la table source orders, et que vous avez précédemment supprimé la table customers de la réplication, vous pouvez avoir les tables de journal suivantes. Dans ce cas, vous pouvez toutes les supprimer, à l’exception de orders_5678_2.

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

Configuration de la planification des tâches de fusion

Le connecteur utilise un entrepôt pour fusionner les données de capture des données de changement (CDC) dans les tables de destination. Cette opération est déclenchée par le processeur MergeSnowflakeJournalTable. S’il n’y a pas de nouvelles modifications ou si aucun nouveau fichier de flux n’est en attente dans la file d’attente MergeSnowflakeJournalTable, aucune fusion n’est déclenchée et l’entrepôt se suspend automatiquement.

Pour limiter le coût de l’entrepôt et restreindre les fusions aux seules heures planifiées, utilisez l’expression CRON dans le paramètre de planification CRON de la tâche de fusion. Elle limite les fichiers de flux arrivant au processeur MergeSnowflakeJournalTable et les fusions ne sont déclenchées qu’au cours d’une période donnée. Pour plus d’informations sur la planification, voir Stratégie de planification.

Arrêter ou supprimer le connecteur

Lors de l’arrêt ou de la suppression du connecteur, vous devez tenir compte de l’emplacement de réplication https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS utilisé par le connecteur.

Le connecteur crée son propre slot de réplication avec un nom commençant par snowflake_connector_ suivi d’un suffixe aléatoire. À mesure que le connecteur lit le flux de réplication, il avance l’emplacement, de sorte que PostgreSQL puisse découper son journal WAL et libérer de l’espace disque.

Lorsque le connecteur est en pause, le slot n’est pas avancé et les modifications apportées à la base de données source continuent d’augmenter la taille du journal WAL. Vous ne devez pas laisser le connecteur en pause pendant de longues périodes, en particulier dans les bases de données à fort trafic.

Lorsque le connecteur est supprimé, que ce soit en le supprimant du canevas Openflow ou par tout autre moyen, comme la suppression de l’instance Openflow entière, le slot de réplication reste en place et doit être supprimé manuellement.

Si vous avez plusieurs instances de connecteur qui répliquent à partir de la même base de données PostgreSQL, chaque instance créera son propre slot de réplication portant un nom unique. Lorsque vous supprimez manuellement un emplacement de réplication, assurez-vous qu’il s’agit du bon. Vous pouvez voir quel emplacement de réplication est utilisé par une instance de connecteur donnée en vérifiant l’état du processeur CaptureChangePostgreSQL.

Exécutez le flux

  1. Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Enable all Controller Services.

  2. Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Start. Le connecteur démarre l’ingestion des données.