Openflow Connector for MySQL の設定

注釈

このコネクタは、 Snowflakeコネクタ規約 に従うものとします。

このトピックでは、 Openflow Connector for MySQL を設定する手順について説明します。

注釈

このコネクタは、スナップショットのロードフェーズをバイパスして、新しく追加されたテーブルの増分変更の複製をすぐに開始するように構成できます。このオプションは、以前に複製されたデータが存在し、テーブルを再スナップショットせずに複製を続行するアカウントにコネクタを再インストールする場合に、よく役立ちます。

増分ロードプロセスの詳細については、 増分複製 をご参照ください。

前提条件

  1. Openflow Connector for MySQL について を確認してください。

  2. :doc:`/user-guide/data-integration/openflow/setup-openflow-byoc`または:doc:`Openflow のセットアップ - Snowflake デプロイメント</user-guide/data-integration/openflow/setup-openflow-spcs>`があることを確認してください。

  3. |OFSFSPCS-plural|を使用する場合、:doc:`必要なドメインの構成</user-guide/data-integration/openflow/setup-openflow-spcs-sf-allow-list>`を精査し、:ref:`label-openflow_domains_used_by_openflow_connectors_mysql`コネクタに必要なドメインへのアクセス許可を付与していることを確認してください。

  4. Snowflakeとデータを同期するため、 MySQL 8以降のバージョンを使用していることを確認します。

  5. 推奨:ランタイムごとにコネクタインスタンスを1つだけ追加するようにします。

  6. データベース管理者として、以下のタスクを実行します。

    1. バイナリログ を有効にし、その形式を以下のように保存して構成します。

      log_bin

      on に設定。

      これにより、構造やデータの変更を記録するバイナリログが有効になります。

      binlog_format

      row に設定。

      コネクタは行ベースの複製のみをサポートします。MySQL 8.xバージョンがこの設定をサポートする最後のバージョンとなる可能性があり、将来のバージョンでは行ベースの複製のみがサポートされます。

      GCP クラウド SQL では適用されません。正しい値に固定されています。

      binlog_row_metadata

      full に設定。

      コネクタが動作するためには、すべての行メタデータが必要で、最も重要であるのは列名と主キー情報です。

      MySQL 用のMicrosoft Azureデータベースでは、 binlog_row_metadata フィールドはユーザーが変更できません。この値を変更するには、Microsoftサポートチケットを提出してください。

      binlog_row_image

      full に設定。

      コネクタは、すべての列がバイナリログに書き込まれていることを要求します。

      Amazon Auroraでは適用されません。正しい値に固定されています。

      binlog_row_value_options

      空のままにする。

      このオプションは JSON 列にのみ影響し、 UPDATE ステートメントに対して、 JSON ドキュメントの変更部分のみを含めるように設定できます。コネクタは、完全なドキュメントがバイナリログに書き込まれていることを要求します。

      binlog_expire_logs_seconds

      データベースエージェントが長時間の停止やダウンタイム後も増分複製を継続できるように、少なくとも数時間、またはそれ以上に設定します。Snowflakeでは、コネクタを安定して動作させるために、 バイナリログの有効期限(binlog_expire_logs_seconds) を少なくとも数時間に設定することを推奨します。バイナリログの有効期限終了後に、バイナリログファイルが自動的に削除される場合があります。メンテナンス作業などで統合が長期間中断され、その間に期限切れのバイナリログファイルが削除されると、Openflowはこれらのファイルのデータを複製できなくなります。

      スケジュール複製を使用している場合は、構成されたスケジュールよりも長い値を指定する必要があります。

      例:

      log_bin = on
      binlog_format = row
      binlog_row_metadata = full
      binlog_row_image = full
      binlog_row_value_options =
      
      Copy
    2. sort_buffer_size の値を増やします。

      sort_buffer_size = 4194304
      
      Copy

      sort_buffer_size は、 ORDER BY のようなメモリ内ソート操作のためにクエリスレッドごとに割り当てられるメモリ量(バイト単位)を定義します。値が小さすぎると、以下のエラーメッセージが表示されてコネクタが失敗することがあります。

      Out of sort memory, consider increasing server sort buffer size。これは、 sort_buffer_size を増やす必要があることを示します。

    3. Amazon RDS データベースを使用している場合は、 rds_set_configuration を使用して binlog_expire_logs_seconds に関連する保持期間を増やしてください。例えば、binlogを24時間格納したい場合、 mysql.rds_set_configuration('binlog retention hours', 24) を呼び出します。

    4. 読み取りレプリカを使用して接続する場合は、レプリカでバイナリログを有効にする必要があります。

      構成の詳細は、ステップ4にあります。

    5. バイナリログを有効にした後、ソースから受信したイベントを独自のバイナリログに記録するレプリカを構成します。

      log_replica_updates = ON
      
      Copy

      log_replica_updates は、レプリカがそのソースから受信したイベントを独自のバイナリログに書き込み、それらの変更をレプリカから複製するすべてのデータベースで利用できるようにします。

    6. SSL 経由で接続します。MySQL への SSL 接続を使用する場合は、データベースサーバーのルート証明書を準備します。これは、構成時に必要です。

    7. コネクタのユーザーを作成します。コネクタは、バイナリログを読み取るために REPLICATION_SLAVE と REPLICATION_CLIENT の権限を持つユーザーを必要とします。これらの権限を付与します。

      GRANT REPLICATION SLAVE ON *.* TO '<username>'@'%'
      GRANT REPLICATION CLIENT ON *.* TO '<username>'@'%'
      
      Copy
    8. 複製されたすべてのテーブルに SELECT 権限を付与します。

      GRANT SELECT ON <schema>.* TO '<username>'@'%'
      GRANT SELECT ON <schema>.<table> TO '<username>'@'%'
      
      Copy

      複製セキュリティの詳細については、 バイナリログ をご参照ください。

  7. Snowflakeアカウント管理者として、以下のタスクを実行します。

    1. タイプを SERVICE としてSnowflakeユーザーを作成します。複製データを格納するデータベースを作成し、Snowflakeユーザーに USAGE および CREATE SCHEMA 権限 を付与して、そのデータベースにオブジェクトを作成する権限を設定します。

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
           WITH
               WAREHOUSE_SIZE = 'XSMALL'
               AUTO_SUSPEND = 300
               AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
      Copy
    2. セキュアキーのペア(公開キーと秘密キー)を作成します。ユーザーの秘密キーをファイルに格納して、コネクタの構成に提供します。Snowflakeサービスユーザーに公開キーを割り当てます:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      詳細については、 キーのペア をご参照ください。

    3. コネクタが使用するウェアハウスを指定します。で開始する XSMALL ウェアハウスのサイズから、レプリケートされるテーブルの量とデータの転送量に応じてサイズを実験します。多数のテーブルを扱う場合、通常はウェアハウスサイズではなく、 マルチクラスターウェアハウス を使用した方がスケールしやすくなります。

コネクタをインストールする

コネクタをインストールするには、データエンジニアとして次を実行します。

  1. Openflow概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. Select runtime ダイアログで、Available runtimes ドロップダウンリストからランタイムを選択して Add をクリックします。

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  4. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  5. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

コネクタを構成するには、データエンジニアとして次の手順を実行します。

  1. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

  2. フローパラメーター の説明に従って、必要なパラメーター値を入力します。

フローパラメーター

まず、 MySQL ソースパラメーターコンテキストのパラメーターを設定し、次に MySQL 宛先パラメーターコンテキストを設定します。これが完了したら、コネクタを有効にします。コネクタを MySQL とSnowflakeの両方に接続し、実行を開始する必要があります。ただし、複製するテーブルが構成に明示的に追加されるまでは、コネクタはデータを複製しません。

複製する特定のテーブルを構成するには、 MySQL 取り込みパラメーターコンテキストを編集します。複製パラメーターコンテキストに変更を適用すると、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。

MySQL ソースパラメーターコンテキスト

パラメーター

説明

MySQL 接続 URL

フル JDBCURL ソースデータベース。コネクタは MariaDB ドライバーを使用します。これは、MySQL と互換性があり、URLで jdbc:mariadb のプレフィックスが必要 です。SSL が無効になっている場合は、接続 URL の allowPublicKeyRetrieval に設定されたパラメーター true が必要です。

例:

  • SSL が有効: jdbc:mariadb://example.com:3306

  • SSL が無効: jdbc:mariadb://example.com:3306?allowPublicKeyRetrieval=true

MySQL JDBC ドライバー

MariaDB JDBC ドライバーのjar への絶対パス。コネクタは MySQL と互換性のある MariaDB ドライバーを使用します。MariaDB JDBC ドライバーをアップロードするには、 Reference asset チェックボックスを選択します。

例: /opt/resources/drivers/mariadb-java-client-3.5.2.jar

MySQL ユーザー名

コネクタのユーザー名。

MySQL パスワード

コネクタのパスワード。

MySQL 宛先パラメーターコンテキスト

パラメーター

説明

必須

宛先データベース

データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

有り

Snowflake認証ストラテジー

以下を使用する場合:

  • Snowflake Openflow Deployment または BYOC:SNOWFLAKE_MANAGED_TOKEN を使用します。このトークンはSnowflakeによって自動的に管理されます。 BYOC デプロイメントでは、 SNOWFLAKE_MANAGED_TOKEN を使用するために、事前に ランタイムロール が構成されている必要があります。

  • BYOC: 代わりに、BYOC では認証戦略の値として KEY_PAIR を使用できます。

有り

Snowflakeアカウント識別子

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR: データが永続化される[organization-name]-[account-name]形式のSnowflakeアカウント名。

有り

Snowflake接続の戦略

KEY_PAIRを使用する場合は、Snowflakeに接続するための戦略を特定します。

  • STANDARD (デフォルト):Snowflakeサービスへの標準パブリックルーティングを使用して接続します。

  • PRIVATE_CONNECTIVITY:AWS PrivateLinkなど、サポート対象のクラウドプラットフォームに関連付けられたプライベートアドレスを使用して接続します。

KEY_PAIRを使用したBYOCのみに必要です。それ以外の場合は無視されます。

Snowflake秘密キー

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:認証に使用される RSA プライベートキーである必要があります。

    その RSA キーは PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを持つ必要があります。SnowflakeプライベートキーファイルまたはSnowflakeプライベートキーのいずれかを定義する必要があることに注意してください。

無し

Snowflake秘密キーファイル

以下を使用する場合:

  • Session token authentication strategy:プライベートキーファイルは空白である必要があります。

  • KEY_PAIR:Snowflakeへの認証に使用される RSA プライベートキーを含むファイルをアップロードします。これは、PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。ヘッダー行は で始まります -----BEGIN PRIVATE.プライベートキーファイルをアップロードするには、Reference asset チェックボックスを選択します。

無し

Snowflake秘密キーパスワード

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeプライベートキーファイルに関連付けられたパスワードを提供します。

無し

Snowflakeロール

以下を使用する場合

  • Session Token Authentication Strategy:ランタイムに割り当てられたSnowflakeロール、またはこのSnowflakeロールに付与された子ロールを使用します。ランタイムのSnowflakeロールは、Openflow UIでランタイムの:ui:`More Options [⋮]`ボタンを展開し、:ui:`Set Snowflake role`を選択すると確認できます。

  • KEY_PAIR Authentication Strategy:サービスユーザーのために構成された有効なロールを使用します。

有り

Snowflakeのユーザー名

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeインスタンスへの接続に使用するユーザー名を提供します。

有り

オーバーサイズ値戦略

複製中にコネクタが内部サイズ制限(16MB)を超える値を処理する方法を決定します。可能な値は次のとおりです。

  • **失敗テーブル**(デフォルト):テーブルは永久に失敗したものとしてマークされ、そのテーブルの複製が停止します。

  • Nullの設定:値は宛先テーブルで``NULL``に置き換えられます。オーバーサイズ値を超えるテーブル内のデータが失われても許容できる場合、これを使用してテーブルの失敗を防ぎます。

無し

Snowflakeウェアハウス

クエリの実行に使用されるSnowflakeウェアハウス。

有り

MySQL 取り込みパラメーターコンテキスト

パラメーター

説明

含まれるテーブル名

スキーマを含むテーブルパスのコンマ区切りリスト。例: public.my_table, other_schema.other_table

含まれるテーブル正規表現

テーブルパスに一致させる正規表現。式に一致するすべてのパスが複製され、後から作成されたパターンに一致する新しいテーブルも自動的に含まれます。例: public\.auto_.*

フィルター JSON

完全修飾されたテーブル名のリストと、複製に含めるべき列名の正規表現パターンを含む JSON。例: [ {"schema":"public", "table":"table1", "includedPattern":".*name"} ]public スキーマから table1name で終わるすべての列を含めます。

タスクスケジュール CRON をマージする

ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 * * * * * ? に設定します。例えば、 * 0 * * * ? という文字列は、丸1時間で1分間のマージをスケジュールしたいことを示しています。* 20 14 ? * MON-FRI という文字列は、毎週月曜日から金曜日の2:20 PM にマージをスケジュールしたいことを示しています。詳細と例については、 CronTrigger チュートリアル をご参照ください。

オブジェクト識別子の解決

スキーマ名、テーブル名、列名などのソースオブジェクト識別子がSnowflakeにどのように保存され、クエリされるかを指定します。この設定は、 SQL クエリで二重引用符を使用する必要があることを指定します。

:emph:`オプション1:(デフォルト)大文字と小文字を区別する。 `下位互換性のため。

  • 変換:大文字小文字は維持されます。たとえば、 My_TableMy_Table のままです。

  • クエリ: SQL クエリは、データベースオブジェクトの大文字と小文字を正確に一致させるために、二重引用符を使用する必要があります。例: SELECT * FROM "My_Table";

注釈

Snowflakeでは、レガシーまたは互換性の理由からソースの大文字小文字の区別を保持する必要がある場合、このオプションを使用することを推奨しています。たとえば、 MY_TABLEmy_table のように大文字と小文字だけが異なるテーブル名がソースデータベースに含まれている場合、大文字と小文字を区別しない比較を使用すると、名前の衝突が発生します。

:emph:`オプション 2:(推奨)大文字と小文字を区別しない `

  • 変換:識別子はすべて大文字に変換されます。たとえば、 My_TableMY_TABLE になります。

  • クエリ: SQL クエリは大文字と小文字を区別せず、 SQL 二重引用符は必要ありません。たとえば、 SELECT * FROM my_table;SELECT * FROM MY_TABLE; と同じ結果を返します。

注釈

Snowflakeでは、データベースオブジェクトに大文字と小文字が混在する名前が想定されない場合、このオプションを使用することを推奨しています。

重要

コネクタがデータのインジェスチョンを開始した後は、この設定を変更しないでください。取り込み開始後にこの設定を変更すると、既存の取り込みは中断されます。この設定を変更する必要がある場合は、新しいコネクタインスタンスを作成してください。

テーブル複製の再開

(主キーが欠落している、またはスキーマの変更がサポートされていないなどの理由で) FAILED 状態のテーブルは、自動的に再開されません。テーブルが FAILED 状態になった場合、または複製を最初から再開する必要がある場合は、次の手順に従ってテーブルを削除し、複製に再度追加します。

注釈

主キーが欠落しているなど、ソーステーブルの問題が原因で障害が発生した場合は、続行する前にソースデータベースでその問題を解決します。

  1. フローパラメーターからテーブルを削除します。取り込みパラメーターのコンテキストで、Included Table Names からテーブルを削除するか、Included Table Regex を変更してテーブルが一致しないようにします。

  2. テーブルが削除されたことを確認します。

    1. Openflowランタイムキャンバスで、プロセッサーグループを右クリックし、Controller Services を選択します。

    2. コントローラーサービスをリストしたテーブルで、Table State Store 行を見つけ、行の右側にある縦3つのドットをクリックして、View State を選択します。

    重要

    続行する前に、テーブルの状態がこのリストから完全に削除されるまで待つ必要があります。この構成変更が完了するまで続行しないでください。

  3. 宛先をクリーンアップする:テーブルの状態が完全に削除されたと表示されたら、Snowflake で宛先テーブルを手動で DROP します。スナップショットフェーズ中に、コネクタは既存の宛先テーブルを上書きしないことに注意してください。テーブルがまだ存在する場合、複製は再度失敗します。オプションで、ジャーナルテーブルとストリームが不要になった場合は削除することもできます。

  4. テーブルを再度追加する: Included Table Names または Included Table Regex パラメーターを更新して、テーブルを再度含めます。

  5. 再開を確認する:前述の指示に従って Table State Store をチェックします。テーブルの状態は、ステータス NEW で表示され、次に SNAPSHOT_REPLICATION に移行し、最後に INCREMENTAL_REPLICATION になります。

テーブルの列のサブセットを複製します。

コネクタは、テーブルごとに複製されるデータを構成列のサブセットにフィルターできます。

列にフィルターを適用するには、複製パラメーターコンテキストのColumn Filterプロパティを変更し、フィルターを適用したいテーブルごとに1エントリずつ、構成の配列を追加します。

列は、名前またはパターン別に包含したり、除外したりすることができます。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせて適用することもできます。除外は常に包含より優先されます。

以下の例は、利用可能なフィールドを示しています。schema および table フィールドは必須です。includedexcludedincludedPatternexcludedPattern のうち1つ以上が必要です。

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

テーブルでデータ変更を追跡する

コネクタはソーステーブルのデータの現在の状態だけでなく、すべての変更セットのすべての行の状態も複製します。このデータは、宛先テーブルと同じスキーマで作成されたジャーナルテーブルに格納されます。

ジャーナルテーブル名の形式は``<source_table_name>_JOURNAL_<timestamp>_<schema_generation>``です。ここで、``<timestamp>``はソーステーブルが複製に追加されたときのエポック秒の値であり、``<schema_generation>``はソーステーブルのスキーマ変更ごとに増加する整数です。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。

テーブルが複製から削除され、再度追加されると、``<timestamp>``値が変更され、``<schema_generation>``は``1``から再開します。

重要

Snowflakeは、ジャーナルテーブルの構造を一切変更しないことを推奨します。これらは、コネクタによって複製プロセスの一部として宛先テーブルを更新するために使用されます。

コネクタはジャーナルテーブルをドロップすることはありませんが、複製されたすべてのソーステーブルに最新のジャーナルを使用し、ジャーナル上の追加専用ストリームのみを読み取ります。ストレージを回収するには、以下を実行します。

  • すべてのジャーナルテーブルをいつでも切り捨てます。

  • 複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。

  • アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。

例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2除いて、それらのすべてをドロップできます。

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

マージタスクのスケジュールを構成する

コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。

ウェアハウスのコストを制限し、スケジュールされた時間のみにマージを制限するには、Merge task Schedule CRON パラメーターで CRON 式を使用します。MergeSnowflakeJournalTable プロセッサーに送られてくるフローファイルをスロットルし、マージは専用の期間のみにトリガーされます。スケジュールに関する詳細は、 スケジュールストラテジー をご参照ください。

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。