Einrichten von Openflow Connector for SQL Server¶
Bemerkung
This connector is subject to the Snowflake Connector Terms.
This topic describes how to set up the Openflow Connector for SQL Server.
Weitere Informationen zum inkrementellen Ladeprozess finden Sie unter Inkrementelle Replikation.
Voraussetzungen¶
Stellen Sie vor dem Einrichten des Konnektors sicher, dass Sie die folgenden Voraussetzungen erfüllt haben:
Stellen Sie sicher, dass Sie Allgemeine Informationen zu Openflow Connector for SQL Server gelesen haben.
Stellen Sie sicher, dass Sie Unterstützte SQL-Server-Versionen gelesen haben.
Vergewissern Sie sich, dass Sie Ihre Laufzeitbereitstellung eingerichtet haben. Weitere Informationen dazu finden Sie unter folgenden Themen:
If you use Openflow - Snowflake Deployments, ensure that you have reviewed configuring required domains and have granted access to the required domains for the SQL Server connector.
Set up your SQL Server instance¶
Bevor Sie den Konnektor einrichten, führen Sie die folgenden Aufgaben in Ihrer SQL Server-Umgebung durch:
Bemerkung
Sie müssen diese Aufgaben als Datenbankadministrator ausführen.
Enable change tracking on the databases and tables that you plan to replicate, as shown in the following SQL Server example:
ALTER DATABASE <database> SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON); ALTER TABLE <schema>.<table> ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON);
Bemerkung
Führen Sie diese Befehle für jede Datenbank und Tabelle aus, die Sie replizieren möchten.
Der Konnektor setzt voraus, dass die Änderungsverfolgung für Datenbanken und Tabellen aktiviert ist, bevor die Replikation beginnt. Stellen Sie sicher, dass für jede Tabelle, die Sie replizieren möchten, die Änderungsverfolgung aktiviert ist. Sie können die Änderungsverfolgung auch für zusätzliche Tabellen aktivieren, während der Konnektor läuft.
Eine Anmeldung für die SQL Server-Instanz erstellen:
CREATE LOGIN <user_name> WITH PASSWORD = '<password>';
Diese Anmeldung wird verwendet, um Benutzer für die Datenbanken zu erstellen, die Sie replizieren möchten.
Erstellen Sie einen Benutzer für jede Datenbank, die Sie replizieren, indem Sie den folgenden SQL Server-Befehl in jeder Datenbank ausführen:
USE <source_database>; CREATE USER <user_name> FOR LOGIN <user_name>;
Gewähren Sie die SELECT- und VIEW CHANGE TRACKING-Berechtigungen für den Benutzer für jede Datenbank, die Sie replizieren:
GRANT SELECT ON <database>.<schema>.<table> TO <user_name>; GRANT VIEW CHANGE TRACKING ON <database>.<schema>.<table> TO <user_name>;
Führen Sie diese Befehle in jeder Datenbank für jede Tabelle aus, die Sie replizieren möchten. Diese Berechtigungen müssen dem Benutzer jeder Datenbank erteilt werden, die Sie in einem vorherigen Schritt erstellt haben.
(Optional) Configure SSL connection.
If you use an SSL connection to connect SQL Server, create the root certificate for your database server. This is required when configuring the connector.
Einrichten Ihrer Snowflake-Umgebung¶
As a Snowflake administrator, perform the following tasks:
Erstellen Sie in Snowflake eine Zieldatenbank zum Speichern der replizierten Daten:
CREATE DATABASE <destination_database>;
Snowflake Servicebenutzer erstellen:
CREATE USER <openflow_user> TYPE = SERVICE COMMENT='Service user for automated access of Openflow';
Erstellen Sie eine Snowflake-Rolle für den Konnektor, und erteilen Sie die erforderlichen Berechtigungen:
CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
Verwenden Sie diese Rolle, um den Zugriff des Konnektors auf die Snowflake-Datenbank zu verwalten.
Um Objekte in der Zieldatenbank zu erstellen, müssen Sie die Berechtigungen USAGE und CREATE SCHEMA für die Datenbank der Rolle gewähren, die für die Zugriffsverwaltung verwendet wird.
Erstellen Sie ein Snowflake-Warehouse für den Konnektor, und erteilen Sie die erforderlichen Berechtigungen:
CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
Snowflake recommends starting with a MEDIUM warehouse size, then experimenting with size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses, rather than a larger warehouse size. For more information, see multi-cluster warehouses.
Set up the public and private keys for key pair authentication:
Erstellen Sie ein Paar sicherer Schlüssel (öffentlich und privat).
Store the private key for the user in a file to supply to the connector’s configuration.
Weisen Sie dem Snowflake Service-Benutzer den öffentlichen Schlüssel zu.
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
Weitere Informationen dazu finden Sie unter Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation.
Konnektor konfigurieren¶
As a data engineer, install and configure the connector using the following sections.
Konnektor installieren¶
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
Suchen Sie auf der Seite Openflow-Konnektoren den Konnektor und wählen Sie Add to runtime.
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.
Bemerkung
Bevor Sie den Konnektor installieren, stellen Sie sicher, dass Sie in Snowflake eine Datenbank und ein Schema für den Konnektor erstellt haben, in dem die aufgenommenen Daten gespeichert werden.
Authentifizieren Sie sich bei der Bereitstellung mit den Anmeldedaten Ihres Snowflake-Kontos und wählen Sie Allow, wenn Sie dazu aufgefordert werden, damit die Laufzeitanwendung auf Ihr Snowflake-Konto zugreifen kann. Die Installation des Konnektors nimmt einige Minuten in Anspruch.
Authentifizieren Sie sich bei der Laufzeit mit den Anmeldeinformationen Ihres Snowflake-Kontos.
Das Openflow-Canvas wird mit der hinzugefügten Prozessgruppe des Konnektors angezeigt.
Konnektor konfigurieren¶
To configure the connector, perform the following steps:
Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe und wählen Sie Parameters.
Populate the required parameter values as described in Ablaufparameter.
Ablaufparameter¶
Start by setting the parameters of the SQLServer Source Parameters context, then the SQLServer Destination Parameters context. After you complete this, enable the connector. The connector connects to both SQLServer and Snowflake and starts running. However, the connector does not replicate any data until any tables to be replicated are explicitly added to its configuration.
Um bestimmte Tabellen für die Replikation zu konfigurieren, bearbeiten Sie die Aufnahmeparameter für SQLServer. Nachdem Sie die Änderungen am Kontext der Aufnahmeparameter für SQLServer vorgenommen haben, wird die Konfiguration vom Konnektor übernommen und der Replikationslebenszyklus für jede Tabelle gestartet.
Quellsystemparameter für SQLServer¶
Parameter |
Beschreibung |
|---|---|
SQL Server Connection URL |
Die vollständige JDBC URL zur Quelldatenbank. Beispiel:
|
SQL Server JDBC Driver |
Select the Reference asset checkbox to upload the SQL Server JDBC driver. |
SQL Server Username |
The user name for the connector. |
SQL Server Password |
Das Kennwort für den Konnektor. |
Zielsystemparameter für SQLServer¶
Parameter |
Beschreibung |
Erforderlich |
|---|---|---|
Destination Database |
The database where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase. |
Ja |
Snowflake Authentication Strategy |
Bei Verwendung von:
|
Ja |
Snowflake Account Identifier |
Bei Verwendung von:
|
Ja |
Snowflake Connection Strategy |
Bei Verwendung von KEY_PAIR geben Sie die Strategie für die Verbindung zu Snowflake an:
|
Nur erforderlich für BYOC mit KEY_PAIR, andernfalls wird dies ignoriert. |
Snowflake Object Identifier Resolution |
Gibt an, wie Quellobjektbezeichner wie Schemas, Tabellen und Spaltennamen in Snowflake gespeichert und abgefragt werden. Diese Einstellung bestimmt, ob Sie in SQL-Abfragen doppelte Anführungszeichen verwenden müssen. Option 1: Standard ist die Beachtung der Groß- und Kleinschreibung (empfohlen).
Bemerkung Snowflake empfiehlt die Verwendung dieser Option, wenn Datenbankobjekte keine Namen mit gemischter Groß-/Kleinschreibung haben. Wichtig Ändern Sie diese Einstellung nicht, nachdem die Datenaufnahme des Konnektors begonnen hat. Das Ändern dieser Einstellung nach Beginn der Datenaufnahme führt zum Abbruch der bestehenden Datenaufnahme. Wenn Sie diese Einstellung ändern müssen, erstellen Sie eine neue Konnektorinstanz. Option 2: Groß-/Kleinschreibung wird berücksichtigt.
Bemerkung Snowflake empfiehlt die Verwendung dieser Option, wenn Sie die Groß-/Kleinschreibung der Quelle aus Gründen der Kompatibilität beibehalten müssen. Wenn beispielsweise die Quelldatenbank Tabellennamen enthält, die sich nur in der Groß-/Kleinschreibung unterscheiden, wie z. B. |
Ja |
Snowflake Private Key |
Bei Verwendung von:
|
Nein |
Snowflake Private Key File |
Bei Verwendung von:
|
Nein |
Snowflake Private Key Password |
Bei Verwendung von:
|
Nein |
Snowflake Role |
Bei Verwendung von:
|
Ja |
Snowflake-Benutzername |
Bei Verwendung von:
|
Ja |
Snowflake Warehouse |
Snowflake Warehouse, das für die Ausführung von Abfragen verwendet wird. |
Ja |
Aufnahmeparameter für SQLServer¶
Parameter |
Beschreibung |
|---|---|
Included Table Names |
A comma-separated list of source table paths, including their databases and schemas, for example:
|
Included Table Regex |
A regular expression to match against table paths, including database and schema names. Every path matching the expression is replicated, and new tables matching the pattern that are created later are also included automatically, for example:
|
Filter JSON |
A JSON containing a list of fully-qualified table names and a regex pattern for column names that should be included into replication. Das folgende Beispiel enthält alle Spalten, die mit
|
Merge Task Schedule CRON |
CRON-Ausdruck, der Zeiträume definiert, in denen Zusammenführungsoperationen vom Journal zur Zieltabelle ausgelöst werden. Setzen Sie ihn auf Beispiel:
Weitere Informationen und Beispiele finden Sie in der Anleitung zu Cron-Triggern in der Quartz-Dokumentation |
Entfernen und erneutes Hinzufügen einer Tabelle zur Replikation¶
To remove a table from replication, remove it from the Included Table Names or Included Table Regex parameters in the Replication Parameters context.
To re-add the table to replication later, first delete the corresponding destination table in Snowflake.
Afterward, add the table back to the Included Table Names or Included Table Regex parameters.
This ensures that the replication process starts fresh for the table.
Diese Vorgehensweise kann auch zur Wiederherstellung nach einer fehlgeschlagenen Tabellenreplikation verwendet werden.
Replizieren einer Teilmenge von Spalten in einer Tabelle¶
The connector filters the data replicated per table to a subset of configured columns.
Um Filter auf Spalten anzuwenden, ändern Sie die Eigenschaft „Column Filter“ im Replikationsparameterkontext und fügen Sie ein Array mit Konfigurationen hinzu, wobei Sie für jede Tabelle, auf die Sie einen Filter anwenden möchten, einen Eintrag hinzufügen.
Include or exclude columns by name or pattern. You can apply a single condition per table, or combine multiple conditions, with exclusions always taking precedence over inclusions.
Das folgende Beispiel zeigt die Felder, die verfügbar sind. Die Felder schema und table sind Pflichtfelder. Eine oder mehrere der Optionen included, excluded, includedPattern, excludedPattern sind erforderlich.
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
Verfolgen von Datenänderungen in Tabellen¶
The connector replicates the current state of data from the source tables, as well as every state of every row from every changeset. This data is stored in journal tables created in the same schema as the destination table.
Die Journaltabellennamen haben folgendes Format: <source table name>_JOURNAL_<timestamp>_<schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> ist eine ganze Zahl, die mit jeder Schemaänderung in der Quelltabelle erhöht wird. Infolgedessen haben Quelltabellen, die Schemaänderungen unterliegen, mehrere Journaltabellen.
When you remove a table from replication, then add it back, the <timestamp> value changes, and <schema generation> starts again from 1.
Wichtig
Snowflake recommends not altering the structure of journal tables in any way. The connector uses them to update the destination table as part of the replication process.
The connector never drops journal tables, but uses the latest journal for every replicated source table, only reading append-only streams on top of journals. To reclaim the storage, you can:
Sie können alle Journaltabellen jederzeit kürzen.
Löschen Sie die Journaltabellen, die sich auf Quelltabellen beziehen, die aus der Replikation entfernt wurden.
Löschen Sie alle Journaltabellen bis auf die neueste Generation aktiv replizierter Tabellen.
Wenn Ihr Konnektor beispielsweise so eingestellt ist, dass er die Quelltabelle orders aktiv repliziert, und Sie zuvor die Tabelle customers aus der Replikation entfernt haben, haben Sie möglicherweise die folgenden Journaltabellen. In diesem Fall können Sie alle außer orders_5678_2 löschen.
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
Planung von Zusammenführungsaufgaben konfigurieren¶
Der Konnektor verwendet ein Warehouse, um Daten aus der Änderungsdatenerfassung (CDC) in Zieltabellen zusammenzuführen. Diese Operation wird durch den Prozessor MergeSnowflakeJournalTable ausgelöst. Wenn es keine neuen Änderungen gibt oder wenn keine neuen FlowFiles in der MergeSnowflakeJournalTable-Warteschlange warten, wird keine Zusammenführung ausgelöst und das Warehouse wird automatisch ausgesetzt.
Use the CRON expression in the Merge task Schedule CRON parameter to limit the warehouse cost and limit merges to only scheduled time. It throttles the flow files coming to the MergeSnowflakeJournalTable processor and merges are triggered only in a dedicated period of time. For more information about scheduling, see Scheduling strategy.
Führen Sie den Ablauf aus¶
Klicken Sie mit der rechten Maustaste auf die Ebene, und wählen Sie Enable all Controller Services.
Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe und wählen Sie Start. Der Konnektor startet die Datenaufnahme.