Einrichten von Openflow Connector for SQL Server

Bemerkung

Der Konnektor unterliegt den Bedingungen für Konnektoren.

Unter diesem Thema werden die Schritte zur Einrichtung von Openflow Connector for SQL Server beschrieben.

Voraussetzungen

  1. Stellen Sie sicher, dass Sie Allgemeine Informationen zu Openflow Connector for SQL Server gelesen haben.

  2. Stellen Sie sicher, dass Sie Unterstützte SQL-Server-Versionen gelesen haben.

  3. Empfohlen: Stellen Sie sicher, dass Sie nur eine Konnektor-Instanz pro Laufzeitumgebung hinzufügen.

  4. Stellen Sie sicher, dass Sie Openflow einrichten – BYOC oder Openflow einrichten – Snowflake-Bereitstellung – Überblick zu Aufgaben haben.

  5. Führen Sie als Datenbankadministrator die folgenden Aufgaben aus:

    1. Aktivieren Sie die Änderungsverfolgung für die Datenbank und Tabellen. Der Konnektor erfordert, dass die Änderungsverfolgung für die Datenbank und die Tabellen aktiviert ist, bevor die Replikation beginnt. Stellen Sie sicher, dass für alle Tabellen, die Sie replizieren möchten, die Änderungsverfolgung aktiviert ist. Sie können auch die Änderungsverfolgung für weitere Tabellen aktivieren, während der Konnektor ausgeführt wird. Sehen den folgenden Codeausschnitt:

      ALTER DATABASE <database>
      SET CHANGE_TRACKING = ON
      (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
      
      ALTER TABLE <schema>.<table>
      ENABLE CHANGE_TRACKING
      WITH (TRACK_COLUMNS_UPDATED = ON);
      
      Copy
    2. Erstellen Sie einen Benutzer für den Connector. Der Konnektor benötigt einen Benutzer mit der Berechtigung VIEW CHANGE TRACKING für replizierte Tabellen. Geben Sie diesem Benutzer ein Kennwort für den Zugriff auf die Konfiguration des Konnektors.

      CREATE LOGIN <user_name> WITH PASSWORD = <password>;
      CREATE USER <user_name> FOR LOGIN <user_name>;
      GRANT SELECT ON <schema>.<table> TO <user_name>;
      GRANT VIEW CHANGE TRACKING ON <schema>.<table> TO <user_name>;
      
      Copy
    3. Verbinden Sie sich über SSL. Wenn Sie planen, eine SSL-Verbindung zum SQL-Server zu verwenden, bereiten Sie das Stammzertifikat für Ihren Datenbankserver vor. Es wird während der Konfiguration benötigt.

  6. Als Snowflake-Kontoadministrator führen Sie die folgenden Aufgaben aus:

    1. Erstellen Sie einen Snowflake Benutzer mit dem Typ als SERVICE. Erstellen Sie eine Datenbank, um die replizierten Daten zu speichern, und richten Sie Berechtigungen für den Snowflake-Benutzer ein, um Objekte in dieser Datenbank zu erstellen, indem Sie die Berechtigungen USAGE und CREATE SCHEMA erteilen.

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
           WITH
               WAREHOUSE_SIZE = 'MEDIUM'
               AUTO_SUSPEND = 300
               AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
      Copy
    2. Erstellen Sie ein Paar sicherer Schlüssel (öffentlich und privat). Speichern Sie den privaten Schlüssel des Benutzers in einer Datei, die Sie der Konfiguration des Konnektors zur Verfügung stellen. Weisen Sie den öffentlichen Schlüssel dem Benutzer des Snowflake-Dienstes zu:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      Weitere Informationen dazu finden Sie unter Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation.

    3. Bestimmen Sie ein Warehouse, das der Konnektor verwenden soll. Beginnen Sie mit der Größe des MEDIUM-Warehouse und experimentieren Sie dann mit der Größe in Abhängigkeit von der Anzahl der zu replizierenden Tabellen und der Menge der übertragenen Daten. Große Tabellenzahlen lassen sich in der Regel besser mit Multi-Cluster-Warehouses als mit der Warehouse-Größe.

Einrichten des Konnektors

Führen Sie als Data Engineer die folgenden Aufgaben aus, um den Konnektor zu konfigurieren:

Konnektor installieren

  1. Navigieren Sie zur Openflow-Übersichtsseite. Wählen Sie im Abschnitt Featured connectors die Option View more connectors aus.

  2. Suchen Sie auf der Seite Openflow-Konnektoren den Konnektor und wählen Sie Add to runtime.

  3. Wählen Sie im Dialog Select runtime Ihre Laufzeit aus der Dropdown-Liste Available runtimes aus.

  4. Wählen Sie Add aus.

    Bemerkung

    Bevor Sie den Konnektor installieren, stellen Sie sicher, dass Sie in Snowflake eine Datenbank und ein Schema für den Konnektor erstellt haben, in dem die aufgenommenen Daten gespeichert werden.

  5. Authentifizieren Sie sich bei der Bereitstellung mit den Anmeldedaten Ihres Snowflake-Kontos und wählen Sie Allow, wenn Sie dazu aufgefordert werden, damit die Laufzeitanwendung auf Ihr Snowflake-Konto zugreifen kann. Die Installation des Konnektors nimmt einige Minuten in Anspruch.

  6. Authentifizieren Sie sich bei der Laufzeit mit den Anmeldeinformationen Ihres Snowflake-Kontos.

Das Openflow-Canvas wird mit der hinzugefügten Prozessgruppe des Konnektors angezeigt.

Konnektor konfigurieren

Sie können den Konnektor für die folgenden Anwendungsfälle konfigurieren:

Replizieren Sie eine Reihe von Tabellen in Echtzeit

  1. Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe und wählen Sie Parameters.

  2. Geben Sie die erforderlichen Parameterwerte ein, wie unter Ablaufparameter beschrieben.

Ablaufparameter

Beginnen Sie mit dem Festlegen der Quellsystemparameter für SQLServer, und legen Sie dann die Zielsystemparameter für SQLServer fest. Sobald dies erledigt ist, können Sie den Konnektor aktivieren. Der Konnektor muss sich sowohl mit dem SQLServer als auch mit Snowflake verbinden und dann ausgeführt werden. Der Konnektor repliziert jedoch erst dann Daten, wenn die zu replizierenden Tabellen explizit zu seiner Konfiguration hinzugefügt wurden.

Um bestimmte Tabellen für die Replikation zu konfigurieren, bearbeiten Sie die Aufnahmeparameter für SQLServer. Nachdem Sie die Änderungen am Kontext der Aufnahmeparameter für SQLServer vorgenommen haben, wird die Konfiguration vom Konnektor übernommen und der Replikationslebenszyklus für jede Tabelle gestartet.

Quellsystemparameter für SQLServer

Parameter

Beschreibung

SQL Server Connection URL

Die vollständige JDBC URL zur Quelldatenbank.

Beispiel:

  • jdbc:sqlserver://example.com:1433;encrypt=false;databaseName=<example_database>

SQL Server JDBC Driver

Aktivieren Sie das Reference asset-Kontrollkästchen, um den SQL Server JDBC-Treiber hochzuladen.

SQL Server SSL Modus

Aktiviert oder deaktiviert SSL Verbindungen.

SQL Server Root SSL Certificate

Der vollständige Inhalt des Stammzertifikats für die Datenbank. Optional, wenn SSL deaktiviert ist.

SQL Server Username

Der Benutzername für den Konnektor.

SQL Server Password

Das Kennwort für den Konnektor.

Zielsystemparameter für SQLServer

Parameter

Beschreibung

Erforderlich

Destination Database

Die Datenbank, in der die Daten persistiert werden. Sie muss bereits in Snowflake vorhanden sein

Ja

Snowflake Account Identifier

Bei Verwendung von:

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR: Snowflake-Kontoname im Format [Organisationsname]-[Kontoname], wobei die Daten persistent gespeichert werden.

Ja

Snowflake Authentication Strategy

Bei Verwendung von:

  • Snowflake Openflow-Bereitstellung: Verwenden Sie SNOWFLAKE_SESSION_TOKEN. Dieses Token wird automatisch von Snowflake verwaltet.

  • BYOC: Verwenden Sie KEY_PAIR als Wert für die Authentifizierungsstrategie.

Ja

Snowflake Private Key

Bei Verwendung von:

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR: Muss der RSA private Schlüssel sein, der für die Authentifizierung verwendet wird.

    Der RSA-Schlüssel muss entsprechend den PKCS8-Standards formatiert sein und standardmäßige PEM-Header und Footer haben. Beachten Sie, dass entweder die private Snowflake-Schlüsseldatei oder der private Snowflake-Schlüssel definiert werden muss.

Nein

Snowflake Private Key File

Bei Verwendung von:

  • Authentifizierungsstrategie für Sitzungstoken: Die Datei des privaten Schlüssels muss leer sein.

  • KEY_PAIR: Laden Sie die Datei hoch, die den RSA Private Key für die Authentifizierung bei Snowflake enthält, formatiert nach PKCS8-Standards und mit Standard-PEM-Header und -Footer. Die Header-Zeile beginnt mit -----BEGIN PRIVATE. Aktivieren Sie das Kontrollkästchen Reference asset, um die Private Key-Datei hochzuladen.

Nein

Snowflake Private Key Password

Bei Verwendung von:

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR: Geben Sie das Kennwort an, das mit der privaten Snowflake-Schlüsseldatei verbunden ist.

Nein

Snowflake Role

Bei Verwendung von:

  • Strategie für die Authentifizierung mit Sitzungstoken: Verwenden Sie Ihre Laufzeitrolle. Sie finden Ihre Laufzeitrolle in der Openflow-UI, indem Sie zu View Details für Ihre Laufzeitumgebung navigieren.

  • KEY_PAIR Authentifizierungsstrategie: Verwenden Sie eine gültige Rolle, die für Ihren Dienstbenutzer konfiguriert ist.

Ja

Snowflake-Benutzername

Bei Verwendung von:

  • Strategie für die Authentifizierung mit Sitzungstoken: Muss leer sein.

  • KEY_PAIR: Geben Sie den Benutzernamen an, der für die Verbindung mit der Snowflake-Instanz verwendet wird.

Ja

Snowflake Warehouse

Snowflake Warehouse, das für die Ausführung von Abfragen verwendet wird.

Ja

Aufnahmeparameter für SQLServer

Parameter

Beschreibung

Included Table Names

Eine durch Kommas getrennte Liste von Tabellenpfaden, einschließlich ihrer Schemas. Beispiel: public.my_table, other_schema.other_table

Included Table Regex

Ein regulärer Ausdruck zum Abgleich mit Tabellenpfaden. Jeder Pfad, der mit dem Ausdruck übereinstimmt, wird repliziert, und neue Tabellen, die dem Muster entsprechen und später erstellt werden, werden ebenfalls automatisch eingeschlossen. Beispiel: public\.auto_.*

Filter JSON

Eine JSON-Datei mit einer Liste vollqualifizierter Tabellennamen und einem Regex-Muster für Spaltennamen, die in die Replikation einbezogen werden sollen. Beispiel: [ {"schema":"public", "table":"table1", "includedPattern":".*name"} ] enthält alle Spalten, die mit name in table1 aus dem Schema public enden.

Merge Task Schedule CRON

CRON-Ausdruck, der Zeiträume definiert, in denen Zusammenführungsoperationen vom Journal zur Zieltabelle ausgelöst werden. Setzen Sie ihn auf * * * * * ?, wenn Sie eine kontinuierliche Zusammenführung oder keinen Zeitplan zur Begrenzung der Warehouse-Laufzeit wünschen.

Beispiel:

  • Die Zeichenfolge * 0 * * * ? gibt an, dass Sie Zusammenführungen zu jeder vollen Stunde für eine Minute planen möchten.

  • Die Zeichenfolge * 20 14 ? * MON-FRI gibt an, dass Sie Zusammenführungen um 2:20 PM jeden Montag bis Freitag planen möchten.

Weitere Informationen und Beispiele finden Sie in der Anleitung zu Cron-Triggern in der Quartz-Dokumentation

Entfernen und erneutes Hinzufügen einer Tabelle zur Replikation

Um eine Tabelle aus der Replikation zu entfernen, stellen Sie sicher, dass sie aus den Parametern Included Table Names oder Included Table Regex im Replikationsparameterkontext entfernt wird.

Wenn Sie die Tabelle später wieder zur Replikation hinzufügen möchten, löschen Sie zunächst die entsprechende Zieltabelle in Snowflake. Danach fügen Sie die Tabelle wieder zu den Parametern Included Table Names oder Included Table Regex hinzu. Dadurch wird sichergestellt, dass der Replikationsprozess für die Tabelle neu beginnt.

Diese Vorgehensweise kann auch zur Wiederherstellung nach einer fehlgeschlagenen Tabellenreplikation verwendet werden.

Replizieren einer Teilmenge von Spalten in einer Tabelle

Der Konnektor kann die replizierten Daten pro Tabelle auf eine Teilmenge der konfigurierten Spalten filtern.

Um Filter auf Spalten anzuwenden, ändern Sie die Eigenschaft „Column Filter“ im Replikationsparameterkontext und fügen Sie ein Array mit Konfigurationen hinzu, wobei Sie für jede Tabelle, auf die Sie einen Filter anwenden möchten, einen Eintrag hinzufügen.

Spalten können nach Name oder Muster einbezogen oder ausgeschlossen werden. Sie können eine einzelne Bedingung pro Tabelle anwenden oder mehrere Bedingungen kombinieren, wobei Ausschlüsse immer Vorrang vor Einbeziehungen haben.

Das folgende Beispiel zeigt die Felder, die verfügbar sind. Die Felder schema und table sind Pflichtfelder. Eine oder mehrere der Optionen included, excluded, includedPattern, excludedPattern sind erforderlich.

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

Verfolgen von Datenänderungen in Tabellen

Der Konnektor repliziert nicht nur den aktuellen Zustand der Daten aus den Quelltabellen, sondern auch jeden Zustand jeder Zeile aus jedem Änderungssatz. Diese Daten werden in Journaltabellen gespeichert, die in demselben Schema wie die Zieltabelle erstellt wurden.

Die Journaltabellennamen haben folgendes Format: <source table name>_JOURNAL_<timestamp>_<schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> ist eine ganze Zahl, die mit jeder Schemaänderung in der Quelltabelle erhöht wird. Infolgedessen haben Quelltabellen, die Schemaänderungen unterliegen, mehrere Journaltabellen.

Wenn eine Tabelle aus der Replikation entfernt und dann wieder hinzugefügt wird, ändert sich der Wert von <timestamp> und <schema generation> beginnt wieder bei 1.

Wichtig

Snowflake empfiehlt, die Struktur von Journaltabellen in keiner Weise zu verändern. Sie werden vom Konnektor verwendet, um die Zieltabelle im Rahmen der Replikation zu aktualisieren.

Der Konnektor löscht nie Journaltabellen, sondern verwendet das neueste Journal für jede replizierte Quelltabelle und liest nur Nur-Anfügen-Streams über Journale. Um den Speicher wieder freizugeben, können Sie Folgendes tun:

  • Sie können alle Journaltabellen jederzeit kürzen.

  • Löschen Sie die Journaltabellen, die sich auf Quelltabellen beziehen, die aus der Replikation entfernt wurden.

  • Löschen Sie alle Journaltabellen bis auf die neueste Generation aktiv replizierter Tabellen.

Wenn Ihr Konnektor beispielsweise so eingestellt ist, dass er die Quelltabelle orders aktiv repliziert, und Sie zuvor die Tabelle customers aus der Replikation entfernt haben, haben Sie möglicherweise die folgenden Journaltabellen. In diesem Fall können Sie alle außer orders_5678_2 löschen.

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

Planung von Zusammenführungsaufgaben konfigurieren

Der Konnektor verwendet ein Warehouse, um Daten aus der Änderungsdatenerfassung (CDC) in Zieltabellen zusammenzuführen. Diese Operation wird durch den Prozessor MergeSnowflakeJournalTable ausgelöst. Wenn es keine neuen Änderungen gibt oder wenn keine neuen FlowFiles in der MergeSnowflakeJournalTable-Warteschlange warten, wird keine Zusammenführung ausgelöst und das Warehouse wird automatisch ausgesetzt.

Um die Warehouse-Kosten zu begrenzen und die Zusammenführungen nur auf die geplante Zeit zu beschränken, verwenden Sie den CRON-Ausdruck im Parameter „Merge task Schedule CRON“. Er drosselt die an den MergeSnowflakeJournalTable-Prozessor gelangenden FlowFiles und die Zusammenführung wird nur in einem bestimmten Zeitraum ausgelöst. Weitere Informationen zur Zeitplanung finden Sie unter Zeitplanungsstrategie.

Führen Sie den Ablauf aus

  1. Klicken Sie mit der rechten Maustaste auf die Ebene, und wählen Sie Enable all Controller Services.

  2. Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe und wählen Sie Start. Der Konnektor startet die Datenaufnahme.