Einrichten von Openflow Connector for MySQL¶
Bemerkung
Der Konnektor unterliegt den Bedingungen für Konnektoren.
Unter diesem Thema werden die Schritte zur Einrichtung von Openflow Connector for MySQL beschrieben.
Voraussetzungen¶
Stellen Sie sicher, dass Sie Allgemeine Informationen zu Openflow Connector for MySQL gelesen haben.
Stellen Sie sicher, dass Sie über MySQL 8 oder eine neuere Version verfügen, um Daten mit Snowflake zu synchronisieren.
Stellen Sie sicher, dass Sie Openflow eingerichtet haben.
Führen Sie als Datenbankadministrator die folgenden Aufgaben aus:
Aktivieren Sie Binärprotokolle, speichern Sie dann und konfigurieren Sie das Format wie folgt:
log_bin
Auf
on
setzen.Dies aktiviert das binäre Protokoll, das Struktur- und Datenänderungen aufzeichnet.
binlog_format
Auf
row
setzen.Der Konnektor unterstützt nur zeilenbasierte Replikation. MySQL 8.x-Versionen sind möglicherweise die letzten, die diese Einstellung unterstützen. Zukünftige Versionen werden nur noch die zeilenbasierte Replikation unterstützen.
Nicht anwendbar in GCP-Cloud-SQL, wo er auf den richtigen Wert festgelegt ist.
binlog_row_metadata
Auf
full
setzen.Der Konnektor benötigt alle Zeilenmetadaten, um zu funktionieren, vor allem Spaltennamen und Primärschlüsselinformationen.
binlog_row_image
Auf
full
setzen.Der Konnektor verlangt, dass alle Spalten in das binäre Protokoll geschrieben werden.
Nicht anwendbar in Amazon Aurora, wo er auf den richtigen Wert festgelegt ist.
binlog_row_value_options
Leave empty.
Diese Option wirkt sich nur auf JSON-Spalten aus, wo sie so eingestellt werden kann, dass nur die geänderten Teile von JSON-Dokumenten für
UPDATE
-Anweisungen berücksichtigt werden. Der Konnektor erfordert, dass vollständige Dokumente in das Binärprotokoll geschrieben werden.binlog_expire_logs_seconds
Stellen Sie den Wert auf mindestens einige Stunden oder länger ein, um sicherzustellen, dass der Datenbankagent die inkrementelle Replikation nach längeren Pausen oder Ausfallzeiten fortsetzen kann. Snowflake empfiehlt, den Zeitraum für den Ablauf des Binärprotokolls (binlog_expire_logs_seconds) auf mindestens einige Stunden einzustellen, um ein stabiles Funktionieren des Konnektors zu gewährleisten. Nachdem der Ablaufzeitraum des Binärprotokolls endet, werden die Binärprotokolldateien möglicherweise automatisch entfernt. Wenn die Integration für einen längeren Zeitraum pausiert, z. B. aufgrund von Wartungsarbeiten, und die abgelaufenen Binärprotokolldateien während dieser Zeit gelöscht werden, kann Openflow die Daten aus diesen Dateien nicht replizieren.
Wenn Sie die geplante Replikation verwenden, muss der Wert länger sein als der konfigurierte Zeitplan.
Siehe den folgenden Code als Beispiel:
log_bin = on binlog_format = row binlog_row_metadata = full binlog_row_image = full binlog_row_value_options =
Erhöhen Sie den Wert von
sort_buffer_size
.sort_buffer_size = 4194304
sort_buffer_size
definiert die Menge an Speicher (in Bytes), die pro Abfrage-Thread für Sortieroperationen im Arbeitsspeicher wie ORDER BY zugewiesen wird. Wenn der Wert zu klein ist, kann der Konnektor mit der folgenden Fehlermeldung fehlschlagen:Out of sort memory, consider increasing server sort buffer size
. Dies bedeutet, dasssort_buffer_size
erhöht werden sollte.Wenn Sie Amazon RDS-Datenbanken verwenden, dann erhöhen Sie die Aufbewahrungsfrist entsprechend
binlog_expire_logs_seconds
mitrds_set_configuration
. Wenn Sie beispielsweise Binlogs für 24 Stunden speichern möchten, rufen Siemysql.rds_set_configuration('binlog retention hours', 24)
auf.Verbinden Sie sich über SSL. Wenn Sie eine SSL-Verbindung zu MySQL verwenden möchten, bereiten Sie das Stammzertifikat für Ihren Datenbankserver vor. Es wird während der Konfiguration benötigt.
Erstellen Sie einen Benutzer für den Connector. Der Konnektor benötigt einen Benutzer mit den Berechtigungen REPLICATION_SLAVE und REPLICATION_CLIENT zum Lesen der Binärprotokolle. Erteilen Sie folgende Berechtigungen:
GRANT REPLICATION SLAVE ON *.* TO '<username>'@'%' GRANT REPLICATION CLIENT ON *.* TO '<username>'@'%'
Erteilen Sie die SELECT-Berechtigung für jede replizierte Tabelle:
GRANT SELECT ON <schema>.* TO '<username>'@'%' GRANT SELECT ON <schema>.<table> TO '<username>'@'%'
Weitere Informationen zur Replikationssicherheit finden Sie unter Binärprotokoll.
Als Snowflake-Kontoadministrator führen Sie die folgenden Aufgaben aus:
Erstellen Sie einen Snowflake Benutzer mit dem Typ als SERVICE. Erstellen Sie eine Datenbank, um die replizierten Daten zu speichern, und richten Sie Berechtigungen für den Snowflake-Benutzer ein, um Objekte in dieser Datenbank zu erstellen, indem Sie die Berechtigungen USAGE und CREATE SCHEMA erteilen.
CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
Erstellen Sie ein Paar sicherer Schlüssel (öffentlich und privat). Speichern Sie den privaten Schlüssel des Benutzers in einer Datei, die Sie der Konfiguration des Konnektors zur Verfügung stellen. Weisen Sie den öffentlichen Schlüssel dem Benutzer des Snowflake-Dienstes zu:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
Weitere Informationen finden Sie unter Schlüsselpaar.
Bestimmen Sie ein Warehouse, das der Konnektor verwenden soll. Beginnen Sie mit der Größe des
MEDIUM
-Warehouse und experimentieren Sie dann mit der Größe in Abhängigkeit von der Anzahl der zu replizierenden Tabellen und der Menge der übertragenen Daten. Große Tabellenzahlen lassen sich in der Regel besser mit Multi-Cluster-Warehouses als mit der Warehouse-Größe.
Einrichten des Konnektors¶
Als Data Engineer führen Sie die folgenden Aufgaben aus, um den Konnektor zu installieren und zu konfigurieren:
Konnektor installieren¶
Navigieren Sie zur Openflow-Übersichtsseite. Wählen Sie im Abschnitt Featured connectors die Option View more connectors aus.
Suchen Sie auf der Seite Openflow-Konnektoren den Konnektor und wählen Sie Add to runtime.
Wählen Sie im Dialog Select runtime Ihre Laufzeit aus der Dropdown-Liste Available runtimes aus.
Wählen Sie Add aus.
Bemerkung
Bevor Sie den Konnektor installieren, stellen Sie sicher, dass Sie in Snowflake eine Datenbank und ein Schema für den Konnektor erstellt haben, in dem die aufgenommenen Daten gespeichert werden.
Authentifizieren Sie sich bei der Bereitstellung mit den Anmeldedaten Ihres Snowflake-Kontos und wählen Sie Allow, wenn Sie dazu aufgefordert werden, damit die Laufzeitanwendung auf Ihr Snowflake-Konto zugreifen kann. Die Installation des Konnektors nimmt einige Minuten in Anspruch.
Authentifizieren Sie sich bei der Laufzeit mit den Anmeldeinformationen Ihres Snowflake-Kontos.
Das Openflow-Canvas wird mit der hinzugefügten Prozessgruppe des Konnektors angezeigt.
Konnektor konfigurieren¶
Sie können den Konnektor für die folgenden Anwendungsfälle konfigurieren:
Replizieren Sie eine Reihe von Tabellen in Echtzeit¶
Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe und wählen Sie Parameters.
Geben Sie die erforderlichen Parameterwerte ein, wie unter Ablaufparameter beschrieben.
Ablaufparameter¶
Beginnen Sie mit dem Festlegen der Quellsystemparamater für MySQL, und legen Sie dann die Zielsystemparameter für MySQL fest. Sobald dies erledigt ist, können Sie den Konnektor aktivieren. Der Konnektor muss sich sowohl mit MySQL als auch mit Snowflake verbinden und dann ausgeführt werden. Der Konnektor repliziert jedoch erst dann Daten, wenn die zu replizierenden Tabellen explizit zu seiner Konfiguration hinzugefügt wurden.
Um bestimmte Tabellen für die Replikation zu konfigurieren, bearbeiten Sie die Aufnahmeparameter für MySQL. Nachdem Sie die Änderungen im Kontext „Replikationsparameter“ übernommen haben, wird die Konfiguration vom Konnektor übernommen und der Replikationslebenszyklus für jede Tabelle gestartet.
Quellsystemparameter für MySQL¶
Parameter |
Beschreibung |
---|---|
MySQL Connection URL |
Die vollständige JDBC URL zur Quelldatenbank. Der Konnektor verwendet den MariaDB-Treiber, der mit MySQL kompatibel ist und erfordert das Präfix Beispiele:
|
MySQL-JDBC-Treiber |
Der absolute Pfad zum MariaDB JDBC-Treiber-Jar-Datei. Der Konnektor verwendet den MariaDB-Treiber, der mit MySQL kompatibel ist. Aktivieren Sie das Kontrollkästchen Reference asset, um den MariaDB JDBC-Treiber hochzuladen. Beispiel: |
MySQL Username |
Der Benutzername für den Konnektor. |
MySQL Password |
Das Kennwort für den Konnektor. |
Zielsystemparameter für MySQL¶
Parameter |
Beschreibung |
---|---|
Destination Database |
Die Datenbank, in der die Daten persistiert werden. Sie muss bereits in Snowflake vorhanden sein. |
Snowflake Account Identifier |
Snowflake-Kontoname im Format [organisation-name]-[account-name], in dem die Daten gespeichert werden |
Snowflake Authentication Strategy |
Strategie zur Authentifizierung bei Snowflake. Mögliche Werte: |
Snowflake Private Key |
Der private RSA Schlüssel, der für die Authentifizierung verwendet wird. Der RSA-Schlüssel muss nach den PKCS8-Standards formatiert sein und den Standard-PEM-Header und -Footer enthalten. Beachten Sie, dass entweder Snowflake Private Key File oder Snowflake Private Key definiert sein muss. |
Snowflake Private Key File |
Die Datei, die den privaten RSA-Schlüssel enthält, der für die Authentifizierung bei Snowflake verwendet wird. Sie ist nach den PKCS8-Standards formatiert und hat die Standard-PEM-Header und -Footer. Die Header beginnt mit |
Snowflake Private Key Password |
Das Kennwort, das mit der Snowflake Private Key-Datei verknüpft ist |
Snowflake Role |
Snowflake-Rolle, die bei der Ausführung der Abfrage verwendet wird |
Snowflake-Benutzername |
Benutzername für die Verbindung zur Snowflake-Instanz |
Snowflake Warehouse |
Snowflake Warehouse zur Ausführung von Abfragen |
Aufnahmeparameter für MySQL¶
Parameter |
Beschreibung |
---|---|
Included Table Names |
Eine durch Kommas getrennte Liste von Tabellenpfaden, einschließlich ihrer Schemas. Beispiel: |
Included Table Regex |
Ein regulärer Ausdruck zum Abgleich mit Tabellenpfaden. Jeder Pfad, der mit dem Ausdruck übereinstimmt, wird repliziert, und neue Tabellen, die dem Muster entsprechen und später erstellt werden, werden ebenfalls automatisch einbezogen. Beispiel: |
Filter JSON |
Eine JSON-Datei, die eine Liste vollständig qualifizierter Tabellennamen und ein reguläres Ausdrucksmuster für Spaltennamen enthält, die in die Replikation einbezogen werden sollen. Beispiel: |
Merge Task Schedule CRON |
CRON-Ausdruck, der Zeiträume definiert, in denen Zusammenführungsoperationen vom Journal zur Zieltabelle ausgelöst werden. Setzen Sie ihn auf |
Entfernen und erneutes Hinzufügen einer Tabelle zur Replikation¶
Um eine Tabelle aus der Replikation zu entfernen, stellen Sie sicher, dass sie aus den Parametern Included Table Names
oder Included Table Regex
im Replikationsparameterkontext entfernt wird.
Wenn Sie die Tabelle später wieder zur Replikation hinzufügen möchten, löschen Sie zunächst die entsprechende Zieltabelle in Snowflake. Danach fügen Sie die Tabelle wieder zu den Parametern Included Table Names
oder Included Table Regex
hinzu. Dadurch wird sichergestellt, dass der Replikationsprozess für die Tabelle neu beginnt.
Diese Vorgehensweise kann auch zur Wiederherstellung nach einer fehlgeschlagenen Tabellenreplikation verwendet werden.
Replizieren einer Teilmenge von Spalten in einer Tabelle¶
Der Konnektor kann die replizierten Daten pro Tabelle auf eine Teilmenge der konfigurierten Spalten filtern.
Um Filter auf Spalten anzuwenden, ändern Sie die Eigenschaft „Column Filter“ im Replikationsparameterkontext und fügen Sie ein Array mit Konfigurationen hinzu, wobei Sie für jede Tabelle, auf die Sie einen Filter anwenden möchten, einen Eintrag hinzufügen.
Spalten können nach Name oder Muster einbezogen oder ausgeschlossen werden. Sie können eine einzelne Bedingung pro Tabelle anwenden oder mehrere Bedingungen kombinieren, wobei Ausschlüsse immer Vorrang vor Einbeziehungen haben.
Das folgende Beispiel zeigt die Felder, die verfügbar sind. Die Felder schema
und table
sind Pflichtfelder. Eine oder mehrere der Optionen included
, excluded
, includedPattern
, excludedPattern
sind erforderlich.
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
Verfolgen von Datenänderungen in Tabellen¶
Der Konnektor repliziert nicht nur den aktuellen Zustand der Daten aus den Quelltabellen, sondern auch jeden Zustand jeder Zeile aus jedem Änderungssatz. Diese Daten werden in Journaltabellen gespeichert, die in demselben Schema wie die Zieltabelle erstellt wurden.
Die Namen der Journaltabellen sind folgendermaßen formatiert: <source table name>_JOURNAL_<timestamp>_<schema generation>
wobei <timestamp>
der Wert der Epochensekunden ist, als die Quelltabelle zur Replikation hinzugefügt wurde, und <schema generation>
eine Ganzzahl ist, die mit jeder Schemaänderung der Quelltabelle steigt. Das bedeutet, dass eine Quelltabelle, die Schemaänderungen erfährt, mehrere Journaltabellen hat.
Wenn eine Tabelle aus der Replikation entfernt und dann wieder hinzugefügt wird, ändert sich der Wert von <timestamp>
und <schema generation>
beginnt wieder bei 1
.
Wichtig
Snowflake empfiehlt Ihnen, die Journaltabellen oder die darin enthaltenen Daten in keiner Weise zu verändern. Sie werden vom Konnektor verwendet, um die Zieltabelle als Teil des Replikationsprozesses zu aktualisieren.
Der Konnektor löscht niemals Journaltabellen, sondern verwendet für jede replizierte Quelltabelle nur das aktuellste Journal. Wenn Sie den Speicherplatz wieder freigeben möchten, können Sie die Journaltabellen, die sich auf die aus der Replikation entfernten Quelltabellen beziehen, sowie alle außer den neuesten Generationen für aktiv replizierte Tabellen sicher löschen.
Wenn Ihr Konnektor beispielsweise so eingestellt ist, dass er die Quelltabelle orders
aktiv repliziert, und Sie zuvor die Tabelle customers
aus der Replikation entfernt haben, haben Sie möglicherweise die folgenden Journaltabellen. In diesem Fall können Sie alle außer orders_5678_2
löschen.
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
Planung von Zusammenführungsaufgaben konfigurieren¶
Der Konnektor verwendet ein Warehouse, um Daten aus der Änderungsdatenerfassung (CDC) in Zieltabellen zusammenzuführen. Diese Operation wird durch den Prozessor MergeSnowflakeJournalTable ausgelöst. Wenn es keine neuen Änderungen gibt oder wenn keine neuen FlowFiles in der MergeSnowflakeJournalTable-Warteschlange warten, wird keine Zusammenführung ausgelöst und das Warehouse wird automatisch ausgesetzt.
Um die Warehouse-Kosten zu begrenzen und die Zusammenführungen nur auf die geplante Zeit zu beschränken, verwenden Sie den CRON-Ausdruck im Parameter „Merge task Schedule CRON“. Er drosselt die an den MergeSnowflakeJournalTable-Prozessor gelangenden FlowFiles und die Zusammenführung wird nur in einem bestimmten Zeitraum ausgelöst. Weitere Informationen zur Zeitplanung finden Sie unter Zeitplanungsstrategie.
Führen Sie den Ablauf aus¶
Klicken Sie mit der rechten Maustaste auf die Ebene, und wählen Sie Enable all Controller Services.
Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe und wählen Sie Start. Der Konnektor startet die Datenaufnahme.