Openflow Connector for PostgreSQL の設定¶
注釈
このコネクタは、 Snowflakeコネクタ規約 に従うものとします。
このトピックでは、 Openflow Connector for PostgreSQL を設定する手順について説明します。
注釈
このコネクタは、スナップショットのロードフェーズをバイパスして、新しく追加されたテーブルの増分変更の複製をすぐに開始するように構成できます。このオプションは、以前に複製されたデータが存在し、テーブルを再スナップショットせずに複製を続行するアカウントにコネクタを再インストールする場合に、よく役立ちます。
増分ロードプロセスの詳細については、 増分複製 をご参照ください。
前提条件¶
Openflow Connector for PostgreSQL について を確認してください。
サポートされている PostgreSQL バージョン を確認してください。
推奨:ランタイムごとにコネクタインスタンスを1つだけ追加するようにします。
:doc:`/user-guide/data-integration/openflow/setup-openflow-byoc`または:doc:`Openflow のセットアップ - Snowflake デプロイメント</user-guide/data-integration/openflow/setup-openflow-spcs>`があることを確認してください。
|OFSFSPCS-plural|を使用する場合、:doc:`必要なドメインの構成</user-guide/data-integration/openflow/setup-openflow-spcs-sf-allow-list>`を精査し、:ref:`label-openflow_domains_used_by_openflow_connectors_postgresql`コネクタに必要なドメインへのアクセス許可を付与していることを確認してください。
データベース管理者として、以下のタスクを実行します。
WAL 用の PostgreSQL サーバーに十分なディスク容量があることを確認します。これは、複製スロットが作成されると、コネクタがその位置を確認して進むまで、 PostgreSQL が複製スロットが保持する位置の WAL データを保持するためです。
複製が有効なすべてのテーブルに主キーがあることを確認します。キーは単一列でも複合列でもかまいません。
テーブルの REPLICA IDENTITY を
DEFAULTに設定します。これにより、主キーが WAL で表現され、コネクタがそれを読み取れるようになります。コネクタのユーザーを作成します。コネクタには、
REPLICATION属性と、複製されたすべてのテーブルから SELECT にアクセスする権限を持つユーザーが必要です。コネクタの構成に入るためのパスワードを持つユーザーを作成します。複製セキュリティの詳細については、 セキュリティ をご参照ください。
Snowflakeアカウント管理者として、以下のタスクを実行します。
タイプを SERVICE としてSnowflakeユーザーを作成します。複製データを格納するデータベースを作成し、Snowflakeユーザーに USAGE および CREATE SCHEMA 権限 を付与して、そのデータベースにオブジェクトを作成する権限を設定します。
CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
安全なキーのペア(パブリックおよび秘密)を作成します。コネクタの構成時に使用するために、ユーザーの秘密キーを ファイルに保存します。Snowflakeユーザーに公開キーを割り当てます:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
詳細については、:doc:` キーペア認証 </user-guide/key-pair-auth>` をご参照ください。
コネクタが使用するウェアハウスを指定します。で開始する
XSMALLウェアハウスのサイズから、レプリケートされるテーブルの量とデータの転送量に応じてサイズを実験します。多数のテーブルを扱う場合、通常はウェアハウスサイズではなく、 マルチクラスターウェアハウス を使用した方がスケールしやすくなります。
wal_levelの構成¶
Openflow Connector for PostgreSQL では wal_level を logical に設定する必要があります。
PostgreSQL サーバーがホストされている場所に応じて、wal_levelを以下のように構成できます:
オンプレミス |
スーパーユーザーまたは
|
RDS |
エージェントが使用するユーザには、 次も設定する必要があります。
|
AWS Aurora |
|
GCP |
次のフラグを設定します。
|
Azure |
レプリケーションサポートを |
パブリケーションの作成¶
Openflow Connector for PostgreSQL では、複製を開始する前に、 パブリケーション を PostgreSQL で作成し、構成する必要があります。すべてのテーブル、またはテーブルのサブセットに対して作成することも、特定の列のみを持つ特定のテーブルに対して作成することもできます。複製を予定しているすべてのテーブルと列がパブリケーションに含まれていることを確認してください。コネクタの実行中であれば、後でパブリケーションを変更することもできます。パブリケーションを作成および構成するには、以下の手順を実行します。
データベースに対する権限、を持つユーザーとしてログインし、次のクエリを実行します:
PostgreSQL 13以降の場合
CREATE PUBLICATION <publication name> WITH (publish_via_partition_root = true);
パーティション分割されたテーブルを正しく複製するためには、
publish_via_partition_rootを追加する必要があります。パーティション分割されたテーブルの取り込みについては、 パーティショニングされたテーブルを複製する をご参照ください。PostgreSQL 13より前のバージョンの場合
CREATE PUBLICATION <publication name>;
次を使用して、データベースエージェントが閲覧できるテーブルを定義します。
ALTER PUBLICATION <publication name> ADD TABLE <table name>;パーティション分割されたテーブルの場合、ルートパーティションテーブルをパブリケーションに追加するだけで十分です。詳細については、 パーティショニングされたテーブルを複製する をご参照ください。
重要
PostgreSQL 15以降 では、指定されたテーブル列のサブセットに対するパブリケーションの構成をサポートしています。コネクタがこれを正しくサポートするには、 列フィルタリング設定 を使用して、パブリケーションで設定された列と同じ列を含める必要があります。
この設定がないと、コネクタは以下のように動作します:
宛先データベースでは、フィルターに含まれていない列に
__DELETEDというサフィックスが付きます。スナップショットフェーズ中にレプリケートされたすべてのデータは保持されます。パブリケーションに新しい列を追加すると、テーブルは永久に失敗し、複製を再起動する必要があります。
詳細については、 ALTER PUBLICATION をご参照ください。
コネクタをインストールする¶
コネクタをインストールするには、データエンジニアとして次を実行します。
Openflow概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。
Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。
Select runtime ダイアログで、Available runtimes ドロップダウンリストからランタイムを選択して Add をクリックします。
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
コネクタを構成するには、データエンジニアとして次の手順を実行します。
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
フローパラメーター の説明に従って、必要なパラメーター値を入力します。
フローパラメーター¶
まず、 PostgreSQL ソースパラメーターコンテキストのパラメーターを設定し、次に PostgreSQL 宛先パラメーターコンテキストのパラメーターを設定します。これが完了したら、コネクタを有効にすることができ、 PostgreSQL とSnowflakeの両方に接続して、実行が開始されることになります。しかし、明示的にテーブルが構成に追加されるまでは、いかなるデータも複製されません。
複製する特定のテーブルを構成するには、 PostgreSQL 取り込みパラメーターコンテキストを編集します。複製パラメーターコンテキストに変更を適用した直後に、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。
PostgreSQL ソースパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
PostgreSQL 接続 URL |
ソースデータベースへの完全な JDBC URL。例: PostgreSQL レプリカサーバーに接続している場合、PostgreSQL レプリカサーバーからテーブルを複製する をご参照ください。 |
PostgreSQL JDBC ドライバー |
PostgreSQL JDBC ドライバーのjar へのパス。ウェブサイトからjarをダウンロードし、 Reference asset チェックボックスを選択してアップロードして添付します。 |
PostgreSQL ユーザー名 |
コネクタのユーザー名。 |
PostgreSQL パスワード |
コネクタのパスワード。 |
パブリケーション名 |
以前に作成したパブリケーションの名前。 |
複製スロット名 |
オプション。値が指定されていない場合、コネクタは新しい一意の名前のスロットを作成します。値が指定されている場合、コネクタは既存のスロットを使用するか、指定された名前で新しいスロットを作成します。 実行中のコネクタの値を変更すると、更新されたスロットの位置から増分変更データキャプチャ(CDC)ストリームの読み取りが再開されます。 |
PostgreSQL 宛先パラメーターコンテキスト¶
パラメーター |
説明 |
必須 |
|---|---|---|
宛先データベース |
データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 |
有り |
Snowflake認証ストラテジー |
以下を使用する場合:
|
有り |
Snowflakeアカウント識別子 |
以下を使用する場合:
|
有り |
Snowflake接続の戦略 |
KEY_PAIRを使用する場合は、Snowflakeに接続するための戦略を特定します。
|
KEY_PAIRを使用したBYOCのみに必要です。それ以外の場合は無視されます。 |
Snowflake秘密キー |
以下を使用する場合:
|
無し |
Snowflake秘密キーファイル |
以下を使用する場合:
|
無し |
Snowflake秘密キーパスワード |
以下を使用する場合
|
無し |
Snowflakeロール |
以下を使用する場合 |
有り |
Snowflakeのユーザー名 |
以下を使用する場合
|
有り |
オーバーサイズ値戦略 |
複製中にコネクタが内部サイズ制限(16MB)を超える値を処理する方法を決定します。可能な値は次のとおりです。
|
無し |
Snowflakeウェアハウス |
クエリの実行に使用されるSnowflakeウェアハウス。 |
有り |
PostgreSQL 取り込みパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
含まれるテーブル名 |
スキーマを含む、テーブルパスのコンマ区切りリスト。例: 名前または正規表現のいずれかでテーブルを選択します。両方を使用すると、いずれかのオプションで一致するすべてのテーブルが含まれます。 サブパーティションであるテーブルは常に取り込みから除外されます。詳細については、 パーティショニングされたテーブルを複製する をご参照ください。 |
含まれるテーブル正規表現 |
テーブルパスに一致させる正規表現。式に一致するすべてのパスが複製され、後から作成されたパターンに一致する新しいテーブルも自動的に含まれます。例: 名前または正規表現のいずれかでテーブルを選択します。両方を使用すると、いずれかのオプションで一致するすべてのテーブルが含まれます。 サブパーティションであるテーブルは常に取り込みから除外されます。詳細については、 パーティショニングされたテーブルを複製する をご参照ください。 |
列フィルター JSON |
オプション。完全修飾テーブル名のリストと、複製に含める必要がある列名の正規表現パターンが含まれる JSON。例: |
タスクスケジュール CRON をマージする |
ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 例:
その他の情報と例については、 Quartzドキュメント のcronトリガーチュートリアルをご参照ください。 |
オブジェクト識別子の解決 |
スキーマ名、テーブル名、列名などのソースオブジェクト識別子がSnowflakeにどのように保存され、クエリされるかを指定します。この設定は、 SQL クエリで二重引用符を使用する必要があることを指定します。 :emph:`オプション1:(デフォルト)大文字と小文字を区別する。 `下位互換性のため。
注釈 Snowflakeでは、レガシーまたは互換性の理由からソースの大文字小文字の区別を保持する必要がある場合、このオプションを使用することを推奨しています。たとえば、 :emph:`オプション 2:(推奨)大文字と小文字を区別しない `
注釈 Snowflakeでは、データベースオブジェクトに大文字と小文字が混在する名前が想定されない場合、このオプションを使用することを推奨しています。 重要 コネクタがデータのインジェスチョンを開始した後は、この設定を変更しないでください。取り込み開始後にこの設定を変更すると、既存の取り込みは中断されます。この設定を変更する必要がある場合は、新しいコネクタインスタンスを作成してください。 |
PostgreSQL レプリカサーバーからテーブルを複製する¶
コネクタは、論理的複製 を使用してプライマリサーバー、ホットスタンバイレプリカ、またはサブスクライバーサーバーからデータをインジェストできます。PostgreSQL レプリカに接続するコネクタを構成する前に、プライマリノードとレプリカノード間の複製が正しく機能することを確認します。コネクタでデータが欠落している問題を調査する場合は、まずコネクタが使用するレプリカサーバーに欠落している行が存在することを確認します。
スタンバイレプリカに接続する際のその他の考慮事項:
ホットスタンバイレプリカへの接続のみがサポートされています。ウォームスタンバイレプリカは、プライマリインスタンスにプロモートされるまでクライアントからの接続を受け入れることができないことに注意してください。
サーバーの PostgreSQL バージョンは>= 16である必要があります。
コネクタが必要とする:ref:
パブリケーション <label-postgres_connector_create_a_publication>は、スタンバイサーバーではなく、プライマリサーバーで作成する必要があります。スタンバイサーバーは読み取り専用で、パブリケーションの作成は許可されません。
ホットスタンバイインスタンスに接続して :ui:` 複製スロットを作成しようとすると、'<replication slot>' がタイムアウトします。スタンバイインスタンスに接続する場合は、プライマリ PostgreSQL インスタンスにトラフィックがあることを確認します。そうでなければ、複製スロットを作成するための呼び出しは`Openflow通知でエラーを返さないか、 Read PostgreSQL CDC Stream プロセッサーが起動しません。プライマリ PostgreSQL インスタンスにログインし、次のクエリを実行します。
SELECT pg_log_standby_snapshot();
エラーは、プライマリサーバーにデータ変更がない場合に発生します。そのため、レプリカサーバーで複製スロットを作成しているときに、コネクタが停止する可能性があります。これは、レプリカサーバーが複製スロットを作成できるように、プライマリサーバーから実行中のトランザクションに関する情報を要求するためです。プライマリサーバーはアイドル状態の間、情報を送信しません。pg_log_standby_snapshot() 関数は、プライマリサーバーが実行中のトランザクションに関する情報をレプリカサーバーに送信するように強制します。
テーブル複製の再開¶
(主キーが欠落している、またはスキーマの変更がサポートされていないなどの理由で) FAILED 状態のテーブルは、自動的に再開されません。テーブルが FAILED 状態になった場合、または複製を最初から再開する必要がある場合は、次の手順に従ってテーブルを削除し、複製に再度追加します。
注釈
主キーが欠落しているなど、ソーステーブルの問題が原因で障害が発生した場合は、続行する前にソースデータベースでその問題を解決します。
フローパラメーターからテーブルを削除します。取り込みパラメーターのコンテキストで、Included Table Names からテーブルを削除するか、Included Table Regex を変更してテーブルが一致しないようにします。
テーブルが削除されたことを確認します。
Openflowランタイムキャンバスで、プロセッサーグループを右クリックし、Controller Services を選択します。
コントローラーサービスをリストしたテーブルで、Table State Store 行を見つけ、行の右側にある縦3つのドットをクリックして、View State を選択します。
重要
続行する前に、テーブルの状態がこのリストから完全に削除されるまで待つ必要があります。この構成変更が完了するまで続行しないでください。
宛先をクリーンアップする:テーブルの状態が完全に削除されたと表示されたら、Snowflake で宛先テーブルを手動で DROP します。スナップショットフェーズ中に、コネクタは既存の宛先テーブルを上書きしないことに注意してください。テーブルがまだ存在する場合、複製は再度失敗します。オプションで、ジャーナルテーブルとストリームが不要になった場合は削除することもできます。
テーブルを再度追加する: Included Table Names または Included Table Regex パラメーターを更新して、テーブルを再度含めます。
再開を確認する:前述の指示に従って Table State Store をチェックします。テーブルの状態は、ステータス NEW で表示され、次に SNAPSHOT_REPLICATION に移行し、最後に INCREMENTAL_REPLICATION になります。
テーブルの列のサブセットを複製します。¶
コネクタは、テーブルごとに複製されるデータを構成列のサブセットにフィルターできます。
列にフィルターを適用するには、複製パラメーターコンテキストのColumn Filterプロパティを変更し、フィルターを適用したいテーブルごとに1エントリずつ、構成の配列を追加します。
列は、名前またはパターン別に包含したり、除外したりすることができます。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせて適用することもできます。除外は常に包含より優先されます。
以下の例は、利用可能なフィールドを示したものです。schema および table は必須で、 included、 excluded、 includedPattern、 excludedPattern のうちの1つ以上が必要です。
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
パーティショニングされたテーブルを複製する¶
このコネクタは、バージョン15以降の PostgreSQL サーバーでパーティション化されたテーブルの複製をサポートしています。パーティショニングされた PostgreSQL テーブルは、単一の宛先テーブルとしてSnowflakeに複製されます。
たとえば、パーティショニングされたテーブル orders およびサブパーティションの orders_2023 と orders_2024 があって、コネクタが orders.* パターンに一致するすべてのテーブルを取り込むように構成されている場合、Snowflakeには orders テーブルのみが複製され、そこにすべてのサブパーティションのデータが含まれることになります。
パーティショニングされたテーブルの複製をサポートするには、確実に PostgreSQL で作成された パブリケーション の publish_via_partition_root オプションが true に設定されているようにしてください。
パーティショニングされたテーブルの取り込みには、現在のところ以下の制限があります。
取り込み開始後に、パーティショニングされたテーブルにパーティションとしてテーブルがアタッチされると、コネクタはアタッチ前にパーティションテーブルに存在していたデータを取得しません。
取り込み開始後に、サブパーティションテーブルがパーティションテーブルから切り離されると、コネクタはルートパーティションテーブルでこのサブパーティションのデータを削除済みとしてマークしません。
サブパーティションに対する切り捨て操作は、影響を受けるレコードを削除済みとしてマークしません。
テーブルでデータ変更を追跡する¶
コネクタはソーステーブルのデータの現在の状態だけでなく、すべての変更セットのすべての行の状態も複製します。このデータは、宛先テーブルと同じスキーマで作成されたジャーナルテーブルに格納されます。
ジャーナルテーブル名の形式は``<source_table_name>_JOURNAL_<timestamp>_<schema_generation>``です。ここで、``<timestamp>``はソーステーブルが複製に追加されたときのエポック秒の値であり、``<schema_generation>``はソーステーブルのスキーマ変更ごとに増加する整数です。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。
テーブルが複製から削除され、再度追加されると、``<timestamp>``値が変更され、``<schema_generation>``は``1``から再開します。
重要
Snowflakeは、ジャーナルテーブルの構造を一切変更しないことを推奨します。これらは、コネクタによって複製プロセスの一部として宛先テーブルを更新するために使用されます。
コネクタはジャーナルテーブルをドロップすることはありませんが、複製されたすべてのソーステーブルに最新のジャーナルを使用し、ジャーナル上の追加専用ストリームのみを読み取ります。ストレージを回収するには、以下を実行します。
すべてのジャーナルテーブルをいつでも切り捨てます。
複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。
アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。
例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2 を 除いて、それらのすべてをドロップできます。
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
マージタスクのスケジュールを構成する¶
コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。
ウェアハウスのコストを制限し、スケジュールされた時間のみにマージを制限するには、Merge task Schedule CRON パラメーターで CRON 式を使用します。MergeSnowflakeJournalTable プロセッサーに送られてくるフローファイルをスロットルし、マージは専用の期間のみにトリガーされます。スケジュールに関する詳細は、 スケジュールストラテジー をご参照ください。
コネクタを停止または削除する¶
コネクタを停止または削除する場合は、コネクタが使用する 複製スロット を考慮する必要があります。
コネクタは、 snowflake_connector_ で始まる名前にランダムなサフィックスが続く独自の複製スロットを作成します。コネクタは複製ストリームを読み取るとスロットを進め、 PostgreSQL は WAL ログをトリミングしてディスク領域を解放します。
コネクタが一時停止している場合、スロットは進行せず、ソースデータベースへの変更によって WAL ログサイズが増加し続けます。特にトラフィックの多いデータベースでは、コネクタを長期間にわたり一時停止したままにしないでください。
コネクタをOpenflowキャンバスから削除したり、Openflowインスタンス全体を削除するなどの方法で削除しても、複製スロットはそのまま残るため、手動でドロップする必要があります。
同じ PostgreSQL データベースから複製する複数のコネクタインスタンスがある場合、各インスタンスは固有の名前の複製スロットを作成します。複製スロットを手動でドロップする場合は、それが正しいスロットであることを確認してください。CaptureChangePostgreSQL プロセッサーの状態を確認することで、指定されたコネクタインスタンスでどの複製スロットが使用されているかを確認できます。
フローを実行する¶
プレーンを右クリックし、 Enable all Controller Services を選択します。
インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。