Einrichten von Openflow Connector for PostgreSQL

Unter diesem Thema werden die Schritte zur Einrichtung von Openflow Connector for PostgreSQL beschrieben.

Bemerkung

This connector can be configured to immediately start replicating incremental changes for newly added tables, bypassing the snapshot load phase. This option is often useful when reinstalling the connector in an account where previously replicated data exists and you want to continue replication without having to re-snapshot tables.

For details on the incremental load process, see Incremental replication.

Voraussetzungen

  1. Stellen Sie sicher, dass Sie Allgemeine Informationen zu Openflow Connector for PostgreSQL gelesen haben.

  2. Vergewissern Sie sich, dass Sie die unterstützten PostgreSQL-Versionen überprüft haben.

  3. Empfohlen: Stellen Sie sicher, dass Sie nur eine Konnektor-Instanz pro Laufzeitumgebung hinzufügen.

  4. Ensure that you have Openflow einrichten – BYOC or Set up Openflow - Snowflake Deployments.

  5. If using Openflow - Snowflake Deployments, ensure that you’ve reviewed configuring required domains and have granted access to the required domains for the PostgreSQL connector.

  6. Führen Sie als Datenbankadministrator die folgenden Aufgaben aus:

    1. Wal_level konfigurieren

    2. Eine Publikation erstellen

    3. Stellen Sie sicher, dass auf Ihrem PostgreSQL-Server genügend Speicherplatz für die WAL vorhanden ist. Das liegt daran, dass ein Replikationsslot nach der Erstellung dafür sorgt, dass PostgreSQL die Daten aus der Position des Replikationsslots beibehält, bis der Konnektor diese Position bestätigt und weiterleitet.

    4. Stellen Sie sicher, dass jede für die Replikation aktivierte Tabelle einen Primärschlüssel hat. Der Schlüssel kann eine einzelne oder eine zusammengesetzte Spalte sein.

    5. Setzen Sie die REPLICA IDENTITY von Tabellen auf DEFAULT. Dadurch wird sichergestellt, dass die Primärschlüssel in der WAL dargestellt werden und der Konnektor sie lesen kann.

    6. Erstellen Sie einen Benutzer für den Connector. Der Konnektor benötigt einen Benutzer mit dem REPLICATION-Attribut und der SELECT-Berechtigung aus jeder replizierten Tabelle. Erstellen Sie diesen Benutzer mit einem Kennwort, das Sie in die Konfiguration des Konnektors eingeben müssen. Weitere Informationen zur Replikationssicherheit finden Sie unter Sicherheit.

  7. Als Snowflake-Kontoadministrator führen Sie die folgenden Aufgaben aus:

    1. Erstellen Sie einen Snowflake Benutzer mit dem Typ als SERVICE. Erstellen Sie eine Datenbank, um die replizierten Daten zu speichern, und richten Sie Berechtigungen für den Snowflake-Benutzer ein, um Objekte in dieser Datenbank zu erstellen, indem Sie die Berechtigungen USAGE und CREATE SCHEMA erteilen.

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
        WITH
          WAREHOUSE_SIZE = 'MEDIUM'
          AUTO_SUSPEND = 300
          AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
      Copy
    2. Erstellen Sie ein Paar sicherer Schlüssel (öffentlich und privat). Speichern Sie den privaten Schlüssel für den Benutzer in einer Datei, die Sie für die Konfiguration des Konnektors verwenden können. Weisen Sie dem Snowflake Service-Benutzer den öffentlichen Schlüssel zu.

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      Weitere Informationen finden Sie unter Schlüsselpaar-Authentifizierung.

    3. Geben Sie ein Warehouse an, das der Konnektor verwenden soll. Beginnen Sie mit dem Warehouse-Größe MEDIUM. Experimentieren Sie dann mit der Größe abhängig von der Menge der zu replizierenden Tabellen und der übertragenen Datenmenge. Eine große Anzahl von Tabellen skaliert normalerweise besser mit Multi-Cluster-Warehouses und nicht anhand der Warehouse-Größe.

Wal_level konfigurieren

Openflow Connector for PostgreSQL erfordert, dass wal_level auf logical gesetzt wird.

Je nachdem, wo Ihr PostgreSQL-Server gehostet wird, können Sie den wal_level wie folgt konfigurieren:

Vor Ort

Führen Sie die folgende Abfrage mit dem Superuser oder einem Benutzer mit der Berechtigung ALTER SYSTEM aus:

ALTER SYSTEM SET wal_level = logical;
Copy

RDS

Dem vom Agenten verwendeten Benutzer muss die Rolle rds_superuser oder rds_replication zugewiesen werden.

Sie müssen auch Einstellungen vornehmen:

  • Statischer rds.logical_replication-Parameter auf 1.

  • Parameter max_replication_slots, max_connections und max_wal_senders entsprechend Ihrer Datenbank- und Replikationseinrichtung.

AWS Aurora

Setzen Sie den statischen Parameter rds.logical_replication auf 1.

GCP

Setzen Sie die folgenden Flags:

  • cloudsql.logical_decoding=on.

  • cloudsql.enable_pglogical=on.

Weitere Informationen finden Sie unter Google Cloud Dokumentation.

Azure

Stellen Sie die Unterstützung der Replikation auf Logical ein. Weitere Informationen finden Sie unter Azure-Dokumentation.

Eine Publikation erstellen

Openflow Connector for PostgreSQL erfordert, dass eine -Veröffentlichung erstellt und in PostgreSQL konfiguriert wird, bevor die Replikation beginnt. Sie können es für alle oder eine Teilmenge von Tabellen sowie für bestimmte Tabellen mit nur bestimmten Spalten erstellen. Vergewissern Sie sich, dass alle Tabellen und Spalten, die Sie replizieren möchten, in der Veröffentlichung enthalten sind. Sie können die Veröffentlichung auch später ändern, während der Konnektor läuft. Um eine Publikation zu erstellen und zu konfigurieren, gehen Sie wie folgt vor:

  1. Melden Sie sich als Benutzer mit der an CREATE-Berechtigung für die Datenbank und führen Sie die folgende Abfrage aus:

    • Für PostgreSQL 13 und höher:

      CREATE PUBLICATION <publication name> WITH (publish_via_partition_root = true);
      
      Copy

      publish_via_partition_root wird zusätzlich für eine korrekte Replikation von partitionierten Tabellen benötigt. Weitere Informationen zur Datenaufnahme von partitionierten Tabellen finden Sie unter Replizieren einer partitionierten Tabelle.

    • Für PostgreSQL-Versionen vor 13:

      CREATE PUBLICATION <publication name>;
      
      Copy
  2. Definieren Sie Tabellen, die der Datenbankagent sehen kann:

ALTER PUBLICATION <publication name> ADD TABLE <table name>;
Copy

Bei partitionierten Tabellen reicht es aus, nur die Stammpartitionstabelle zur Publikation hinzuzufügen. Weitere Details dazu finden Sie unter Replizieren einer partitionierten Tabelle.

Wichtig

PostgreSQL 15 und später unterstützt die Konfiguration von Veröffentlichungen für eine bestimmte Untergruppe von Tabellenspalten. Damit der Konnektor dies korrekt unterstützt, müssen Sie die Spaltenfiltereinstellungen verwenden, um die gleichen Spalten einzuschließen, die auch in der Veröffentlichung eingestellt sind.

Ohne diese Einstellung verhält sich der Konnektor wie folgt:

  • In der Zieltabelle werden die Spalten, die nicht im Filter enthalten sind, mit dem Suffix __DELETED versehen. Alle Daten, die während der Snapshot-Phase repliziert wurden, werden beibehalten.

  • Nachdem Sie der Veröffentlichung neue Spalten hinzugefügt haben, schlägt die Tabelle dauerhaft fehl und Sie müssen ihre Replikation neu starten.

Weitere Informationen dazu finden Sie unter ALTER PUBLICATION.

Konnektor installieren

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. Suchen Sie auf der Seite Openflow-Konnektoren den Konnektor und wählen Sie Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    Bemerkung

    Bevor Sie den Konnektor installieren, stellen Sie sicher, dass Sie in Snowflake eine Datenbank und ein Schema für den Konnektor erstellt haben, in dem die aufgenommenen Daten gespeichert werden.

  4. Authentifizieren Sie sich bei der Bereitstellung mit den Anmeldedaten Ihres Snowflake-Kontos und wählen Sie Allow, wenn Sie dazu aufgefordert werden, damit die Laufzeitanwendung auf Ihr Snowflake-Konto zugreifen kann. Die Installation des Konnektors nimmt einige Minuten in Anspruch.

  5. Authentifizieren Sie sich bei der Laufzeit mit den Anmeldeinformationen Ihres Snowflake-Kontos.

Das Openflow-Canvas wird mit der hinzugefügten Prozessgruppe des Konnektors angezeigt.

Konnektor konfigurieren

Sie können den Konnektor für die folgenden Anwendungsfälle konfigurieren:

Replizieren Sie eine Reihe von Tabellen in Echtzeit

  1. Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe und wählen Sie Parameters.

  2. Geben Sie die erforderlichen Parameterwerte ein, wie unter Ablaufparameter beschrieben.

Ablaufparameter

Beginnen Sie mit der Einstellung der Parameter des PostgreSQL-Quellparameterkontext, dann den PostgreSQL-Kontext der Zielparameter. Sobald dies erfolgt ist, können Sie den Konnektor aktivieren, und dieser sollte eine Verbindung mit PostgreSQL und Snowflake herstellen und die Ausführung starten. Es werden jedoch keine Daten repliziert, bis explizit Tabellen zu seiner Konfiguration hinzugefügt wurden.

Um bestimmte Tabellen für die Replikation zu konfigurieren, bearbeiten Sie die Aufnahmeparameter für PostgreSQL. Kurz nachdem Sie die Änderungen an den Replikationsparametern vorgenommen haben, wird die Konfiguration vom Konnektor übernommen und der Replikationslebenszyklus beginnt für jede Tabelle.

Quellsystemparameter für PostgreSQL

Parameter

Beschreibung

PostgreSQL Verbindungs-URL

Die vollständige JDBC URL zur Quelldatenbank. Beispiel: jdbc:postgresql://example.com:5432/public

Wenn Sie eine Verbindung zum PostgreSQL Replikatserver herstellen, siehe Replizieren von Tabellen von einem PostgreSQL Replikatserver.

PostgreSQL-JDBC-Treiber

Der Pfad zur PostgreSQL JDBC-Treiber-Jar-Datei. Laden Sie die JAR-Datei von der Website herunter und aktivieren Sie dann das Kontrollkästchen Reference asset, um sie hochzuladen und anzuhängen.

PostgreSQL Benutzername

Der Benutzername für den Konnektor.

PostgreSQL Kennwort

Das Kennwort für den Konnektor.

Veröffentlichungsname

Der Name der Veröffentlichung, die Sie zuvor erstellt haben.

Replication Slot Name

Optional. When no value is provided, the connector will create a new, uniquely-named slot. When given a value, the connector will use the existing slot, or create a new one with the provided name.

Changing the value for a running connector will restart reading the incremental change data capture (CDC) stream from the updated slot’s position.

Zielsystemparameter für PostgreSQL

Parameter

Beschreibung

Erforderlich

Destination Database

Die Datenbank, in der die Daten als persistent gespeichert werden. Muss bereits in Snowflake vorhanden sein. Beim Namen wird zwischen Groß- und Kleinschreibung unterschieden. Bei Bezeichnern ohne Anführungszeichen geben Sie den Namen in Großbuchstaben an.

Ja

Snowflake Authentication Strategy

Bei Verwendung von:

  • Snowflake Openflow Deployment or BYOC: Use SNOWFLAKE_SESSION_TOKEN. This token is managed automatically by Snowflake. BYOC deployments must have previously configured runtime roles to use SNOWFLAKE_SESSION_TOKEN.

  • BYOC: Alternatively BYOC can use KEY_PAIR as the value for authentication strategy.

Ja

Snowflake Account Identifier

Bei Verwendung von:

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR: Snowflake-Kontoname im Format [Organisationsname]-[Kontoname], wobei die Daten persistent gespeichert werden.

Ja

Snowflake Private Key

Bei Verwendung von:

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR$RSA Muss der RSA private Schlüssel sein, der für die Authentifizierung verwendet wird.

    Der RSA-Schlüssel muss entsprechend den PKCS8-Standards formatiert sein und standardmäßige PEM-Header und Footer haben. Beachten Sie, dass entweder eine private Snowflake-Schlüsseldatei oder ein privater Snowflake-Schlüssel definiert werden muss.

Nein

Snowflake Private Key File

Bei Verwendung von:

  • Strategie für die Authentifizierung mit Sitzungstoken: Die private Schlüsseldatei muss leer sein.

  • KEY_PAIR$RSA Laden Sie die Datei hoch, die den privaten RSA-Schlüssel enthält, der für die Authentifizierung bei Snowflake verwendet wird und gemäß PKCS8-Standards und einschließlich standardmäßiger -----BEGIN PRIVATE-Header und und -Footer formatiert ist. Der Header beginnt mit -----BEGIN PRIVATE. Um die private Schlüsseldatei hochzuladen, wählen Sie das Kontrollkästchen Reference asset aus.

Nein

Snowflake Private Key Password

Bei Verwendung von

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR: Geben Sie das Kennwort an, das mit der privaten Snowflake-Schlüsseldatei verbunden ist.

Nein

Snowflake Role

Bei Verwendung von

  • Strategie für die Authentifizierung mit Sitzungstoken: Verwenden Sie Ihre Laufzeitrolle. Sie finden Ihre Laufzeitrolle in der Openflow-UI, indem Sie zu View Details für Ihre Laufzeitumgebung navigieren.

  • KEY_PAIR Authentifizierungsstrategie: Verwenden Sie eine gültige Rolle, die für Ihren Dienstbenutzer konfiguriert ist.

Ja

Snowflake-Benutzername

Bei Verwendung von

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR: Geben Sie den Benutzernamen an, der für die Verbindung mit der Snowflake-Instanz verwendet wird.

Ja

Snowflake Warehouse

Snowflake Warehouse, das für die Ausführung von Abfragen verwendet wird.

Ja

Aufnahmeparameter für PostgreSQL

Parameter

Beschreibung

Included Table Names

Eine durch Kommas getrennte Liste von Tabellenpfaden, einschließlich ihrer Schemas. Beispiel: public.my_table, other_schema.other_table.

Wählen Sie Tabellen entweder nach Name oder nach Regex aus. Wenn Sie beides verwenden, werden alle übereinstimmenden Tabellen aus beiden Optionen aufgenommen.

Tabellen, die Unterpartitionen sind, sind immer von der Erfassung ausgeschlossen. Weitere Informationen dazu finden Sie unter Replizieren einer partitionierten Tabelle.

Included Table Regex

Ein regulärer Ausdruck zum Abgleich mit Tabellenpfaden. Jeder Pfad, der mit dem Ausdruck übereinstimmt, wird repliziert, und neue Tabellen, die dem Muster entsprechen und später erstellt werden, werden ebenfalls automatisch einbezogen. Beispiel: public\.auto_.*

Wählen Sie Tabellen entweder nach Name oder nach Regex aus. Wenn Sie beides verwenden, werden alle übereinstimmenden Tabellen aus beiden Optionen aufgenommen.

Tabellen, die Unterpartitionen sind, sind immer von der Erfassung ausgeschlossen. Weitere Informationen dazu finden Sie unter Replizieren einer partitionierten Tabelle.

Spaltenfilter JSON

Optional. Ein JSON mit einer Liste vollqualifizierter Tabellennamen und einem Regex-Muster für Spaltennamen, die in die Replikation einbezogen werden sollen. Beispiel: [ {"schema":"public", "table":"table1", "includedPattern":".*name"} ] enthält alle Spalten, die mit name in table1 aus dem Schema public enden.

Merge Task Schedule CRON

CRON-Ausdruck, der Zeiträume definiert, in denen Zusammenführungsoperationen vom Journal zur Zieltabelle ausgelöst werden. Setzen Sie ihn auf * * * * * ?, wenn Sie eine kontinuierliche Zusammenführung oder keinen Zeitplan zur Begrenzung der Warehouse-Laufzeit wünschen.

Beispiel:

  • Die Zeichenfolge * 0 * * * ? gibt an, dass Sie Zusammenführungen zu jeder vollen Stunde für eine Minute planen möchten.

  • Die Zeichenfolge * 20 14 ? * MON-FRI gibt an, dass Sie Zusammenführungen um 2:20 PM jeden Montag bis Freitag planen möchten.

Weitere Informationen und Beispiele finden Sie in der Anleitung zu Cron-Triggern in der Quartz-Dokumentation

Auflösung des Objektbezeichners

Specifies how source object identifiers such as the names of schemas, tables, and columns are stored and queried in Snowflake. This setting specifies that you must use double quotes in SQL queries.

Option 1: Standard; Groß-/Kleinschreibung wird berücksichtigt. Für Rückwärtskompatibilität.

  • Transformation: Die Groß-/Kleinschreibung bleibt erhalten. Beispiel: My_Table bleibt My_Table.

  • Abfragen: SQL-Abfragen müssen doppelte Anführungszeichen verwenden, um der genauen Schreibweise von Datenbankobjekten zu entsprechen. Beispiel: SELECT * FROM "My_Table";.

Bemerkung

Snowflake recommends using this option if you must preserve source casing for legacy or compatibility reasons. For example, if the source database includes table names that differ in case only–such as MY_TABLE and my_table–that would result in a name collision when using when using case-insensitive comparisons.

Option 2: Empfohlen; Groß- und Kleinschreibung nicht relevant.

  • Transformation: Alle Bezeichner werden in Großbuchstaben umgewandelt. Beispiel: My_Table wird zu MY_TABLE.

  • Queries: SQL queries are case-insensitive and don’t require SQL double quotes. For example, SELECT * FROM my_table; returns the same results as SELECT * FROM MY_TABLE;.

Bemerkung

Snowflake empfiehlt die Verwendung dieser Option, wenn Datenbankobjekte keine Namen mit gemischter Groß-/Kleinschreibung haben.

Wichtig

Do not change this setting after the connector has begun ingesting data. Changing this setting after ingestion has begun breaks the existing ingestion. If you must change this setting, create a new connector instance.

Replizieren von Tabellen von einem PostgreSQL Replikatserver

Der Konnektor kann Daten von einem primären Server, einem Hot-Standby-Replikat oder einem Abonnentenserver mit logischer Replikation erfassen. Bevor Sie den Konnektor für die Verbindung mit einem PostgreSQL Replikat konfigurieren, stellen Sie sicher, dass die Replikation zwischen Primär- und Replikatknoten korrekt funktioniert. Wenn Sie Probleme mit fehlenden Daten im Konnektor untersuchen, stellen Sie zunächst sicher, dass auf dem vom Konnektor verwendeten Server fehlende Zeilen vorhanden sind.

Zusätzliche Hinweise beim Herstellen einer Verbindung mit einem Standby-Replikat:

  • Es wird nur die Verbindung zu einem Hot-Standby-Replikat unterstützt. Beachten Sie, dass Snowflake-Standardreplikate erst dann Verbindungen von Clients akzeptieren können, wenn diese zu einer Primärinstanz heraufgestuft wurden.

  • PostgreSQL-Version des Servers muss >= 16 sein.

  • Die Veröffentlichung, die vom Konnektor benötigt wird, muss auf dem primären Server erstellt werden, nicht auf dem Standby-Server. Der Standby-Server ist schreibgeschützt und erlaubt das Erstellen einer Veröffentlichung nicht.

Wenn Sie eine Verbindung zu einer Hot-Standby-Instanz herstellen und eine Zeitüberschreitung bei Es wird versucht, den Replikations-Slot zu erstellen<replication slot> sehen. Wenn Sie eine Verbindung zu einer Standby-Instanz herstellen, stellen Sie sicher, dass auf der primären PostgreSQL-Instanz etwas Datenverkehr vorhanden ist, da der Aufruf zum Erstellen eines Replikation-Slots sonst nie zurückgegeben wird. Fehler im Openflow-Bundle oder der Read PostgreSQL CDC Stream-Prozessor startet nicht, melden Sie sich bei der primären PostgreSQL-Instanz an und führen Sie die folgende Abfrage aus:

SELECT pg_log_standby_snapshot();
Copy

Der Fehler tritt auf, wenn es keine Datenänderungen auf dem primären Server gibt. So kann der Konnektor installiert werden, während ein Replikations-Slot auf dem Server erstellt wird. Das liegt daran, dass der Replikatserver Informationen über die Ausführung von Transaktionen vom Server benötigt, um einen Replikations-Slot erstellen zu können. Primärserver senden die Informationen nicht, wenn sie im Leerlauf sind. Die Funktion pg_log_standby_snapshot() zwingt den primären Server, Informationen über ausgeführte Transaktionen an den Replikatserver zu senden.

Entfernen und erneutes Hinzufügen einer Tabelle zur Replikation

Um eine Tabelle aus der Replikation zu entfernen, stellen Sie sicher, dass sie aus den Parametern Included Table Names oder Included Table Regex im Replikationsparameterkontext entfernt wird.

Wenn Sie die Tabelle später wieder zur Replikation hinzufügen möchten, löschen Sie zunächst die entsprechende Zieltabelle in Snowflake. Danach fügen Sie die Tabelle wieder zu den Parametern Included Table Names oder Included Table Regex hinzu. Dadurch wird sichergestellt, dass der Replikationsprozess für die Tabelle neu beginnt.

Diese Vorgehensweise kann auch zur Wiederherstellung nach einer fehlgeschlagenen Tabellenreplikation verwendet werden.

Replizieren einer Teilmenge von Spalten in einer Tabelle

Der Konnektor kann die replizierten Daten pro Tabelle auf eine Teilmenge der konfigurierten Spalten filtern.

Um Filter auf Spalten anzuwenden, ändern Sie die Eigenschaft „Column Filter“ im Replikationsparameterkontext und fügen Sie ein Array mit Konfigurationen hinzu, wobei Sie für jede Tabelle, auf die Sie einen Filter anwenden möchten, einen Eintrag hinzufügen.

Spalten können nach Name oder Muster einbezogen oder ausgeschlossen werden. Sie können eine einzelne Bedingung pro Tabelle anwenden oder mehrere Bedingungen kombinieren, wobei Ausschlüsse immer Vorrang vor Einbeziehungen haben.

Das folgende Beispiel zeigt die verfügbaren Felder. schema und table sind obligatorisch, und dann ist eines oder mehrere der Felder included, excluded, includedPattern, excludedPattern erforderlich.

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

Replizieren einer partitionierten Tabelle

Der Konnektor unterstützt die Replikation von partitionierten Tabellen für PostgreSQL-Server mit Version >= 15. Eine partitionierte PostgreSQL-Tabelle wird in Snowflake als eine einzige Zieltabelle repliziert.

Beispiel: Sie haben eine partitionierte Tabelle orders mit den Unterpartitionen orders_2023, orders_2024 und haben den Konnektor so konfiguriert, dass er alle Tabellen einliest, die mit dem Muster orders.* übereinstimmen. Dann wird nur die orders Tabelle in Snowflake repliziert und enthält Daten aus allen Unterpartitionen.

Um die Replikation von partitionierten Tabellen zu unterstützen, stellen Sie sicher, dass für die Veröffentlichung, die in PostgreSQL erstellt wurde, die Option``publish_via_partition_root`` auf true gesetzt ist.

Die Erfassung von partitionierten Tabellen unterliegt derzeit den folgenden Einschränkungen:

  • Wenn eine Tabelle nach dem Beginn der Datenaufnahme als Partition an eine partitionierte Tabelle angehängt wird, ruft der Konnektor keine Daten ab, die vor dem Anhängen in der Partitionstabelle vorhanden waren.

  • Wenn eine Unterpartitionstabelle nach Beginn der Datenaufnahme von der partitionierten Tabelle getrennt wird, markiert der Konnektor die Daten aus dieser Unterpartition in der Stammpartitionstabelle nicht als gelöscht.

  • Bei einem Abschneiden in Unterpartitionen werden die betroffenen Datensätze nicht als gelöscht markiert.

Verfolgen von Datenänderungen in Tabellen

Der Konnektor repliziert nicht nur den aktuellen Zustand der Daten aus den Quelltabellen, sondern auch jeden Zustand jeder Zeile aus jedem Änderungssatz. Diese Daten werden in Journaltabellen gespeichert, die in demselben Schema wie die Zieltabelle erstellt wurden.

Die Journaltabellennamen haben folgendes Format: <source table name>_JOURNAL_<timestamp>_<schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> ist eine ganze Zahl, die mit jeder Schemaänderung in der Quelltabelle erhöht wird. Infolgedessen haben Quelltabellen, die Schemaänderungen unterliegen, mehrere Journaltabellen.

Wenn eine Tabelle aus der Replikation entfernt und dann wieder hinzugefügt wird, ändert sich der Wert von <timestamp> und <schema generation> beginnt wieder bei 1.

Wichtig

Snowflake empfiehlt, die Struktur von Journaltabellen in keiner Weise zu verändern. Sie werden vom Konnektor verwendet, um die Zieltabelle im Rahmen der Replikation zu aktualisieren.

Der Konnektor löscht nie Journaltabellen, sondern verwendet das neueste Journal für jede replizierte Quelltabelle und liest nur Nur-Anfügen-Streams über Journale. Um den Speicher wieder freizugeben, können Sie Folgendes tun:

  • Sie können alle Journaltabellen jederzeit kürzen.

  • Löschen Sie die Journaltabellen, die sich auf Quelltabellen beziehen, die aus der Replikation entfernt wurden.

  • Löschen Sie alle Journaltabellen bis auf die neueste Generation aktiv replizierter Tabellen.

Wenn Ihr Konnektor beispielsweise so eingestellt ist, dass er die Quelltabelle orders aktiv repliziert, und Sie zuvor die Tabelle customers aus der Replikation entfernt haben, haben Sie möglicherweise die folgenden Journaltabellen. In diesem Fall können Sie alle außer orders_5678_2 löschen.

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

Planung von Zusammenführungsaufgaben konfigurieren

Der Konnektor verwendet ein Warehouse, um Daten aus der Änderungsdatenerfassung (CDC) in Zieltabellen zusammenzuführen. Diese Operation wird durch den Prozessor MergeSnowflakeJournalTable ausgelöst. Wenn es keine neuen Änderungen gibt oder wenn keine neuen FlowFiles in der MergeSnowflakeJournalTable-Warteschlange warten, wird keine Zusammenführung ausgelöst und das Warehouse wird automatisch ausgesetzt.

Um die Warehouse-Kosten zu begrenzen und die Zusammenführungen nur auf die geplante Zeit zu beschränken, verwenden Sie den CRON-Ausdruck im Parameter „Merge task Schedule CRON“. Er drosselt die an den MergeSnowflakeJournalTable-Prozessor gelangenden FlowFiles und die Zusammenführung wird nur in einem bestimmten Zeitraum ausgelöst. Weitere Informationen zur Zeitplanung finden Sie unter Zeitplanungsstrategie.

Den Konnektor anhalten oder löschen

Wenn Sie den Konnektor anhalten oder entfernen, müssen Sie den Replikationsslot berücksichtigen, den der Konnektor verwendet.

Der Konnektor erstellt einen eigenen Replikationsslot mit einem Namen, der mit snowflake_connector_ beginnt, gefolgt von einem zufälligen Suffix. Wenn der Konnektor den Replikationsstrom liest, schiebt er den Slot weiter, sodass PostgreSQL sein WAL-Protokoll kürzen und Speicherplatz freigeben kann.

Wenn der Konnektor pausiert, wird der Slot nicht erweitert, und Änderungen an der Quelldatenbank erhöhen weiterhin die WAL-Protokollgröße. Sie sollten den Konnektor nicht für längere Zeit pausieren lassen, insbesondere nicht bei Datenbanken mit hohem Datenverkehr.

Wenn der Konnektor entfernt wird, sei es durch Löschen aus dem Openflow-Canvas oder auf andere Weise, z. B. durch Löschen der gesamten Openflow-Instanz, bleibt der Replikationsslot bestehen und muss manuell gelöscht werden.

Wenn Sie mehrere Konnektorinstanzen haben, die von derselben PostgreSQL-Datenbank replizieren, erstellt jede Instanz ihren eigenen, eindeutig benannten Replikationsslot. Wenn Sie einen Replikationsslot manuell löschen, vergewissern Sie sich, dass es sich um den richtigen handelt. Sie können sehen, welcher Replikationsslot von einer bestimmten Konnektorinstanz verwendet wird, indem Sie den Status des CaptureChangePostgreSQL-Prozessors überprüfen.

Führen Sie den Ablauf aus

  1. Klicken Sie mit der rechten Maustaste auf die Ebene, und wählen Sie Enable all Controller Services.

  2. Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe und wählen Sie Start. Der Konnektor startet die Datenaufnahme.