Openflow Connector for MySQL の設定¶
注釈
このコネクタは、 Snowflakeコネクタ規約 に従うものとします。
このトピックでは、 Openflow Connector for MySQL を設定する手順について説明します。
注釈
このコネクタは、スナップショットのロードフェーズをバイパスして、新しく追加されたテーブルの増分変更の複製をすぐに開始するように構成できます。このオプションは、以前に複製されたデータが存在し、テーブルを再スナップショットせずに複製を続行するアカウントにコネクタを再インストールする場合に、よく役立ちます。
増分ロードプロセスの詳細については、 増分複製 をご参照ください。
前提条件¶
Openflow Connector for MySQL について を確認してください。
:doc:`/user-guide/data-integration/openflow/setup-openflow-byoc`または:doc:`Openflow のセットアップ - Snowflake デプロイメント</user-guide/data-integration/openflow/setup-openflow-spcs>`があることを確認してください。
|OFSFSPCS-plural|を使用する場合、:doc:`必要なドメインの構成</user-guide/data-integration/openflow/setup-openflow-spcs-sf-allow-list>`を精査し、:ref:`label-openflow_domains_used_by_openflow_connectors_mysql`コネクタに必要なドメインへのアクセス許可を付与していることを確認してください。
Snowflakeとデータを同期するため、 MySQL 8以降のバージョンを使用していることを確認します。
推奨:ランタイムごとにコネクタインスタンスを1つだけ追加するようにします。
データベース管理者として、以下のタスクを実行します。
バイナリログ を有効にし、その形式を以下のように保存して構成します。
log_binonに設定。これにより、構造やデータの変更を記録するバイナリログが有効になります。
binlog_formatrowに設定。コネクタは行ベースの複製のみをサポートします。MySQL 8.xバージョンがこの設定をサポートする最後のバージョンとなる可能性があり、将来のバージョンでは行ベースの複製のみがサポートされます。
GCP クラウド SQL では適用されません。正しい値に固定されています。
binlog_row_metadatafullに設定。コネクタが動作するためには、すべての行メタデータが必要で、最も重要であるのは列名と主キー情報です。
MySQL 用のMicrosoft Azureデータベースでは、
binlog_row_metadataフィールドはユーザーが変更できません。この値を変更するには、Microsoftサポートチケットを提出してください。binlog_row_imagefullに設定。コネクタは、すべての列がバイナリログに書き込まれていることを要求します。
Amazon Auroraでは適用されません。正しい値に固定されています。
binlog_row_value_options空のままにする。
このオプションは JSON 列にのみ影響し、
UPDATEステートメントに対して、 JSON ドキュメントの変更部分のみを含めるように設定できます。コネクタは、完全なドキュメントがバイナリログに書き込まれていることを要求します。binlog_expire_logs_secondsデータベースエージェントが長時間の停止やダウンタイム後も増分複製を継続できるように、少なくとも数時間、またはそれ以上に設定します。Snowflakeでは、コネクタを安定して動作させるために、 バイナリログの有効期限(binlog_expire_logs_seconds) を少なくとも数時間に設定することを推奨します。バイナリログの有効期限終了後に、バイナリログファイルが自動的に削除される場合があります。メンテナンス作業などで統合が長期間中断され、その間に期限切れのバイナリログファイルが削除されると、Openflowはこれらのファイルのデータを複製できなくなります。
スケジュール複製を使用している場合は、構成されたスケジュールよりも長い値を指定する必要があります。
例:
log_bin = on binlog_format = row binlog_row_metadata = full binlog_row_image = full binlog_row_value_options =
sort_buffer_sizeの値を増やします。sort_buffer_size = 4194304
sort_buffer_sizeは、 ORDER BY のようなメモリ内ソート操作のためにクエリスレッドごとに割り当てられるメモリ量(バイト単位)を定義します。値が小さすぎると、以下のエラーメッセージが表示されてコネクタが失敗することがあります。Out of sort memory, consider increasing server sort buffer size。これは、sort_buffer_sizeを増やす必要があることを示します。Amazon RDS データベースを使用している場合は、
rds_set_configurationを使用してbinlog_expire_logs_secondsに関連する保持期間を増やしてください。例えば、binlogを24時間格納したい場合、mysql.rds_set_configuration('binlog retention hours', 24)を呼び出します。読み取りレプリカを使用して接続する場合は、レプリカでバイナリログを有効にする必要があります。
構成の詳細は、ステップ4にあります。
バイナリログを有効にした後、ソースから受信したイベントを独自のバイナリログに記録するレプリカを構成します。
log_replica_updates = ON
log_replica_updatesは、レプリカがそのソースから受信したイベントを独自のバイナリログに書き込み、それらの変更をレプリカから複製するすべてのデータベースで利用できるようにします。SSL 経由で接続します。MySQL への SSL 接続を使用する場合は、データベースサーバーのルート証明書を準備します。これは、構成時に必要です。
コネクタのユーザーを作成します。コネクタは、バイナリログを読み取るために REPLICATION_SLAVE と REPLICATION_CLIENT の権限を持つユーザーを必要とします。これらの権限を付与します。
GRANT REPLICATION SLAVE ON *.* TO '<username>'@'%' GRANT REPLICATION CLIENT ON *.* TO '<username>'@'%'
複製されたすべてのテーブルに SELECT 権限を付与します。
GRANT SELECT ON <schema>.* TO '<username>'@'%' GRANT SELECT ON <schema>.<table> TO '<username>'@'%'
複製セキュリティの詳細については、 バイナリログ をご参照ください。
Snowflakeアカウント管理者として、以下のタスクを実行します。
タイプを SERVICE としてSnowflakeユーザーを作成します。複製データを格納するデータベースを作成し、Snowflakeユーザーに USAGE および CREATE SCHEMA 権限 を付与して、そのデータベースにオブジェクトを作成する権限を設定します。
CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
セキュアキーのペア(公開キーと秘密キー)を作成します。ユーザーの秘密キーをファイルに格納して、コネクタの構成に提供します。Snowflakeサービスユーザーに公開キーを割り当てます:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
詳細については、 キーのペア をご参照ください。
コネクタが使用するウェアハウスを指定します。で開始する
XSMALLウェアハウスのサイズから、レプリケートされるテーブルの量とデータの転送量に応じてサイズを実験します。多数のテーブルを扱う場合、通常はウェアハウスサイズではなく、 マルチクラスターウェアハウス を使用した方がスケールしやすくなります。
コネクタをインストールする¶
コネクタをインストールするには、データエンジニアとして次を実行します。
Openflow概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。
Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。
Select runtime ダイアログで、Available runtimes ドロップダウンリストからランタイムを選択して Add をクリックします。
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
コネクタを構成するには、データエンジニアとして次の手順を実行します。
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
フローパラメーター の説明に従って、必要なパラメーター値を入力します。
フローパラメーター¶
まず、 MySQL ソースパラメーターコンテキストのパラメーターを設定し、次に MySQL 宛先パラメーターコンテキストを設定します。これが完了したら、コネクタを有効にします。コネクタを MySQL とSnowflakeの両方に接続し、実行を開始する必要があります。ただし、複製するテーブルが構成に明示的に追加されるまでは、コネクタはデータを複製しません。
複製する特定のテーブルを構成するには、 MySQL 取り込みパラメーターコンテキストを編集します。複製パラメーターコンテキストに変更を適用すると、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。
MySQL ソースパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
MySQL 接続 URL |
フル JDBCURL ソースデータベース。コネクタは MariaDB ドライバーを使用します。これは、MySQL と互換性があり、URLで 例:
|
MySQL JDBC ドライバー |
MariaDB JDBC ドライバーのjar への絶対パス。コネクタは MySQL と互換性のある MariaDB ドライバーを使用します。MariaDB JDBC ドライバーをアップロードするには、 Reference asset チェックボックスを選択します。 例: |
MySQL ユーザー名 |
コネクタのユーザー名。 |
MySQL パスワード |
コネクタのパスワード。 |
MySQL 宛先パラメーターコンテキスト¶
パラメーター |
説明 |
必須 |
|---|---|---|
宛先データベース |
データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 |
有り |
Snowflake認証ストラテジー |
以下を使用する場合:
|
有り |
Snowflakeアカウント識別子 |
以下を使用する場合:
|
有り |
Snowflake接続の戦略 |
KEY_PAIRを使用する場合は、Snowflakeに接続するための戦略を特定します。
|
KEY_PAIRを使用したBYOCのみに必要です。それ以外の場合は無視されます。 |
Snowflake秘密キー |
以下を使用する場合:
|
無し |
Snowflake秘密キーファイル |
以下を使用する場合:
|
無し |
Snowflake秘密キーパスワード |
以下を使用する場合
|
無し |
Snowflakeロール |
以下を使用する場合 |
有り |
Snowflakeのユーザー名 |
以下を使用する場合
|
有り |
オーバーサイズ値戦略 |
複製中にコネクタが内部サイズ制限(16MB)を超える値を処理する方法を決定します。可能な値は次のとおりです。
|
無し |
Snowflakeウェアハウス |
クエリの実行に使用されるSnowflakeウェアハウス。 |
有り |
MySQL 取り込みパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
含まれるテーブル名 |
スキーマを含むテーブルパスのコンマ区切りリスト。例: |
含まれるテーブル正規表現 |
テーブルパスに一致させる正規表現。式に一致するすべてのパスが複製され、後から作成されたパターンに一致する新しいテーブルも自動的に含まれます。例: |
フィルター JSON |
完全修飾されたテーブル名のリストと、複製に含めるべき列名の正規表現パターンを含む JSON。例: |
タスクスケジュール CRON をマージする |
ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 |
オブジェクト識別子の解決 |
スキーマ名、テーブル名、列名などのソースオブジェクト識別子がSnowflakeにどのように保存され、クエリされるかを指定します。この設定は、 SQL クエリで二重引用符を使用する必要があることを指定します。 :emph:`オプション1:(デフォルト)大文字と小文字を区別する。 `下位互換性のため。
注釈 Snowflakeでは、レガシーまたは互換性の理由からソースの大文字小文字の区別を保持する必要がある場合、このオプションを使用することを推奨しています。たとえば、 :emph:`オプション 2:(推奨)大文字と小文字を区別しない `
注釈 Snowflakeでは、データベースオブジェクトに大文字と小文字が混在する名前が想定されない場合、このオプションを使用することを推奨しています。 重要 コネクタがデータのインジェスチョンを開始した後は、この設定を変更しないでください。取り込み開始後にこの設定を変更すると、既存の取り込みは中断されます。この設定を変更する必要がある場合は、新しいコネクタインスタンスを作成してください。 |
テーブル複製の再開¶
(主キーが欠落している、またはスキーマの変更がサポートされていないなどの理由で) FAILED 状態のテーブルは、自動的に再開されません。テーブルが FAILED 状態になった場合、または複製を最初から再開する必要がある場合は、次の手順に従ってテーブルを削除し、複製に再度追加します。
注釈
主キーが欠落しているなど、ソーステーブルの問題が原因で障害が発生した場合は、続行する前にソースデータベースでその問題を解決します。
フローパラメーターからテーブルを削除します。取り込みパラメーターのコンテキストで、Included Table Names からテーブルを削除するか、Included Table Regex を変更してテーブルが一致しないようにします。
テーブルが削除されたことを確認します。
Openflowランタイムキャンバスで、プロセッサーグループを右クリックし、Controller Services を選択します。
コントローラーサービスをリストしたテーブルで、Table State Store 行を見つけ、行の右側にある縦3つのドットをクリックして、View State を選択します。
重要
続行する前に、テーブルの状態がこのリストから完全に削除されるまで待つ必要があります。この構成変更が完了するまで続行しないでください。
宛先をクリーンアップする:テーブルの状態が完全に削除されたと表示されたら、Snowflake で宛先テーブルを手動で DROP します。スナップショットフェーズ中に、コネクタは既存の宛先テーブルを上書きしないことに注意してください。テーブルがまだ存在する場合、複製は再度失敗します。オプションで、ジャーナルテーブルとストリームが不要になった場合は削除することもできます。
テーブルを再度追加する: Included Table Names または Included Table Regex パラメーターを更新して、テーブルを再度含めます。
再開を確認する:前述の指示に従って Table State Store をチェックします。テーブルの状態は、ステータス NEW で表示され、次に SNAPSHOT_REPLICATION に移行し、最後に INCREMENTAL_REPLICATION になります。
テーブルの列のサブセットを複製します。¶
コネクタは、テーブルごとに複製されるデータを構成列のサブセットにフィルターできます。
列にフィルターを適用するには、複製パラメーターコンテキストのColumn Filterプロパティを変更し、フィルターを適用したいテーブルごとに1エントリずつ、構成の配列を追加します。
列は、名前またはパターン別に包含したり、除外したりすることができます。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせて適用することもできます。除外は常に包含より優先されます。
以下の例は、利用可能なフィールドを示しています。schema および table フィールドは必須です。included、 excluded、 includedPattern、 excludedPattern のうち1つ以上が必要です。
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
テーブルでデータ変更を追跡する¶
コネクタはソーステーブルのデータの現在の状態だけでなく、すべての変更セットのすべての行の状態も複製します。このデータは、宛先テーブルと同じスキーマで作成されたジャーナルテーブルに格納されます。
ジャーナルテーブル名の形式は``<source_table_name>_JOURNAL_<timestamp>_<schema_generation>``です。ここで、``<timestamp>``はソーステーブルが複製に追加されたときのエポック秒の値であり、``<schema_generation>``はソーステーブルのスキーマ変更ごとに増加する整数です。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。
テーブルが複製から削除され、再度追加されると、``<timestamp>``値が変更され、``<schema_generation>``は``1``から再開します。
重要
Snowflakeは、ジャーナルテーブルの構造を一切変更しないことを推奨します。これらは、コネクタによって複製プロセスの一部として宛先テーブルを更新するために使用されます。
コネクタはジャーナルテーブルをドロップすることはありませんが、複製されたすべてのソーステーブルに最新のジャーナルを使用し、ジャーナル上の追加専用ストリームのみを読み取ります。ストレージを回収するには、以下を実行します。
すべてのジャーナルテーブルをいつでも切り捨てます。
複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。
アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。
例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2 を 除いて、それらのすべてをドロップできます。
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
マージタスクのスケジュールを構成する¶
コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。
ウェアハウスのコストを制限し、スケジュールされた時間のみにマージを制限するには、Merge task Schedule CRON パラメーターで CRON 式を使用します。MergeSnowflakeJournalTable プロセッサーに送られてくるフローファイルをスロットルし、マージは専用の期間のみにトリガーされます。スケジュールに関する詳細は、 スケジュールストラテジー をご参照ください。
フローを実行する¶
プレーンを右クリックし、 Enable all Controller Services を選択します。
インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。