Openflow Connector for PostgreSQL を設定する

注釈

コネクタには、 コネクタ利用規約 が適用されます。

このトピックでは、 Openflow Connector for PostgreSQL を設定する手順について説明します。

前提条件

  1. Openflow Connector for PostgreSQL について を確認してください。

  2. サポートされている PostgreSQL バージョン を確認してください。

  3. Openflowを設定した ことを確認します。

  4. データベース管理者として、以下のタスクを実行します。

    1. wal_levelの構成

    2. パブリケーションの作成

    3. WAL 用の PostgreSQL サーバーに十分なディスク容量があることを確認します。これは、複製スロットが作成されると、コネクタがその位置を確認して進むまで、 PostgreSQL が複製スロットが保持する位置の WAL データを保持するためです。

    4. 複製が有効なすべてのテーブルに主キーがあることを確認します。キーは単一列でも複合列でもかまいません。

    5. テーブルの REPLICA IDENTITYDEFAULT に設定します。これにより、主キーが WAL で表現され、コネクタがそれを読み取れるようになります。

    6. コネクタのユーザーを作成します。コネクタには、 REPLICATION 属性と、複製されたすべてのテーブルから SELECT にアクセスする権限を持つユーザーが必要です。コネクタの構成に入るためのパスワードを持つユーザーを作成します。複製セキュリティの詳細については、 セキュリティ をご参照ください。

wal_levelの構成

Openflow Connector for PostgreSQL では wal_levellogical に設定する必要があります。

PostgreSQL サーバーがホストされている場所に応じて、wal_levelを以下のように構成できます:

オンプレミス

スーパーユーザーまたは ALTER SYSTEM 権限を持つユーザーで、次のクエリを実行します。

ALTER SYSTEM SET wal_level = logical;
Copy

RDS

エージェントが使用するユーザには、 rds_superuser または rds_replication ロールが割り当てられている必要があります。

次も設定する必要があります。

  • rds.logical_replication 静的パラメーターを1に設定します。

  • データベースとレプリケーションの設定に応じて、 max_replication_slotsmax_connectionsmax_wal_senders パラメーターを設定します。

AWS Aurora

rds.logical_replication 情的パラメーターを1に設定します。

GCP

次のフラグを設定します。

  • cloudsql.logical_decoding=on

  • cloudsql.enable_pglogical=on

詳細については、 Google Cloudドキュメント をご参照ください。

Azure

レプリケーションサポートを Logical に設定します。詳細については、 Azureドキュメンテーション をご参照ください。

パブリケーションの作成

Openflow Connector for PostgreSQL では、複製を開始する前に、 パブリケーション を PostgreSQL で作成し、構成する必要があります。すべてのテーブル、またはテーブルのサブセットに対して作成することも、特定の列のみを持つ特定のテーブルに対して作成することもできます。複製を予定しているすべてのテーブルと列がパブリケーションに含まれていることを確認してください。コネクタの実行中であれば、後でパブリケーションを変更することもできます。パブリケーションを作成および構成するには、以下の手順を実行します。

  1. データベースに CREATE 権限を持つユーザーとしてログインし、以下のクエリを実行します。

CREATE PUBLICATION <publication name>;
Copy
  1. 次を使用して、データベースエージェントが閲覧できるテーブルを定義します。

ALTER PUBLICATION <publication name> ADD TABLE <table name>;
Copy

重要

PostgreSQL 15以降 では、指定されたテーブル列のサブセットに対するパブリケーションの構成をサポートしています。コネクタがこれを正しくサポートするには、 列フィルタリング設定 を使用して、パブリケーションで設定された列と同じ列を含める必要があります。

この設定を行わない場合、コネクタで以下の動作が見られます。

  • 宛先テーブルで、フィルターに含まれない列のサフィックスが __DELETED になります。スナップショットフェーズ中にレプリケートされたすべてのデータはそのまま残ります。

  • パブリケーションに新しい列を追加した後、テーブルは永続的に失敗し、複製を再開する必要があります。

詳細については、 ALTER PUBLICATION をご参照ください。

  1. Snowflakeアカウント管理者として、以下のタスクを実行します。

    1. タイプを SERVICE としてSnowflakeユーザーを作成します。複製データを格納するデータベースを作成し、Snowflakeユーザーに USAGE および CREATE SCHEMA 権限 を付与して、そのデータベースにオブジェクトを作成する権限を設定します。

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
           WITH
                WAREHOUSE_SIZE = 'MEDIUM'
                AUTO_SUSPEND = 300
                AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
      Copy
    2. セキュアキーのペア(公開キーと秘密キー)を作成します。ユーザーの秘密キーをファイルに格納して、コネクタの構成に提供します。Snowflakeサービスユーザーに公開キーを割り当てます:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      詳細については、 キーのペア をご参照ください。

    3. コネクタが使用するウェアハウスを指定します。まずは MEDIUM のウェアハウスサイズから始め、複製するテーブルの量や転送するデータ量に応じてサイズを試してみてください。テーブル数が大きい場合は、通常、ウェアハウスのサイズよりも、 マルチクラスターウェアハウス を使用した方がスケーリングが向上します。

コネクタ定義をOpenflowにインポートする

  1. Openflowの概要ページに移動します。 Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。

  4. Add を選択します。

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  5. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  6. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

以下のユースケースにコネクタを構成できます。

リアルタイムでテーブルのセットを複製する

  1. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

  2. フローパラメーター の説明に従って、必要なパラメーター値を入力します。

フローパラメーター

まず、 PostgreSQL ソースパラメーターコンテキストのパラメーターを設定し、次に PostgreSQL 宛先パラメーターコンテキストを設定します。これが完了したら、コネクタを有効にして、 PostgreSQL とSnowflakeの両方に接続し、実行を開始します。ただし、明示的にテーブルが構成に追加されるまでは、データの複製は行われません。

複製する特定のテーブルを構成するには、 PostgreSQL 取り込みパラメーターコンテキストを編集します。複製パラメーターコンテキストに変更を適用した直後に、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。

PostgreSQL ソースパラメーターコンテキスト

パラメーター

説明

Postgres 接続 URL

ソースデータベースへの完全な JDBC URL。例: jdbc:postgresql://example.com:5432/public

Postgres JDBC ドライバー

PostgreSQL JDBC ドライバーのjar へのパス。ウェブサイトからjarをダウンロードし、 Reference asset チェックボックスを選択してアップロードして添付します。

Postgres SSL モード

SSL 接続を有効または無効にします。

Postgresルート SSL 証明書

データベースのルート証明書のフルコンテンツ。SSL が無効の場合はオプション。

Postgresユーザー名

コネクタのユーザー名。

Postgresパスワード

コネクタのパスワード。

出版物名

以前に作成したパブリケーションの名前。

PostgreSQL 宛先パラメーターコンテキスト

パラメーター

説明

宛先データベース

データが永続化されるデータベース。既にSnowflakeに存在している必要があります。

Snowflakeアカウント識別子

データが永続化されるSnowflakeアカウント名(形式: [organization-name]- [account-name])。

Snowflake認証ストラテジー

Snowflakeへの認証のストラテジー。可能な値: SPCS でフローを実行している場合は SNOWFLAKE_SESSION_TOKEN、秘密キーを使用してアクセスを設定する場合は KEY_PAIR

Snowflake秘密キー

認証に使用される RSA 秘密キー。RSA キーは、 PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持つ必要があります。Snowflake秘密キーファイルまたはSnowflake秘密キーのいずれかを定義する必要があることに注意してください。

Snowflake秘密キーファイル

Snowflakeへの認証に使用される RSA 秘密キーを含むファイル。 PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持ちます。ヘッダー行は -----BEGIN PRIVATE で始まります。 Reference asset チェックボックスを選択し、秘密キーファイルをアップロードします。

Snowflake秘密キーパスワード

Snowflake秘密キーファイルに関連付けられたパスワード

Snowflakeロール

クエリ実行時に使用されるSnowflakeロール

Snowflakeのユーザー名

Snowflakeインスタンスへの接続に使用するユーザー名

Snowflakeウェアハウス

クエリの実行に使用されるSnowflakeウェアハウス

PostgreSQL 取り込みパラメーターコンテキスト

パラメーター

説明

含まれるテーブル名

スキーマを含むテーブルパスのコンマ区切りリスト。例: public.my_table, other_schema.other_table

含まれるテーブル正規表現

テーブルパスに一致させる正規表現。式に一致するすべてのパスが複製され、後から作成されたパターンに一致する新しいテーブルも自動的に含まれます。例: public\.auto_.*

フィルター JSON

完全修飾されたテーブル名のリストと、複製に含めるべき列名の正規表現パターンを含む JSON。例: [ {"schema":"public", "table":"table1", "includedPattern":".*name"} ]public スキーマから table1name で終わるすべての列を含めます。

タスクスケジュール CRON をマージする

ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 * * * * * ? に設定します。

例:

  • * 0 * * * ? という文字列は、丸1時間で1分間のマージをスケジュールしたいことを示しています。

  • * 20 14 ? * MON-FRI という文字列は、毎週月曜日から金曜日の2:20 PM にマージをスケジュールしたいことを示しています。

その他の情報と例については、 Quartzドキュメント のcronトリガーチュートリアルをご参照ください。

テーブルを削除し、複製に再追加する

複製からテーブルを削除するには、複製パラメーターコンテキストの Included Table Names または Included Table Regex パラメーターからテーブルが削除されていることを確認します。

後でテーブルを複製に再追加する場合は、まずSnowflakeで対応する宛先テーブルを削除します。その後、 Included Table Names または Included Table Regex パラメーターにテーブルを追加して戻します。これにより、テーブルの複製プロセスが新しく開始されます。

このアプローチは、失敗したテーブル複製シナリオからの復旧にも使用できます。

テーブルの列のサブセットを複製します。

コネクタは、テーブルごとに複製されるデータを構成列のサブセットにフィルターできます。

列にフィルターを適用するには、複製パラメーターコンテキストのColumn Filterプロパティを変更し、フィルターを適用したいテーブルごとに1エントリずつ、構成の配列を追加します。

列は、名前またはパターン別に包含したり、除外したりすることができます。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせて適用することもできます。除外は常に包含より優先されます。

以下の例は、利用可能なフィールドを示したものです。 schema および table は必須で、 includedexcludedincludedPatternexcludedPattern のうちの1つ以上が必要です。

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

テーブルでデータ変更を追跡する

コネクタはソーステーブルのデータの現在の状態だけでなく、すべての変更セットのすべての行の状態も複製します。このデータは、宛先テーブルと同じスキーマで作成されたジャーナルテーブルに格納されます。

ジャーナルテーブル名の形式は次のとおりです: <ソーステーブル名>_JOURNAL_<タイムスタンプ>_<スキーマ生成>

ここで、 <タイムスタンプ> はソーステーブルが複製に追加されたときのエポック秒の値で、 <スキーマ生成> はソーステーブルのスキーマが変更されるたびに増加する整数です。つまり、スキーマが変更されたソーステーブルは、複数のジャーナルテーブルを持つことになります。

テーブルが複製から削除され、その後再び追加されると、 <タイムスタンプ> の値が変更され、 <スキーマ生成>1 から再び開始されます。

重要

Snowflakeでは、ジャーナルテーブルやその中のデータを一切変更しないことを推奨しています。これらは、複製プロセスの一環として、コネクタが宛先テーブルを更新するために使用されます。

コネクタはジャーナルテーブルをドロップすることはありませんが、複製されたすべてのソーステーブルに対して最新のジャーナルのみをアクティブに使用します。ストレージを再利用したい場合は、複製から削除されたソーステーブルに関連するジャーナルテーブルと、アクティブに複製されたテーブルの最新の世代を除くすべてのジャーナルテーブルを安全にドロップできます。

例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2除いて、それらのすべてをドロップできます。

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

マージタスクのスケジュールを構成する

コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。

ウェアハウスのコストを制限し、スケジュールされた時間のみにマージを制限するには、Merge task Schedule CRON パラメーターで CRON 式を使用します。MergeSnowflakeJournalTable プロセッサーに送られてくるフローファイルをスロットルし、マージは専用の期間のみにトリガーされます。スケジュールに関する詳細は、 スケジュールストラテジー をご参照ください。

コネクタを停止または削除する

コネクタを停止または削除する場合は、コネクタが使用する 複製スロット を考慮する必要があります。

コネクタは、 snowflake_connector_ で始まる名前にランダムなサフィックスが続く独自の複製スロットを作成します。コネクタは複製ストリームを読み取るとスロットを進め、 PostgreSQL は WAL ログをトリミングしてディスク領域を解放します。

コネクタが一時停止している場合、スロットは進行せず、ソースデータベースへの変更によって WAL ログサイズが増加し続けます。特にトラフィックの多いデータベースでは、コネクタを長期間にわたり一時停止したままにしないでください。

コネクタをOpenflowキャンバスから削除したり、Openflowインスタンス全体を削除するなどの方法で削除しても、複製スロットはそのまま残るため、手動でドロップする必要があります。

同じ PostgreSQL データベースから複製する複数のコネクタインスタンスがある場合、各インスタンスは固有の名前の複製スロットを作成します。複製スロットを手動でドロップする場合は、それが正しいスロットであることを確認してください。 CaptureChangePostgreSQL プロセッサーの状態を確認することで、指定されたコネクタインスタンスでどの複製スロットが使用されているかを確認できます。

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。