À propos de Openflow Connector for PostgreSQL

Note

Ce connecteur est soumis aux conditions d’utilisation de Snowflake Connector.

Cette rubrique décrit les concepts de base de Openflow Connector for PostgreSQL, son flux de travail et ses limites.

À propos de Openflow Connector for PostgreSQL

Openflow Connector for PostgreSQL connecte une instance de la base de données PostgreSQL à Snowflake et réplique les données des tables sélectionnées en temps quasi réel ou selon une planification. Le connecteur connecte également un journal de toutes les modifications de données, disponible avec l’état actuel des tables répliquées.

Cas d’utilisation

Utilisez ce connecteur si vous souhaitez effectuer les opérations suivantes :

  • Réplication CDC des données PostgreSQL avec Snowflake pour des rapports complets et centralisés.

Versions de PostgreSQL prises en charge

Les versions de PostgreSQL prises en charge sont répertoriées ci-dessous.

Versions de PostgreSQL prises en charge

11

12

13

14

15

16

17

18

Standard

Oui

Oui

Oui

Oui

Oui

Oui

Oui

Oui

AWS RDS

Oui

Oui

Oui

Oui

Oui

Oui

Oui

Oui

Amazon Aurora

Oui

Oui

Oui

Oui

Oui

Oui

Oui

GCP Cloud SQL

Oui

Oui

Oui

Oui

Oui

Oui

Oui

Base de données Azure

Oui

Oui

Oui

Oui

Oui

Oui

Oui

Exigences Openflow

  • La taille de l’environnement d’exécution doit être au moins moyenne. Utilisez un environnement d’exécution plus grand lorsque vous répliquez de grands volumes de données, en particulier lorsque la taille des lignes est importante.

  • Le connecteur ne prend pas en charge les environnements d’exécution Openflow à plusieurs nœuds. Configurez l’environnement d’exécution de ce connecteur avec Min nodes et Max nodes définis sur 1.

Limitations

  • Le connecteur est compatible avec PostgreSQL version 11 ou ultérieure.

  • Le connecteur ne prend en charge que l’authentification par nom d’utilisateur et mot de passe avec PostgreSQL.

  • Le connecteur ne réplique pas les valeurs individuelles supérieures à 16 MB. Par défaut, le traitement d’une telle valeur entraîne le marquage définitif de l’échec de la table associée. Pour éviter les échecs de table, modifiez le paramètre de destination Stratégie de valeur surdimensionnée.

  • Le connecteur ne réplique pas les tables dont les données dépassent les limitations de types de Snowflake. Les colonnes de type date et heure contenant des valeurs hors limites constituent une exception à cette règle. Pour plus d’informations, consultez Prise en charge des valeurs hors limites.

  • Le connecteur exige que chaque table répliquée ait une clé primaire et que l’identité de la réplique de la table soit la même que la clé primaire.

  • Le connecteur prend en charge les modifications du schéma de la table source, à l’exception de la modification des définitions des clés primaires, de la précision ou de l’échelle d’une colonne numérique.

  • Le connecteur ne prend pas en charge le nouvel ajout d’une colonne après sa suppression.

Note

Les limites affectant certaines colonnes de la table peuvent être contournées en excluant ces colonnes spécifiques de la réplication.

Workflow

  1. Un administrateur de la base de données configure les paramètres de réplication PostgreSQL, crée une publication et des identifiants pour le connecteur. En option, il délivre le certificat SSL.

  2. Un administrateur de compte Snowflake effectue les tâches suivantes :

    1. Crée un utilisateur de service pour le connecteur, un entrepôt pour le connecteur et une base de données de destination pour la réplication.

    2. Installe le connecteur.

    3. Spécifie les paramètres requis pour le modèle de flux.

    4. Gère le flux. Le connecteur effectue les tâches suivantes lorsqu’il est exécuté dans Openflow :

      1. Crée un schéma pour les tables du journal.

      2. Crée les schémas et les tables de destination correspondant aux tables sources configurées pour la réplication.

      3. Démarre la réplication en suivant le cycle de vie de la réplication de la table.

Fonctionnement du connecteur

Les sections suivantes décrivent comment le connecteur fonctionne dans différents scénarios, notamment la réplication, les modifications de schéma et la conservation des données.

Comment les tables sont-elles répliquées ?

  1. Introspection du schéma : le connecteur découvre les colonnes de la table source, leurs noms, leurs types, puis les valide par rapport aux limites de Snowflake et du connecteur. Les échecs de validation entraînent l’échec de cette préparation et le cycle s’achève. Une fois l’introspection du schéma correctement terminée, le connecteur crée une table de destination vide.

  2. Chargement d’un instantané : le connecteur copie toutes les données disponibles dans la table source dans la table de destination. L’échec de cette préparation met fin au cycle et plus aucune donnée n’est répliquée. Une fois l’opération réussie, l’ensemble des données de la table source est disponible dans la table de destination.

  3. Chargement incrémentiel : le connecteur suit en permanence les modifications apportées à la table source et les copie dans la table de destination. Cette opération se poursuit jusqu’à ce que la table soit retirée de la réplication. Une mise en échec à ce stade arrête définitivement la réplication de la table source, jusqu’à ce que le problème soit résolu.

    Note

    Ce connecteur peut être configuré pour démarrer immédiatement la réplication des modifications incrémentielles pour les tables nouvellement ajoutées, en contournant la phase de chargement des instantanés. Cette option est souvent utile lorsque vous réinstallez le connecteur dans un compte où des données répliquées existent déjà et que vous souhaitez poursuivre la réplication sans avoir à refaire des instantanés des tables.

    Pour plus de détails sur le contournement du chargement des instantanés et l’utilisation du processus de chargement incrémentiel, consultez Réplication incrémentielle.

Important

Les défaillances intermédiaires, telles que les erreurs de connexion, n’empêchent pas la table d’être répliquée. Les défaillances permanentes, telles que les types de données non pris en charge, empêchent la table d’être répliquée. Si une défaillance permanente empêche une table d’être répliquée, supprimez cette table de la liste des tables répliquées. Après avoir résolu le problème à l’origine de l’échec, vous pouvez ajouter à nouveau la table à la liste des tables répliquées.

Prise en charge des valeurs TOASTed

Le connecteur prend en charge la réplication de tables avec les valeurs TOAST pour les colonnes de types : array, bytea, json, jsonb, text, varchar, xml.

Chaque fois que le connecteur rencontre une valeur TOASTed dans le flux CDC, il remplace un espace réservé par défaut de __previous_value_unchanged, formaté pour le type de colonne donné, et le stocke dans la table de journal. La requête MERGE prend alors en compte les valeurs d’espaces réservés, de sorte que la table de destination contienne toujours la dernière valeur non-TOASTed.

Prise en charge des valeurs hors limites

Le connecteur prend en charge la réplication de tables avec des colonnes de types date, timestamp et timestamptz contenant des valeurs hors limites. Si le connecteur rencontre une valeur hors limites dans le flux CDC, il remplace un espace réservé par défaut en fonction du type de la colonne.

Valeurs d’espaces réservés pour les valeurs hors limites

Type de colonne

Valeur d’espaces réservés

date

-9999-01-01 par 9999-12-31.

timestamp

0001-01-01 00:00:00 par 9999-12-31 23:59:59.999999999.

timestamptz

0001-01-01 00:00:00+00 par 9999-12-31 23:59:59.999999999+00.

Note

Les valeurs -Infinity et Infinity sont également remplacées par les espaces réservés respectifs pour les trois types.

Statut de réplication de la table

Les défaillances intermédiaires, telles que les erreurs de connexion, n’empêchent pas la réplication de la table. Toutefois, les défaillances permanentes, telles que les types de données non pris en charge, empêchent la réplication de la table.

Pour résoudre les problèmes de réplication ou vérifier qu’une table a été correctement supprimée du flux de réplication, consultez le magasin d’états des tables :

  1. Dans le canevas de l’environnement d’exécution Openflow, faites un clic droit sur un groupe de processeurs et choisissez Controller Services. Une table répertoriant les services du contrôleur s’affiche.

  2. Localisez la ligne intitulée Table State Store, cliquez sur le bouton More Trois points verticaux indiquant plus d'options sur le côté droit de la ligne, puis choisissez View State.

Une liste des tables et de leurs états actuels s’affiche. Renseignez le champ de recherche pour filtrer la liste par nom de table. Les états possibles sont :

  • NEW : La table est planifiée pour la réplication, mais la réplication n’a pas commencé.

  • SNAPSHOT_REPLICATION : Le connecteur copie des données existantes. Cet état s’affiche jusqu’à ce que tous les enregistrements soient stockés dans la table de destination.

  • INCREMENTAL_REPLICATION : Le connecteur réplique activement les modifications. Cet état s’affiche après la fin de la réplication des instantanés et continue de s’afficher indéfiniment jusqu’à ce qu’une table soit supprimée de la réplication ou que la réplication échoue.

  • FAILED : La réplication s’est définitivement arrêtée en raison d’une erreur.

Note

Le canevas de l’environnement d’exécution Openflow n’affiche pas les modifications de l’état de la table, mais uniquement l’état actuel de la table. Toutefois, les modifications de l’état des tables sont enregistrées dans les journaux lorsqu’elles se produisent. Recherchez le message de journal suivant :

Replication state for table <database_name>.<schema_name>.<table_name> changed from <old_state> to <new_state>
Copy

Si une défaillance permanente empêche la réplication d’une table, supprimez la table de la réplication. Après avoir résolu le problème à l’origine de l’échec, vous pouvez ajouter à nouveau la table à la réplication. Pour plus d’informations, consultez Redémarrer la réplication des tables.

Comprendre la conservation des données

Le connecteur suit une logique de conservation des données où les données client ne sont jamais automatiquement supprimées. Vous conservez la propriété et le contrôle total de vos données répliquées, et le connecteur préserve les informations historiques au lieu de les supprimer définitivement.

Cette approche a les implications suivantes :

  • Les lignes supprimées de la table source sont supprimées de manière douce dans la table de destination plutôt que d’être supprimées physiquement.

  • Les colonnes supprimées de la table source sont renommées dans la table de destination au lieu d’être supprimées.

  • Les tables de journal sont conservées indéfiniment et ne sont pas automatiquement nettoyées.

Colonnes de métadonnées de la table de destination

Chaque table de destination comprend les colonnes de métadonnées suivantes qui suivent les informations de réplication :

Nom de la colonne

Type

Description

_SNOWFLAKE_INSERTED_AT

TIMESTAMP_NTZ

Horodatage du moment où la ligne a été initialement insérée dans la table de destination.

_SNOWFLAKE_UPDATED_AT

TIMESTAMP_NTZ

Horodatage de la dernière mise à jour de la ligne dans la table de destination.

_SNOWFLAKE_DELETED

BOOLEAN

Indique si la ligne a été supprimée de la table source. Lorsque``true``, la ligne a été supprimée de manière douce et n’existe plus dans la source.

Lignes supprimées de manière douce

Lorsqu’une ligne est supprimée de la table source, le connecteur ne la supprime pas physiquement de la table de destination. Au lieu de cela, la ligne est marquée comme supprimée en définissant la colonne de métadonnées _SNOWFLAKE_DELETED sur true.

Cette approche vous permet de :

  • Conserver les données historiques à des fins d’audit ou de conformité.

  • Interroger les enregistrements supprimés lorsque cela est nécessaire.

  • Décider quand et comment supprimer définitivement des données en fonction de vos besoins.

Pour interroger uniquement les lignes actives (non supprimées), filtrez sur la colonne _SNOWFLAKE_DELETED :

SELECT * FROM my_table WHERE _SNOWFLAKE_DELETED = FALSE;
Copy

Pour interroger les lignes supprimées :

SELECT * FROM my_table WHERE _SNOWFLAKE_DELETED = TRUE;
Copy

Colonnes détruites

Lorsqu’une colonne est supprimée de la table source, le connecteur ne supprime pas la colonne correspondante de la table de destination. Au lieu de cela, la colonne est renommée en ajoutant le suffixe __SNOWFLAKE_DELETED pour préserver les valeurs historiques.

Par exemple, si une colonne nommée EMAIL est supprimée de la table source, elle est renommée en``EMAIL__SNOWFLAKE_DELETED`` dans la table de destination. Les lignes qui existaient avant la suppression de la colonne conservent leurs valeurs d’origine, tandis que les lignes ajoutées après la suppression affichent NULL dans cette colonne.

Vous pouvez toujours interroger les valeurs historiques à partir de la colonne renommée :

SELECT EMAIL__SNOWFLAKE_DELETED FROM my_table;
Copy

Colonnes renommées

En raison de limitations dans les mécanismes CDC (Change Data Capture), le connecteur ne peut pas faire la distinction entre une colonne renommée et une colonne supprimée, suivie de l’ajout d’une nouvelle colonne. Par conséquent, lorsque vous renommez une colonne dans la table source, le connecteur traite cela comme deux opérations distinctes : supprimer la colonne d’origine et ajouter une nouvelle colonne avec le nouveau nom.

Par exemple, si vous renommez une colonne A en B dans la table source, la table de destination contiendra :

  • A__SNOWFLAKE_DELETED : Contient les valeurs d’avant le changement de nom. Les lignes ajoutées après le changement de nom affichent NULL dans cette colonne.

  • B : Contient les valeurs d’après le changement de nom. Les lignes qui existaient avant le changement de nom affichent NULL dans cette colonne.

Interrogation de colonnes renommées

Pour récupérer les données des colonnes d’origine et des colonnes renommées en une seule colonne unifiée, utilisez une expression COALESCE ou CASE :

SELECT
    COALESCE(B, A__SNOWFLAKE_DELETED) AS A_RENAMED_TO_B
FROM my_table;
Copy

Vous pouvez également utiliser une expression CASE :

SELECT
    CASE
        WHEN B IS NOT NULL THEN B
        ELSE A__SNOWFLAKE_DELETED
    END AS A_RENAMED_TO_B
FROM my_table;
Copy

Création d’une vue pour des colonnes renommées

Plutôt que de modifier manuellement la table de destination, vous pouvez créer une vue qui présente la colonne renommée comme une seule colonne unifiée. Cette approche est recommandée, car elle préserve les données d’origine et évite les problèmes potentiels liés à la réplication en cours.

CREATE VIEW my_table_unified AS
SELECT
    *,
    COALESCE(B, A__SNOWFLAKE_DELETED) AS A_RENAMED_TO_B
FROM my_table;
Copy

Important

La modification manuelle de la structure de la table de destination (par exemple, la suppression ou le renommage de colonnes) n’est pas recommandée, car elle peut interférer avec la réplication en cours et entraîner des incohérences de données.

Tables de journal

Lors de la réplication incrémentielle, les modifications de la base de données source sont d’abord écrites dans les tables de journal avant d’être fusionnées dans les tables de destination. Le connecteur ne supprime pas automatiquement les données des tables de journal, car ces données peuvent être utiles à des fins d’audit, de débogage ou de retraitement.

Les tables de journal sont créées dans le même schéma que les tables de destination correspondantes et suivent cette convention de dénomination :

<TABLE_NAME>_JOURNAL_<timestamp>_<number>

Où :

  • <TABLE_NAME> est le nom de la table de destination.

  • <timestamp> est l’horodatage de création au format d’époque Unix (secondes depuis le 1er janvier 1970), garantissant l’unicité.

  • <number> commence à 1 et incrémente chaque fois que le schéma de la table de destination change, soit en raison de modifications de schéma dans la table source, soit en raison de modifications des filtres de colonnes.

Par exemple, si votre table de destination est SALES.ORDERS, la table du journal peut être nommée``SALES.ORDERS_JOURNAL_1705320000_1``.

Important

Ne supprimez pas les tables de journal pendant la réplication en cours. La suppression d’une table de journal active peut entraîner des pertes de données ou des échecs de réplication. Ne supprimez les tables de journal qu’une fois que la table source correspondante a été entièrement retirée de la réplication.

Gestion du stockage des tables de journal

Si vous devez gérer les coûts de stockage en supprimant les anciennes données du journal, vous pouvez créer une tâche Snowflake qui nettoie périodiquement les tables de journal pour les tables qui ne sont plus répliquées.

Avant d’implémenter le nettoyage du journal, vérifiez que :

  • Les tables sources correspondantes ont été entièrement retirées de la réplication.

  • Vous n’avez plus besoin des données du journal à des fins d’audit ou de traitement.

Pour plus d’informations sur la création et la gestion de tâches pour le nettoyage automatisé, consultez Introduction aux tâches.

Prochaines étapes

Examinez Openflow Connector for PostgreSQL : Mappage de données pour comprendre comment le connecteur fait correspondre les types de données aux types de données Snowflake. Examinez Paramétrez Openflow Connector for PostgreSQL pour configurer le connecteur.