Paramétrez Openflow Connector for PostgreSQL¶
Note
Le connecteur est soumis aux conditions d’utilisation du connecteur.
Cette rubrique décrit les étapes pour paramétrer Openflow Connector for PostgreSQL.
Conditions préalables¶
Assurez-vous d’avoir consulté À propos de Openflow Connector for PostgreSQL.
Assurez-vous d’avoir pris connaissance des versions PostgreSQL prises en charge.
Assurez-vous que vous avez paramétré Openflow.
En tant qu’administrateur de la base de données, effectuez les tâches suivantes :
Assurez-vous qu’il y a suffisamment d’espace disque sur votre serveur PostgreSQL pour le fichier WAL. En effet, une fois créé, un emplacement de réplication fait en sorte que PostgreSQL conserve les données WAL de la position détenue par l’emplacement de réplication, jusqu’à ce que le connecteur confirme et avance cette position.
Assurez-vous que chaque table activée pour la réplication possède une clé primaire. La clé peut être une colonne unique ou composite.
Paramétrez l”IDENTITY REPLICA des tables sur
DEFAULT
. Cela garantit que les clés primaires sont représentées dans WAL, et que le connecteur peut les lire.Créez un utilisateur pour le connecteur. Le connecteur nécessite un utilisateur possédant l’attribut
REPLICATION
et les autorisations nécessaires pour SELECT à partir de chaque table répliquée. Créez cet utilisateur avec un mot de passe pour entrer dans la configuration du connecteur. Pour plus d’informations sur la sécurité de la réplication, voir Sécurité.
Configurer wal_level¶
Openflow Connector for PostgreSQL exige que wal_level soit défini sur logical
.
En fonction de l’endroit où votre serveur PostgreSQL est hébergé, vous pouvez configurer wal_level comme suit :
Sur place |
Exécutez la requête suivante avec le superutilisateur ou l’utilisateur titulaire du privilège
|
RDS |
L’utilisateur utilisé par l’agent doit disposer du rôle Vous devez également définir les éléments suivants :
|
AWS Aurora |
Définissez le paramètre statique |
GCP |
Définissez les indicateurs suivants :
|
Azure |
Définissez la prise en charge de la réplication sur |
Créer une publication¶
Openflow Connector for PostgreSQL exige qu’une publication https://www.postgresql.org/docs/current/logical-replication-publication.html#LOGICAL-REPLICATION-PUBLICATION soit créée et configurée dans PostgreSQL avant le début de la réplication. Vous pouvez la créer pour toutes les tables ou un sous-ensemble de tables, ainsi que pour des tables spécifiques comportant uniquement des colonnes déterminées. Assurez-vous que toutes les tables et colonnes que vous prévoyez de faire répliquer sont incluses dans la publication. Vous pouvez également modifier la publication ultérieurement, pendant que le connecteur fonctionne. Pour créer et configurer une publication, procédez comme suit :
Connectez-vous en tant qu’utilisateur disposant du privilège
CREATE
dans la base de données et exécutez la requête suivante :
CREATE PUBLICATION <publication name>;
Définissez les tables que l’agent de la base de données pourra voir en utilisant :
ALTER PUBLICATION <publication name> ADD TABLE <table name>;
Important
PostgreSQL 15 et les versions ultérieures prennent en charge la configuration des publications pour un sous-ensemble spécifié de colonnes de table. Pour que le connecteur prenne cela en charge correctement, vous devez utiliser les paramètres de filtrage des colonnes afin d’inclure les mêmes colonnes que celles définies dans la publication.
Sans ce paramètre, le connecteur aura le comportement suivant :
Dans la table de destination, les colonnes qui ne sont pas incluses dans le filtre seront suffixées par
__DELETED
. Toutes les données répliquées lors de la phase instantanée seront encore là.Après l’ajout de nouvelles colonnes à la publication, la table sera en échec permanent et nécessitera le redémarrage de sa réplication.
Pour plus d’informations, voir ALTER PUBLICATION.
En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :
Créez un utilisateur Snowflake avec le type SERVICE. Créez une base de données pour stocker les données répliquées et définissez les privilèges permettant à l’utilisateur de Snowflake de créer des objets dans cette base de données en lui accordant les privilèges USAGE et CREATE SCHEMA.
CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
Créez une paire de clés sécurisées (publique et privée). Stockez la clé privée de l’utilisateur dans un fichier à fournir à la configuration du connecteur. Attribuez la clé publique à l’utilisateur du service Snowflake :
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
Pour plus d’informations, voir paire de clés.
Désignez un entrepôt à utiliser par le connecteur. Commencez par la taille d’entrepôt
MEDIUM
, puis faites des essais en fonction du nombre de tables répliquées et de la quantité de données transférées. Les tables de grande taille s’adaptent généralement mieux aux entrepôts multi-clusters, plutôt qu’à la taille de l’entrepôt.
Importez la définition du connecteur dans Openflow¶
Naviguez jusqu’à la page d’aperçu d’Openflow. Dans la section Featured connectors, sélectionnez View more connectors.
Sur la page des connecteurs Openflow, trouvez le connecteur et sélectionnez Add to runtime.
Dans la boîte de dialogue Select runtime, sélectionnez votre environnement d’exécution dans la liste déroulante Available runtimes.
Sélectionnez Add.
Note
Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données et un schéma dans Snowflake pour que le connecteur puisse stocker les données ingérées.
Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Allow lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.
Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.
Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.
Configuration du connecteur¶
Vous pouvez configurer le connecteur pour les cas d’utilisation suivants :
Répliquer un ensemble de tables en temps réel¶
Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Parameters.
Renseignez les valeurs des paramètres requis comme décrit dans Paramètres de débit.
Paramètres de débit¶
Commencez par définir les paramètres du contexte des paramètres source PostgreSQL, puis du contexte des paramètres de destination PostgreSQL. Une fois cela fait, vous pouvez activer le connecteur, qui devrait se connecter à la fois à PostgreSQL et à Snowflake et commencer à fonctionner. Toutefois, il ne répliquera aucune donnée tant que des tables n’auront pas été explicitement ajoutées à sa configuration.
Pour configurer des tables spécifiques pour la réplication, modifiez le contexte des paramètres d’ingestion PostgreSQL. Peu de temps après avoir appliqué les modifications au contexte des paramètres de réplication, la configuration sera reprise par le connecteur et le cycle de vie de la réplication commencera pour chaque table.
Contexte des paramètres source PostgreSQL¶
Paramètre |
Description |
---|---|
URL de connexion Postgres |
L’adresse complète URL JDBC de la base de données source. Exemple : |
Pilote Postgres JDBC |
Le chemin vers les fichiers jar du pilote PostgreSQL JDBC. Téléchargez le jar depuis son site web, puis cochez la case Reference asset pour le télécharger et le joindre. |
Mode Postgres SSL |
Activez ou désactivez les connexions SSL. |
Certificat racine Postgres SSL |
Le contenu complet du certificat racine de la base de données. Facultatif si SSL désactivé. |
Nom d’utilisateur Postgres |
Le nom d’utilisateur du connecteur. |
Mot de passe Postgres |
Le mot de passe du connecteur. |
Publication Name |
Le nom de la publication que vous avez créée précédemment. |
Contexte des paramètres de destination PostgreSQL¶
Paramètre |
Description |
---|---|
Base de données de destination |
La base de données dans laquelle les données seront conservées. Doit déjà exister dans Snowflake. |
Identificateur de compte Snowflake |
Nom du compte Snowflake formaté comme suit : [[nom de l’organisation] -[[nom du compte] où les données seront conservées |
Stratégie d’authentification Snowflake |
Stratégie d’authentification auprès de Snowflake. Valeurs possibles : |
Clé privée de Snowflake |
La clé privée RSA utilisée pour l’authentification. La clé RSA doit être formatée selon les normes PKCS8 et comporter les en-têtes et pieds de page standard PEM. Notez que vous devez définir soit le fichier de clé privée de Snowflake, soit la clé privée de Snowflake. |
Fichier de clé privée de Snowflake |
Le fichier qui contient la clé privée RSA utilisée pour l’authentification à Snowflake, formaté selon les normes PKCS8 et comportant les en-têtes et pieds de page standard PEM. La ligne d’en-tête commence par |
Mot de passe de la clé privée de Snowflake |
Le mot de passe associé au fichier de la clé privée de Snowflake |
Rôle Snowflake |
Rôle Snowflake utilisé lors de l’exécution de la requête |
Nom d’utilisateur Snowflake |
Nom d’utilisateur utilisé pour se connecter à l’instance de Snowflake |
Entrepôt Snowflake |
L’entrepôt de Snowflake est utilisé pour exécuter des requêtes |
Contexte des paramètres d’ingestion PostgreSQL¶
Paramètre |
Description |
---|---|
Noms des tables incluses |
Une liste de chemins de tables séparés par des virgules, y compris leurs schémas. Exemple : |
Table incluse Regex |
Une expression régulière à associer aux chemins de la table. Chaque chemin correspondant à l’expression sera répliqué, et les nouvelles tables correspondant au modèle qui seront créées ultérieurement seront également incluses automatiquement. Exemple : |
Filtre JSON |
Une adresse JSON contenant une liste de noms de tables entièrement qualifiés et un modèle de regex pour les noms de colonnes à inclure dans la réplication. Exemple : |
Fusionner la planification des tâches CRON |
Expression CRON définissant les périodes au cours desquelles les opérations de fusion du journal vers la table de destination seront déclenchées. Paramétrez cet élément sur Par exemple :
Pour plus d’informations et d’exemples, consultez le tutoriel sur les déclencheurs cron dans la documentation de Quartz |
Supprimez et ajoutez à nouveau une table à la réplication¶
Pour supprimer une table de la réplication, assurez-vous qu’elle est supprimée des paramètres Noms des tables incluses
ou Table incluse Regex
dans le contexte des paramètres de réplication.
Si vous souhaitez réajuster la table à la réplication ultérieurement, supprimez d’abord la table de destination correspondante dans Snowflake. Ensuite, ajoutez à nouveau la table aux paramètres Noms des tables incluses
ou Table incluse Regex
. Cela permet de garantir que le processus de réplication démarre à nouveau pour la table.
Cette approche peut également être utilisée pour récupérer un scénario de réplication de table qui a échoué.
Répliquer un sous-ensemble de colonnes dans une table¶
Le connecteur peut filtrer les données répliquées par table sur un sous-ensemble de colonnes configurées.
Pour appliquer des filtres aux colonnes, modifiez la propriété Filtre de colonne dans le contexte Paramètres de réplication, en ajoutant un tableau de configurations, une entrée pour chaque table à laquelle vous souhaitez appliquer un filtre.
Les colonnes peuvent être incluses ou exclues par nom ou par modèle. Vous pouvez appliquer une seule condition par table ou combiner plusieurs conditions, les exclusions ayant toujours la priorité sur les inclusions.
L’exemple suivant montre les champs disponibles. schema
et table
sont obligatoires, puis un ou plusieurs des champs included
, excluded
, includedPattern
, excludedPattern
sont requis.
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
Suivre les changements de données dans les tables¶
Le connecteur réplique non seulement l’état actuel des données des tables sources, mais aussi l’état de chaque ligne de chaque jeu de modifications. Ces données sont stockées dans des tables de journal créées dans le même schéma que la table de destination.
Les noms des tables du journal sont formatés comme suit : <nom de la table source>_JOURNAL_<horodatage>_<génération du schéma>
où <horodatage>
est la valeur en secondes de l’époque à laquelle la table source a été ajoutée à la réplication, et <génération de schéma>
est un nombre entier augmentant à chaque changement de schéma sur la table source. Cela signifie qu’une table source qui subit des modifications de schéma aura plusieurs tables de journal.
Lorsqu’une table est retirée de la réplication, puis réintroduite, la valeur de l’horodatage <>
change et la génération du schéma <>
reprend à partir de 1
.
Important
Snowflake vous recommande de ne pas modifier les tables du journal ou les données qu’elles contiennent, de quelque manière que ce soit. Elles sont utilisées par le connecteur pour mettre à jour la table de destination dans le cadre du processus de réplication.
Le connecteur n’abandonne jamais les tables de logs, mais il n’utilise activement que le dernier journal pour chaque table source répliquée. Si vous souhaitez récupérer le stockage, vous pouvez en toute sécurité supprimer les tables de journal liées aux tables sources qui ont été retirées de la réplication, ainsi que toutes les tables activement répliquées, à l’exception de celles de la dernière génération.
Par exemple, si votre connecteur est paramétré pour répliquer activement la table source orders
, et que vous avez précédemment supprimé la table customers
de la réplication, vous pouvez avoir les tables de journal suivantes. Dans ce cas, vous pouvez toutes les supprimer, à l’exception de orders_5678_2
.
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
Configuration de la planification des tâches de fusion¶
Le connecteur utilise un entrepôt pour fusionner les données de capture des données de changement (CDC) dans les tables de destination. Cette opération est déclenchée par le processeur MergeSnowflakeJournalTable. S’il n’y a pas de nouvelles modifications ou si aucun nouveau fichier de flux n’est en attente dans la file d’attente MergeSnowflakeJournalTable, aucune fusion n’est déclenchée et l’entrepôt se suspend automatiquement.
Pour limiter le coût de l’entrepôt et restreindre les fusions aux seules heures planifiées, utilisez l’expression CRON dans le paramètre de planification CRON de la tâche de fusion. Elle limite les fichiers de flux arrivant au processeur MergeSnowflakeJournalTable et les fusions ne sont déclenchées qu’au cours d’une période donnée. Pour plus d’informations sur la planification, voir Stratégie de planification.
Arrêter ou supprimer le connecteur¶
Lors de l’arrêt ou de la suppression du connecteur, vous devez tenir compte de l’emplacement de réplication https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS utilisé par le connecteur.
Le connecteur crée son propre slot de réplication avec un nom commençant par snowflake_connector_
suivi d’un suffixe aléatoire. À mesure que le connecteur lit le flux de réplication, il avance l’emplacement, de sorte que PostgreSQL puisse découper son journal WAL et libérer de l’espace disque.
Lorsque le connecteur est en pause, le slot n’est pas avancé et les modifications apportées à la base de données source continuent d’augmenter la taille du journal WAL. Vous ne devez pas laisser le connecteur en pause pendant de longues périodes, en particulier dans les bases de données à fort trafic.
Lorsque le connecteur est supprimé, que ce soit en le supprimant du canevas Openflow ou par tout autre moyen, comme la suppression de l’instance Openflow entière, le slot de réplication reste en place et doit être supprimé manuellement.
Si vous avez plusieurs instances de connecteur qui répliquent à partir de la même base de données PostgreSQL, chaque instance créera son propre slot de réplication portant un nom unique. Lorsque vous supprimez manuellement un emplacement de réplication, assurez-vous qu’il s’agit du bon. Vous pouvez voir quel emplacement de réplication est utilisé par une instance de connecteur donnée en vérifiant l’état du processeur CaptureChangePostgreSQL
.
Exécutez le flux¶
Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Enable all Controller Services.
Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Start. Le connecteur démarre l’ingestion des données.