の設定 Openflow Connector for MySQL

注釈

このコネクタは、 Snowflakeコネクタ規約 に従うものとします。

このトピックでは、 Openflow Connector for MySQL を設定する手順について説明します。

注釈

This connector can be configured to immediately start replicating incremental changes for newly added tables, bypassing the snapshot load phase. This option is often useful when reinstalling the connector in an account where previously replicated data exists and you want to continue replication without having to re-snapshot tables.

For details on the incremental load process, see Incremental replication.

前提条件

  1. Openflow Connector for MySQL について を確認してください。

  2. Ensure that you have Openflowの設定 - BYOC or Set up Openflow - Snowflake Deployments.

  3. If using Openflow - Snowflake Deployments, ensure that you've reviewed configuring required domains and have granted access to the required domains for the MySQL connector.

  4. Snowflakeとデータを同期するため、 MySQL 8以降のバージョンを使用していることを確認します。

  5. 推奨:ランタイムごとにコネクタインスタンスを1つだけ追加するようにします。

  6. データベース管理者として、以下のタスクを実行します。

    1. バイナリログ を有効にし、その形式を以下のように保存して構成します。

      log_bin

      on に設定。

      これにより、構造やデータの変更を記録するバイナリログが有効になります。

      binlog_format

      row に設定。

      コネクタは行ベースの複製のみをサポートします。MySQL 8.xバージョンがこの設定をサポートする最後のバージョンとなる可能性があり、将来のバージョンでは行ベースの複製のみがサポートされます。

      GCP クラウド SQL では適用されません。正しい値に固定されています。

      binlog_row_metadata

      full に設定。

      コネクタが動作するためには、すべての行メタデータが必要で、最も重要であるのは列名と主キー情報です。

      Under Microsoft Azure Database for MySQL the binlog_row_metadata field is not user modifiable. Raise a Microsoft support ticket to change this value.

      binlog_row_image

      full に設定。

      コネクタは、すべての列がバイナリログに書き込まれていることを要求します。

      Amazon Auroraでは適用されません。正しい値に固定されています。

      binlog_row_value_options

      空のままにする。

      このオプションは JSON 列にのみ影響し、 UPDATE ステートメントに対して、 JSON ドキュメントの変更部分のみを含めるように設定できます。コネクタは、完全なドキュメントがバイナリログに書き込まれていることを要求します。

      binlog_expire_logs_seconds

      データベースエージェントが長時間の停止やダウンタイム後も増分複製を継続できるように、少なくとも数時間、またはそれ以上に設定します。Snowflakeでは、コネクタを安定して動作させるために、 バイナリログの有効期限(binlog_expire_logs_seconds) を少なくとも数時間に設定することを推奨します。バイナリログの有効期限終了後に、バイナリログファイルが自動的に削除される場合があります。メンテナンス作業などで統合が長期間中断され、その間に期限切れのバイナリログファイルが削除されると、Openflowはこれらのファイルのデータを複製できなくなります。

      スケジュール複製を使用している場合は、構成されたスケジュールよりも長い値を指定する必要があります。

      例:

      log_bin = on
      binlog_format = row
      binlog_row_metadata = full
      binlog_row_image = full
      binlog_row_value_options =
      
      Copy
    2. sort_buffer_size の値を増やします。

      sort_buffer_size = 4194304
      
      Copy

      sort_buffer_size は、 ORDER BY のようなメモリ内ソート操作のためにクエリスレッドごとに割り当てられるメモリ量(バイト単位)を定義します。値が小さすぎると、以下のエラーメッセージが表示されてコネクタが失敗することがあります。

      Out of sort memory, consider increasing server sort buffer size。これは、 sort_buffer_size を増やす必要があることを示します。

    3. Amazon RDS データベースを使用している場合は、 rds_set_configuration を使用して binlog_expire_logs_seconds に関連する保持期間を増やしてください。例えば、binlogを24時間格納したい場合、 mysql.rds_set_configuration('binlog retention hours', 24) を呼び出します。

    4. 読み取りレプリカを使用して接続する場合は、レプリカでバイナリログを有効にする必要があります。

      構成の詳細は、ステップ4にあります。

    5. バイナリログを有効にした後、ソースから受信したイベントを独自のバイナリログに記録するレプリカを構成します。

      log_replica_updates = ON
      
      Copy

      log_replica_updates は、レプリカがそのソースから受信したイベントを独自のバイナリログに書き込み、それらの変更をレプリカから複製するすべてのデータベースで利用できるようにします。

    6. SSL 経由で接続します。MySQL への SSL 接続を使用する場合は、データベースサーバーのルート証明書を準備します。これは、構成時に必要です。

    7. コネクタのユーザーを作成します。コネクタは、バイナリログを読み取るために REPLICATION_SLAVE と REPLICATION_CLIENT の権限を持つユーザーを必要とします。これらの権限を付与します。

      GRANT REPLICATION SLAVE ON *.* TO '<username>'@'%'
      GRANT REPLICATION CLIENT ON *.* TO '<username>'@'%'
      
      Copy
    8. 複製されたすべてのテーブルに SELECT 権限を付与します。

      GRANT SELECT ON <schema>.* TO '<username>'@'%'
      GRANT SELECT ON <schema>.<table> TO '<username>'@'%'
      
      Copy

      複製セキュリティの詳細については、 バイナリログ をご参照ください。

  7. Snowflakeアカウント管理者として、以下のタスクを実行します。

    1. タイプを SERVICE としてSnowflakeユーザーを作成します。複製データを格納するデータベースを作成し、Snowflakeユーザーに USAGE および CREATE SCHEMA 権限 を付与して、そのデータベースにオブジェクトを作成する権限を設定します。

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
           WITH
               WAREHOUSE_SIZE = 'MEDIUM'
               AUTO_SUSPEND = 300
               AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
      Copy
    2. セキュアキーのペア(公開キーと秘密キー)を作成します。ユーザーの秘密キーをファイルに格納して、コネクタの構成に提供します。Snowflakeサービスユーザーに公開キーを割り当てます:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      詳細については、 キーのペア をご参照ください。

    3. コネクタが使用するウェアハウスを指定します。で開始する MEDIUM ウェアハウスのサイズから、レプリケートされるテーブルの量とデータの転送量に応じてサイズを実験します。テーブル数が大きい場合は、通常、ウェアハウスのサイズよりも、 マルチクラスターウェアハウス を使用した方がスケーリングが向上します。

コネクタを設定する

データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。

コネクタをインストールする

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  4. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  5. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

以下のユースケースにコネクタを構成できます。

リアルタイムでテーブルのセットを複製する

  1. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

  2. フローパラメーター の説明に従って、必要なパラメーター値を入力します。

フローパラメーター

まず、 MySQL ソースパラメーターコンテキストのパラメーターを設定し、次に MySQL 宛先パラメーターコンテキストを設定します。これが完了したら、コネクタを有効にします。コネクタを MySQL とSnowflakeの両方に接続し、実行を開始する必要があります。ただし、複製するテーブルが構成に明示的に追加されるまでは、コネクタはデータを複製しません。

複製する特定のテーブルを構成するには、 MySQL 取り込みパラメーターコンテキストを編集します。複製パラメーターコンテキストに変更を適用すると、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。

MySQL ソースパラメーターコンテキスト

パラメーター

説明

MySQL 接続 URL

フル JDBCURL ソースデータベース。コネクタは MariaDB ドライバーを使用します。これは、MySQL と互換性があり、URLで jdbc:mariadb のプレフィックスが必要 です。SSL が無効になっている場合は、接続 URL の allowPublicKeyRetrieval に設定されたパラメーター true が必要です。

例:

  • SSL が有効: jdbc:mariadb://example.com:3306

  • SSL が無効: jdbc:mariadb://example.com:3306?allowPublicKeyRetrieval=true

MySQL JDBC ドライバー

MariaDB JDBC ドライバーのjar への絶対パス。コネクタは MySQL と互換性のある MariaDB ドライバーを使用します。MariaDB JDBC ドライバーをアップロードするには、 Reference asset チェックボックスを選択します。

例: /opt/resources/drivers/mariadb-java-client-3.5.2.jar

MySQL ユーザー名

コネクタのユーザー名。

MySQL パスワード

コネクタのパスワード。

MySQL 宛先パラメーターコンテキスト

パラメーター

説明

必須

宛先データベース

データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

有り

Snowflake認証ストラテジー

以下を使用する場合:

  • Snowflake Openflow Deployment or BYOC: Use SNOWFLAKE_SESSION_TOKEN. This token is managed automatically by Snowflake. BYOC deployments must have previously configured runtime roles to use SNOWFLAKE_SESSION_TOKEN.

  • BYOC: Alternatively BYOC can use KEY_PAIR as the value for authentication strategy.

有り

Snowflakeアカウント識別子

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR: データが永続化される[organization-name]-[account-name]形式のSnowflakeアカウント名。

有り

Snowflake秘密キー

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:認証に使用される RSA プライベートキーである必要があります。

    その RSA キーは PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを持つ必要があります。SnowflakeプライベートキーファイルまたはSnowflakeプライベートキーのいずれかを定義する必要があることに注意してください。

無し

Snowflake秘密キーファイル

以下を使用する場合:

  • Session token authentication strategy:プライベートキーファイルは空白である必要があります。

  • KEY_PAIR:Snowflakeへの認証に使用される RSA プライベートキーを含むファイルをアップロードします。これは、PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。ヘッダー行は で始まります -----BEGIN PRIVATE.プライベートキーファイルをアップロードするには、Reference asset チェックボックスを選択します。

無し

Snowflake秘密キーパスワード

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeプライベートキーファイルに関連付けられたパスワードを提供します。

無し

Snowflakeロール

以下を使用する場合

  • Session Token Authentication Strategy:ランタイムロールを使用します。ランタイムの View Details に移動すると、Openflow UI でランタイムロールを見つけることができます。

  • KEY_PAIR Authentication Strategy:サービスユーザーのために構成された有効なロールを使用します。

有り

Snowflakeのユーザー名

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeインスタンスへの接続に使用するユーザー名を提供します。

有り

Snowflakeウェアハウス

クエリの実行に使用されるSnowflakeウェアハウス。

有り

MySQL 取り込みパラメーターコンテキスト

パラメーター

説明

含まれるテーブル名

スキーマを含むテーブルパスのコンマ区切りリスト。例: public.my_table, other_schema.other_table

含まれるテーブル正規表現

テーブルパスに一致させる正規表現。式に一致するすべてのパスが複製され、後から作成されたパターンに一致する新しいテーブルも自動的に含まれます。例: public\.auto_.*

フィルター JSON

完全修飾されたテーブル名のリストと、複製に含めるべき列名の正規表現パターンを含む JSON。例: [ {"schema":"public", "table":"table1", "includedPattern":".*name"} ]public スキーマから table1name で終わるすべての列を含めます。

タスクスケジュール CRON をマージする

ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 * * * * * ? に設定します。例えば、 * 0 * * * ? という文字列は、丸1時間で1分間のマージをスケジュールしたいことを示しています。* 20 14 ? * MON-FRI という文字列は、毎週月曜日から金曜日の2:20 PM にマージをスケジュールしたいことを示しています。詳細と例については、 CronTrigger チュートリアル をご参照ください。

Object Identifier Resolution

Specifies how source object identifiers such as the names of schemas, tables, and columns are stored and queried in Snowflake. This setting specifies that you must use double quotes in SQL queries.

Option 1: Default, case-sensitive. For backwards compatibility.

  • Transformation: Case is preserved. For example, My_Table remains My_Table.

  • Queries: SQL queries must use double quotes to match the exact case for database objects. For example, SELECT * FROM "My_Table";.

注釈

Snowflake recommends using this option if you must preserve source casing for legacy or compatibility reasons. For example, if the source database includes table names that differ in case only--such as MY_TABLE and my_table--that would result in a name collision when using when using case-insensitive comparisons.

Option 2: Recommended, case-insensitive

  • Transformation: All identifiers are converted to uppercase. For example, My_Table becomes MY_TABLE.

  • Queries: SQL queries are case-insensitive and don't require SQL double quotes. For example, SELECT * FROM my_table; returns the same results as SELECT * FROM MY_TABLE;.

注釈

Snowflake recommends using this option if database objects are not expected to have mixed case names.

重要

Do not change this setting after the connector has begun ingesting data. Changing this setting after ingestion has begun breaks the existing ingestion. If you must change this setting, create a new connector instance.

テーブルを削除し、複製に再追加する

複製からテーブルを削除するには、複製パラメーターコンテキストの Included Table Names または Included Table Regex パラメーターからテーブルが削除されていることを確認します。

後でテーブルを複製に再追加する場合は、まずSnowflakeで対応する宛先テーブルを削除します。その後、 Included Table Names または Included Table Regex パラメーターにテーブルを追加して戻します。これにより、テーブルの複製プロセスが新しく開始されます。

このアプローチは、失敗したテーブル複製シナリオからの復旧にも使用できます。

テーブルの列のサブセットを複製します。

コネクタは、テーブルごとに複製されるデータを構成列のサブセットにフィルターできます。

列にフィルターを適用するには、複製パラメーターコンテキストのColumn Filterプロパティを変更し、フィルターを適用したいテーブルごとに1エントリずつ、構成の配列を追加します。

列は、名前またはパターン別に包含したり、除外したりすることができます。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせて適用することもできます。除外は常に包含より優先されます。

以下の例は、利用可能なフィールドを示しています。schema および table フィールドは必須です。includedexcludedincludedPatternexcludedPattern のうち1つ以上が必要です。

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

テーブルでデータ変更を追跡する

コネクタはソーステーブルのデータの現在の状態だけでなく、すべての変更セットのすべての行の状態も複製します。このデータは、宛先テーブルと同じスキーマで作成されたジャーナルテーブルに格納されます。

ジャーナルテーブル名の形式は次のとおりです。<source table name>_JOURNAL_<timestamp><schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> は整数で、ソーステーブルのスキーマが変更されるごとに増加します。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。

テーブルが複製から削除され、その後再び追加されると、 <タイムスタンプ> の値が変更され、 <スキーマ生成>1 から再び開始されます。

重要

Snowflakeは、ジャーナルテーブルの構造を一切変更しないことを推奨します。これらは、コネクタによって複製プロセスの一部として宛先テーブルを更新するために使用されます。

コネクタはジャーナルテーブルをドロップすることはありませんが、複製されたすべてのソーステーブルに最新のジャーナルを使用し、ジャーナル上の追加専用ストリームのみを読み取ります。ストレージを回収するには、以下を実行します。

  • すべてのジャーナルテーブルをいつでも切り捨てます。

  • 複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。

  • アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。

例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2除いて、それらのすべてをドロップできます。

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

マージタスクのスケジュールを構成する

コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。

ウェアハウスのコストを制限し、スケジュールされた時間のみにマージを制限するには、Merge task Schedule CRON パラメーターで CRON 式を使用します。MergeSnowflakeJournalTable プロセッサーに送られてくるフローファイルをスロットルし、マージは専用の期間のみにトリガーされます。スケジュールに関する詳細は、 スケジュールストラテジー をご参照ください。

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。