Openflow Connector for PostgreSQL の設定

注釈

This connector is subject to the Snowflake Connector Terms.

このトピックでは、 Openflow Connector for PostgreSQL を設定する手順について説明します。

前提条件

  1. Openflow Connector for PostgreSQL について を確認してください。

  2. サポートされている PostgreSQL バージョン を確認してください。

  3. 推奨:ランタイムごとにコネクタインスタンスを1つだけ追加するようにします。

  4. Ensure that you have Openflowの設定 - BYOC or Set up Openflow - Snowflake Deployments.

  5. |OFSFSPCS-plural|を使用する場合、:doc:`必要なドメインの構成</user-guide/data-integration/openflow/setup-openflow-spcs-sf-allow-list>`を精査し、:ref:`label-openflow_domains_used_by_openflow_connectors_postgresql`コネクタに必要なドメインへのアクセス許可を付与していることを確認してください。

  6. データベース管理者として、以下のタスクを実行します。

    1. wal_levelの構成

    2. パブリケーションの作成

    3. WAL 用の PostgreSQL サーバーに十分なディスク容量があることを確認します。これは、複製スロットが作成されると、コネクタがその位置を確認して進むまで、 PostgreSQL が複製スロットが保持する位置の WAL データを保持するためです。

    4. 複製が有効なすべてのテーブルに主キーがあることを確認します。キーは単一列でも複合列でもかまいません。

    5. テーブルの REPLICA IDENTITYDEFAULT に設定します。これにより、主キーが WAL で表現され、コネクタがそれを読み取れるようになります。

    6. コネクタのユーザーを作成します。コネクタには、 REPLICATION 属性と、複製されたすべてのテーブルから SELECT にアクセスする権限を持つユーザーが必要です。コネクタの構成に入るためのパスワードを持つユーザーを作成します。複製セキュリティの詳細については、 セキュリティ をご参照ください。

  7. Snowflakeアカウント管理者として、以下のタスクを実行します。

    1. タイプを SERVICE としてSnowflakeユーザーを作成します。複製データを格納するデータベースを作成し、Snowflakeユーザーに USAGE および CREATE SCHEMA 権限 を付与して、そのデータベースにオブジェクトを作成する権限を設定します。

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
        WITH
          WAREHOUSE_SIZE = 'MEDIUM'
          AUTO_SUSPEND = 300
          AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
      Copy
    2. 安全なキーのペア(パブリックおよび秘密)を作成します。コネクタの構成時に使用するために、ユーザーの秘密キーを ファイルに保存します。Snowflakeユーザーに公開キーを割り当てます:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      詳細については、:doc:` キーペア認証 </user-guide/key-pair-auth>` をご参照ください。

    3. コネクタが使用するウェアハウスを指定します。で開始する MEDIUM ウェアハウスのサイズから、レプリケートされるテーブルの量とデータの転送量に応じてサイズを実験します。多数のテーブルを扱う場合、通常はウェアハウスサイズではなく、 マルチクラスターウェアハウス を使用した方がスケールしやすくなります。

wal_levelの構成

Openflow Connector for PostgreSQL では wal_levellogical に設定する必要があります。

PostgreSQL サーバーがホストされている場所に応じて、wal_levelを以下のように構成できます:

オンプレミス

スーパーユーザーまたは ALTER SYSTEM 権限を持つユーザーで、次のクエリを実行します。

ALTER SYSTEM SET wal_level = logical;
Copy

RDS

エージェントが使用するユーザには、 rds_superuser または rds_replication ロールが割り当てられている必要があります。

次も設定する必要があります。

  • rds.logical_replication 静的パラメーターを1に設定します。

  • データベースとレプリケーションの設定に応じて、 max_replication_slotsmax_connectionsmax_wal_senders パラメーターを設定します。

AWS Aurora

rds.logical_replication 情的パラメーターを1に設定します。

GCP

次のフラグを設定します。

  • cloudsql.logical_decoding=on

  • cloudsql.enable_pglogical=on

詳細については、 Google Cloudドキュメント をご参照ください。

Azure

レプリケーションサポートを Logical に設定します。詳細については、 Azureドキュメンテーション をご参照ください。

パブリケーションの作成

Openflow Connector for PostgreSQL では、複製を開始する前に、 パブリケーション を PostgreSQL で作成し、構成する必要があります。すべてのテーブル、またはテーブルのサブセットに対して作成することも、特定の列のみを持つ特定のテーブルに対して作成することもできます。複製を予定しているすべてのテーブルと列がパブリケーションに含まれていることを確認してください。コネクタの実行中であれば、後でパブリケーションを変更することもできます。パブリケーションを作成および構成するには、以下の手順を実行します。

  1. データベースに対する権限、を持つユーザーとしてログインし、次のクエリを実行します:

    • PostgreSQL 13以降の場合

      CREATE PUBLICATION <publication name> WITH (publish_via_partition_root = true);
      
      Copy

      パーティション分割されたテーブルを正しく複製するためには、 publish_via_partition_root を追加する必要があります。パーティション分割されたテーブルの取り込みについては、 パーティショニングされたテーブルを複製する をご参照ください。

    • PostgreSQL 13より前のバージョンの場合

      CREATE PUBLICATION <publication name>;
      
      Copy
  2. 次を使用して、データベースエージェントが閲覧できるテーブルを定義します。

ALTER PUBLICATION <publication name> ADD TABLE <table name>;
Copy

パーティション分割されたテーブルの場合、ルートパーティションテーブルをパブリケーションに追加するだけで十分です。詳細については、 パーティショニングされたテーブルを複製する をご参照ください。

重要

PostgreSQL 15以降 では、指定されたテーブル列のサブセットに対するパブリケーションの構成をサポートしています。コネクタがこれを正しくサポートするには、 列フィルタリング設定 を使用して、パブリケーションで設定された列と同じ列を含める必要があります。

この設定がないと、コネクタは以下のように動作します:

  • 宛先データベースでは、フィルターに含まれていない列に __DELETED というサフィックスが付きます。スナップショットフェーズ中にレプリケートされたすべてのデータは保持されます。

  • After you add new columns to the publication, the table will be permanently failed, and you will need to restart its replication.

詳細については、 ALTER PUBLICATION をご参照ください。

コネクタをインストールする

  1. Openflowの概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。

  4. Add を選択します。

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  5. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  6. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

以下のユースケースにコネクタを構成できます。

リアルタイムでテーブルのセットを複製する

  1. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

  2. フローパラメーター の説明に従って、必要なパラメーター値を入力します。

フローパラメーター

Start with setting the parameters of the PostgreSQL Source Parameters context, then the PostgreSQL Destination Parameters context. Once this is done, you can enable the connector, and it should connect both to PostgreSQL and Snowflake and start running. However, it will not replicate any data until any tables are explicitly added to its configuration.

複製する特定のテーブルを構成するには、 PostgreSQL 取り込みパラメーターコンテキストを編集します。複製パラメーターコンテキストに変更を適用した直後に、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。

PostgreSQL ソースパラメーターコンテキスト

パラメーター

説明

PostgreSQL Connection URL

ソースデータベースへの完全な JDBC URL。例: jdbc:postgresql://example.com:5432/public

PostgreSQL レプリカサーバーに接続している場合、PostgreSQL レプリカサーバーからテーブルを複製する をご参照ください。

PostgreSQL JDBC Driver

PostgreSQL JDBC ドライバーのjar へのパス。ウェブサイトからjarをダウンロードし、 Reference asset チェックボックスを選択してアップロードして添付します。

PostgreSQL Username

コネクタのユーザー名。

PostgreSQL Password

コネクタのパスワード。

Publication Name

以前に作成したパブリケーションの名前。

複製スロット名

オプション。値が指定されていない場合、コネクタは新しい一意の名前のスロットを作成します。値が指定されている場合、コネクタは既存のスロットを使用するか、指定された名前で新しいスロットを作成します。

実行中のコネクタの値を変更すると、更新されたスロットの位置から増分変更データキャプチャ(CDC)ストリームの読み取りが再開されます。

PostgreSQL 宛先パラメーターコンテキスト

パラメーター

説明

必須

宛先データベース

データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

有り

Snowflakeアカウント識別子

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR: データが永続化される[organization-name]-[account-name]形式のSnowflakeアカウント名。

有り

Snowflake認証ストラテジー

以下を使用する場合:

  • Snowflake Openflow Deployment または BYOC:SNOWFLAKE_SESSION_TOKEN を使用します。このトークンはSnowflakeによって自動的に管理されます。BYOC デプロイメントでは、SNOWFLAKE_SESSION_TOKENを使用するために、事前に:ref:ランタイムロール <label-deployment_byoc_setup_runtime_role> が構成されている必要があります。

  • BYOC: Alternatively BYOC can use KEY_PAIR as the value for authentication strategy.

有り

Snowflake秘密キー

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:認証に使用される RSA プライベートキーである必要があります。

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

無し

Snowflake秘密キーファイル

以下を使用する場合:

  • Session token authentication strategy:プライベートキーファイルは空白である必要があります。

  • KEY_PAIR:Snowflakeへの認証に使用される RSA プライベートキーを含むファイルをアップロードします。これは、PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。ヘッダー行は -----BEGIN PRIVATE で始まります。プライベートキーファイルをアップロードするには、Reference asset チェックボックスを選択します。

無し

Snowflake秘密キーパスワード

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeプライベートキーファイルに関連付けられたパスワードを提供します。

無し

Snowflakeロール

以下を使用する場合

  • Session Token Authentication Strategy:ランタイムロールを使用します。ランタイムの View Details に移動すると、Openflow UI でランタイムロールを見つけることができます。

  • KEY_PAIR Authentication Strategy:サービスユーザーのために構成された有効なロールを使用します。

有り

Snowflakeのユーザー名

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeインスタンスへの接続に使用するユーザー名を提供します。

有り

Snowflakeウェアハウス

クエリの実行に使用されるSnowflakeウェアハウス。

有り

PostgreSQL 取り込みパラメーターコンテキスト

パラメーター

説明

含まれるテーブル名

スキーマを含む、テーブルパスのコンマ区切りリスト。例: public.my_table, other_schema.other_table

名前または正規表現のいずれかでテーブルを選択します。両方を使用すると、いずれかのオプションで一致するすべてのテーブルが含まれます。

サブパーティションであるテーブルは常に取り込みから除外されます。詳細については、 パーティショニングされたテーブルを複製する をご参照ください。

含まれるテーブル正規表現

テーブルパスに一致させる正規表現。式に一致するすべてのパスが複製され、後から作成されたパターンに一致する新しいテーブルも自動的に含まれます。例: public\.auto_.*

名前または正規表現のいずれかでテーブルを選択します。両方を使用すると、いずれかのオプションで一致するすべてのテーブルが含まれます。

サブパーティションであるテーブルは常に取り込みから除外されます。詳細については、 パーティショニングされたテーブルを複製する をご参照ください。

列フィルター JSON

オプション。完全修飾テーブル名のリストと、複製に含める必要がある列名の正規表現パターンが含まれる JSON。例: [ {"schema":"public", "table":"table1", "includedPattern":".*name"} ] には、public スキーマの table1name で終わるすべての列が含まれます。

タスクスケジュール CRON をマージする

ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 * * * * * ? に設定します。

例:

  • * 0 * * * ? という文字列は、丸1時間で1分間のマージをスケジュールしたいことを示しています。

  • * 20 14 ? * MON-FRI という文字列は、毎週月曜日から金曜日の2:20 PM にマージをスケジュールしたいことを示しています。

その他の情報と例については、 Quartzドキュメント のcronトリガーチュートリアルをご参照ください。

オブジェクト識別子の解決

スキーマ名、テーブル名、列名などのソースオブジェクト識別子をSnowflakeに格納し、クエリする方法を指定します。この設定により、 SQL クエリで二重引用符を使用する必要があるかどうかを既定します。

:emph:`オプション1:(デフォルト)大文字と小文字を区別する。 `下位互換性のため。

  • 変換:大文字小文字は維持されます。たとえば、 My_TableMy_Table のままです。

  • クエリ: SQL クエリは、データベースオブジェクトの大文字と小文字を正確に一致させるために、二重引用符を使用する必要があります。例: SELECT * FROM "My_Table";

注釈

Snowflakeでは、レガシーまたは互換性の理由からソースの大文字小文字の区別を保持する必要がある場合、このオプションを使用することを推奨しています。たとえば、 MY_TABLEmy_table のように大文字と小文字だけが異なるテーブル名がソースデータベースに含まれている場合、大文字と小文字を区別しない比較を使用すると、名前の衝突が発生します。

:emph:`オプション 2:(推奨)大文字と小文字を区別しない `

  • 変換:識別子はすべて大文字に変換されます。たとえば、 My_TableMY_TABLE になります。

  • クエリ: SQL クエリは大文字と小文字を区別せず、 SQL 二重引用符は必要ありません。たとえば、 SELECT * FROM my_table;SELECT * FROM MY_TABLE; と同じ結果を返します。

注釈

Snowflakeでは、データベースオブジェクトに大文字と小文字が混在する名前が想定されない場合、このオプションを使用することを推奨しています。

重要

コネクタの取り込み開始後は、この設定を変更しないでください。取り込み開始後にこの設定を変更すると、既存の取り込みは中断されます。この設定を変更する必要がある場合は、新しいコネクタインスタンスを作成してください。

PostgreSQL レプリカサーバーからテーブルを複製する

コネクタは、論理的複製 を使用してプライマリサーバー、ホットスタンバイレプリカ、またはサブスクライバーサーバーからデータをインジェストできます。PostgreSQL レプリカに接続するコネクタを構成する前に、プライマリノードとレプリカノード間の複製が正しく機能することを確認します。コネクタでデータが欠落している問題を調査する場合は、まずコネクタが使用するレプリカサーバーに欠落している行が存在することを確認します。

スタンバイレプリカに接続する際のその他の考慮事項:

  • ホットスタンバイレプリカへの接続のみがサポートされています。ウォームスタンバイレプリカは、プライマリインスタンスにプロモートされるまでクライアントからの接続を受け入れることができないことに注意してください。

  • サーバーの PostgreSQL バージョンは>= 16である必要があります。

  • コネクタが必要とする:ref:パブリケーション <label-postgres_connector_create_a_publication> は、スタンバイサーバーではなく、プライマリサーバーで作成する必要があります。スタンバイサーバーは読み取り専用で、パブリケーションの作成は許可されません。

ホットスタンバイインスタンスに接続して :ui:` 複製スロットを作成しようとすると、'<replication slot>' がタイムアウトします。スタンバイインスタンスに接続する場合は、プライマリ PostgreSQL インスタンスにトラフィックがあることを確認します。そうでなければ、複製スロットを作成するための呼び出しは`Openflow通知でエラーを返さないか、 Read PostgreSQL CDC Stream プロセッサーが起動しません。プライマリ PostgreSQL インスタンスにログインし、次のクエリを実行します。

SELECT pg_log_standby_snapshot();
Copy

エラーは、プライマリサーバーにデータ変更がない場合に発生します。そのため、レプリカサーバーで複製スロットを作成しているときに、コネクタが停止する可能性があります。これは、レプリカサーバーが複製スロットを作成できるように、プライマリサーバーから実行中のトランザクションに関する情報を要求するためです。プライマリサーバーはアイドル状態の間、情報を送信しません。pg_log_standby_snapshot() 関数は、プライマリサーバーが実行中のトランザクションに関する情報をレプリカサーバーに送信するように強制します。

テーブルを削除し、複製に再追加する

複製からテーブルを削除するには、複製パラメーターコンテキストの Included Table Names または Included Table Regex パラメーターからテーブルが削除されていることを確認します。

後でテーブルを複製に再追加する場合は、まずSnowflakeで対応する宛先テーブルを削除します。その後、 Included Table Names または Included Table Regex パラメーターにテーブルを追加して戻します。これにより、テーブルの複製プロセスが新しく開始されます。

このアプローチは、失敗したテーブル複製シナリオからの復旧にも使用できます。

テーブルの列のサブセットを複製します。

コネクタは、テーブルごとに複製されるデータを構成列のサブセットにフィルターできます。

列にフィルターを適用するには、複製パラメーターコンテキストのColumn Filterプロパティを変更し、フィルターを適用したいテーブルごとに1エントリずつ、構成の配列を追加します。

列は、名前またはパターン別に包含したり、除外したりすることができます。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせて適用することもできます。除外は常に包含より優先されます。

以下の例は、利用可能なフィールドを示したものです。schema および table は必須で、 includedexcludedincludedPatternexcludedPattern のうちの1つ以上が必要です。

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

パーティショニングされたテーブルを複製する

このコネクタは、バージョン15以降の PostgreSQL サーバーでパーティション化されたテーブルの複製をサポートしています。パーティショニングされた PostgreSQL テーブルは、単一の宛先テーブルとしてSnowflakeに複製されます。

たとえば、パーティショニングされたテーブル orders およびサブパーティションの orders_2023orders_2024 があって、コネクタが orders.* パターンに一致するすべてのテーブルを取り込むように構成されている場合、Snowflakeには orders テーブルのみが複製され、そこにすべてのサブパーティションのデータが含まれることになります。

パーティショニングされたテーブルの複製をサポートするには、確実に PostgreSQL で作成された パブリケーションpublish_via_partition_root オプションが true に設定されているようにしてください。

パーティショニングされたテーブルの取り込みには、現在のところ以下の制限があります。

  • 取り込み開始後に、パーティショニングされたテーブルにパーティションとしてテーブルがアタッチされると、コネクタはアタッチ前にパーティションテーブルに存在していたデータを取得しません。

  • 取り込み開始後に、サブパーティションテーブルがパーティションテーブルから切り離されると、コネクタはルートパーティションテーブルでこのサブパーティションのデータを削除済みとしてマークしません。

  • サブパーティションに対する切り捨て操作は、影響を受けるレコードを削除済みとしてマークしません。

テーブルでデータ変更を追跡する

コネクタはソーステーブルのデータの現在の状態だけでなく、すべての変更セットのすべての行の状態も複製します。このデータは、宛先テーブルと同じスキーマで作成されたジャーナルテーブルに格納されます。

ジャーナルテーブル名の形式は次のとおりです。<source table name>_JOURNAL_<timestamp><schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> は整数で、ソーステーブルのスキーマが変更されるごとに増加します。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。

テーブルが複製から削除され、その後再び追加されると、 <タイムスタンプ> の値が変更され、 <スキーマ生成>1 から再び開始されます。

重要

Snowflakeは、ジャーナルテーブルの構造を一切変更しないことを推奨します。これらは、コネクタによって複製プロセスの一部として宛先テーブルを更新するために使用されます。

コネクタはジャーナルテーブルをドロップすることはありませんが、複製されたすべてのソーステーブルに最新のジャーナルを使用し、ジャーナル上の追加専用ストリームのみを読み取ります。ストレージを回収するには、以下を実行します。

  • すべてのジャーナルテーブルをいつでも切り捨てます。

  • 複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。

  • アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。

例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2除いて、それらのすべてをドロップできます。

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

マージタスクのスケジュールを構成する

コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。

ウェアハウスのコストを制限し、スケジュールされた時間のみにマージを制限するには、Merge task Schedule CRON パラメーターで CRON 式を使用します。MergeSnowflakeJournalTable プロセッサーに送られてくるフローファイルをスロットルし、マージは専用の期間のみにトリガーされます。スケジュールに関する詳細は、 スケジュールストラテジー をご参照ください。

コネクタを停止または削除する

コネクタを停止または削除する場合は、コネクタが使用する 複製スロット を考慮する必要があります。

コネクタは、 snowflake_connector_ で始まる名前にランダムなサフィックスが続く独自の複製スロットを作成します。コネクタは複製ストリームを読み取るとスロットを進め、 PostgreSQL は WAL ログをトリミングしてディスク領域を解放します。

コネクタが一時停止している場合、スロットは進行せず、ソースデータベースへの変更によって WAL ログサイズが増加し続けます。特にトラフィックの多いデータベースでは、コネクタを長期間にわたり一時停止したままにしないでください。

コネクタをOpenflowキャンバスから削除したり、Openflowインスタンス全体を削除するなどの方法で削除しても、複製スロットはそのまま残るため、手動でドロップする必要があります。

同じ PostgreSQL データベースから複製する複数のコネクタインスタンスがある場合、各インスタンスは固有の名前の複製スロットを作成します。複製スロットを手動でドロップする場合は、それが正しいスロットであることを確認してください。CaptureChangePostgreSQL プロセッサーの状態を確認することで、指定されたコネクタインスタンスでどの複製スロットが使用されているかを確認できます。

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。