の設定 Openflow Connector for PostgreSQL¶
注釈
このコネクタは、 Snowflakeコネクタ規約 に従うものとします。
このトピックでは、 Openflow Connector for PostgreSQL を設定する手順について説明します。
注釈
This connector can be configured to immediately start replicating incremental changes for newly added tables, bypassing the snapshot load phase. This option is often useful when reinstalling the connector in an account where previously replicated data exists and you want to continue replication without having to re-snapshot tables.
For details on the incremental load process, see Incremental replication.
前提条件¶
Openflow Connector for PostgreSQL について を確認してください。
サポートされている PostgreSQL バージョン を確認してください。
推奨:ランタイムごとにコネクタインスタンスを1つだけ追加するようにします。
Ensure that you have Openflowの設定 - BYOC or Set up Openflow - Snowflake Deployments.
If using Openflow - Snowflake Deployments, ensure that you've reviewed configuring required domains and have granted access to the required domains for the PostgreSQL connector.
データベース管理者として、以下のタスクを実行します。
WAL 用の PostgreSQL サーバーに十分なディスク容量があることを確認します。これは、複製スロットが作成されると、コネクタがその位置を確認して進むまで、 PostgreSQL が複製スロットが保持する位置の WAL データを保持するためです。
複製が有効なすべてのテーブルに主キーがあることを確認します。キーは単一列でも複合列でもかまいません。
テーブルの REPLICA IDENTITY を
DEFAULTに設定します。これにより、主キーが WAL で表現され、コネクタがそれを読み取れるようになります。コネクタのユーザーを作成します。コネクタには、
REPLICATION属性と、複製されたすべてのテーブルから SELECT にアクセスする権限を持つユーザーが必要です。コネクタの構成に入るためのパスワードを持つユーザーを作成します。複製セキュリティの詳細については、 セキュリティ をご参照ください。
Snowflakeアカウント管理者として、以下のタスクを実行します。
タイプを SERVICE としてSnowflakeユーザーを作成します。複製データを格納するデータベースを作成し、Snowflakeユーザーに USAGE および CREATE SCHEMA 権限 を付与して、そのデータベースにオブジェクトを作成する権限を設定します。
CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
安全なキーのペア(パブリックおよび秘密)を作成します。コネクタの構成時に使用するために、ユーザーの秘密キーを ファイルに保存します。Snowflakeユーザーに公開キーを割り当てます:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
詳細については、:doc:` キーペア認証 </user-guide/key-pair-auth>` をご参照ください。
コネクタが使用するウェアハウスを指定します。で開始する
MEDIUMウェアハウスのサイズから、レプリケートされるテーブルの量とデータの転送量に応じてサイズを実験します。多数のテーブルを扱う場合、通常はウェアハウスサイズではなく、 マルチクラスターウェアハウス を使用した方がスケールしやすくなります。
wal_levelの構成¶
Openflow Connector for PostgreSQL では wal_level を logical に設定する必要があります。
PostgreSQL サーバーがホストされている場所に応じて、wal_levelを以下のように構成できます:
オンプレミス |
スーパーユーザーまたは
|
RDS |
エージェントが使用するユーザには、 次も設定する必要があります。
|
AWS Aurora |
|
GCP |
次のフラグを設定します。
|
Azure |
レプリケーションサポートを |
パブリケーションの作成¶
Openflow Connector for PostgreSQL では、複製を開始する前に、 パブリケーション を PostgreSQL で作成し、構成する必要があります。すべてのテーブル、またはテーブルのサブセットに対して作成することも、特定の列のみを持つ特定のテーブルに対して作成することもできます。複製を予定しているすべてのテーブルと列がパブリケーションに含まれていることを確認してください。コネクタの実行中であれば、後でパブリケーションを変更することもできます。パブリケーションを作成および構成するには、以下の手順を実行します。
データベースに対する権限、を持つユーザーとしてログインし、次のクエリを実行します:
PostgreSQL 13以降の場合
CREATE PUBLICATION <publication name> WITH (publish_via_partition_root = true);
パーティション分割されたテーブルを正しく複製するためには、
publish_via_partition_rootを追加する必要があります。パーティション分割されたテーブルの取り込みについては、 パーティショニングされたテーブルを複製する をご参照ください。PostgreSQL 13より前のバージョンの場合
CREATE PUBLICATION <publication name>;
次を使用して、データベースエージェントが閲覧できるテーブルを定義します。
ALTER PUBLICATION <publication name> ADD TABLE <table name>;パーティション分割されたテーブルの場合、ルートパーティションテーブルをパブリケーションに追加するだけで十分です。詳細については、 パーティショニングされたテーブルを複製する をご参照ください。
重要
PostgreSQL 15以降 では、指定されたテーブル列のサブセットに対するパブリケーションの構成をサポートしています。コネクタがこれを正しくサポートするには、 列フィルタリング設定 を使用して、パブリケーションで設定された列と同じ列を含める必要があります。
この設定がないと、コネクタは以下のように動作します:
宛先データベースでは、フィルターに含まれていない列に
__DELETEDというサフィックスが付きます。スナップショットフェーズ中にレプリケートされたすべてのデータは保持されます。パブリケーションに新しい列を追加すると、テーブルは永久に失敗し、複製を再起動する必要があります。
詳細については、 ALTER PUBLICATION をご参照ください。
コネクタをインストールする¶
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
以下のユースケースにコネクタを構成できます。
リアルタイムでテーブルのセットを複製する¶
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
フローパラメーター の説明に従って、必要なパラメーター値を入力します。
フローパラメーター¶
まず、PostgreSQL ソースパラメーターコンテキストのパラメーターを設定し、次に PostgreSQL 宛先パラメーターコンテキストのパラメーターを設定します。これが完了したら、コネクタを有効にすることができ、PostgreSQL とSnowflakeの両方に接続して、実行が開始されることになります。しかし、明示的にテーブルが構成に追加されるまでは、いかなるデータも複製されません。
複製する特定のテーブルを構成するには、 PostgreSQL 取り込みパラメーターコンテキストを編集します。複製パラメーターコンテキストに変更を適用した直後に、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。
PostgreSQL ソースパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
PostgreSQL 接続 URL |
ソースデータベースへの完全な JDBC URL。例: PostgreSQL レプリカサーバーに接続している場合、PostgreSQL レプリカサーバーからテーブルを複製する をご参照ください。 |
PostgreSQL JDBC ドライバー |
PostgreSQL JDBC ドライバーのjar へのパス。ウェブサイトからjarをダウンロードし、 Reference asset チェックボックスを選択してアップロードして添付します。 |
PostgreSQL ユーザー名 |
コネクタのユーザー名。 |
PostgreSQL パスワード |
コネクタのパスワード。 |
パブリケーション名 |
以前に作成したパブリケーションの名前。 |
Replication Slot Name |
Optional. When no value is provided, the connector will create a new, uniquely-named slot. When given a value, the connector will use the existing slot, or create a new one with the provided name. Changing the value for a running connector will restart reading the incremental change data capture (CDC) stream from the updated slot's position. |
PostgreSQL 宛先パラメーターコンテキスト¶
パラメーター |
説明 |
必須 |
|---|---|---|
宛先データベース |
データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 |
有り |
Snowflake認証ストラテジー |
以下を使用する場合:
|
有り |
Snowflakeアカウント識別子 |
以下を使用する場合:
|
有り |
Snowflake秘密キー |
以下を使用する場合:
|
無し |
Snowflake秘密キーファイル |
以下を使用する場合:
|
無し |
Snowflake秘密キーパスワード |
以下を使用する場合
|
無し |
Snowflakeロール |
以下を使用する場合
|
有り |
Snowflakeのユーザー名 |
以下を使用する場合
|
有り |
Snowflakeウェアハウス |
クエリの実行に使用されるSnowflakeウェアハウス。 |
有り |
PostgreSQL 取り込みパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
含まれるテーブル名 |
スキーマを含む、テーブルパスのコンマ区切りリスト。例: 名前または正規表現のいずれかでテーブルを選択します。両方を使用すると、いずれかのオプションで一致するすべてのテーブルが含まれます。 サブパーティションであるテーブルは常に取り込みから除外されます。詳細については、 パーティショニングされたテーブルを複製する をご参照ください。 |
含まれるテーブル正規表現 |
テーブルパスに一致させる正規表現。式に一致するすべてのパスが複製され、後から作成されたパターンに一致する新しいテーブルも自動的に含まれます。例: 名前または正規表現のいずれかでテーブルを選択します。両方を使用すると、いずれかのオプションで一致するすべてのテーブルが含まれます。 サブパーティションであるテーブルは常に取り込みから除外されます。詳細については、 パーティショニングされたテーブルを複製する をご参照ください。 |
列フィルター JSON |
オプション。完全修飾テーブル名のリストと、複製に含める必要がある列名の正規表現パターンが含まれる JSON。例: |
タスクスケジュール CRON をマージする |
ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 例:
その他の情報と例については、 Quartzドキュメント のcronトリガーチュートリアルをご参照ください。 |
オブジェクト識別子の解決 |
Specifies how source object identifiers such as the names of schemas, tables, and columns are stored and queried in Snowflake. This setting specifies that you must use double quotes in SQL queries. :emph:`オプション1:(デフォルト)大文字と小文字を区別する。`下位互換性のため。
注釈 Snowflake recommends using this option if you must preserve source casing for legacy or compatibility reasons.
For example, if the source database includes table names that differ in case only--such as オプション 2:(推奨)大文字と小文字を区別しない
注釈 Snowflakeでは、データベースオブジェクトに大文字と小文字が混在する名前が想定されない場合、このオプションを使用することを推奨しています。 重要 Do not change this setting after the connector has begun ingesting data. Changing this setting after ingestion has begun breaks the existing ingestion. If you must change this setting, create a new connector instance. |
PostgreSQL レプリカサーバーからテーブルを複製する¶
コネクタは、論理的複製 を使用してプライマリサーバー、ホットスタンバイレプリカ、またはサブスクライバーサーバーからデータをインジェストできます。PostgreSQL レプリカに接続するコネクタを構成する前に、プライマリノードとレプリカノード間の複製が正しく機能することを確認します。コネクタでデータが欠落している問題を調査する場合は、まずコネクタが使用するレプリカサーバーに欠落している行が存在することを確認します。
スタンバイレプリカに接続する際のその他の考慮事項:
ホットスタンバイレプリカへの接続のみがサポートされています。ウォームスタンバイレプリカは、プライマリインスタンスにプロモートされるまでクライアントからの接続を受け入れることができないことに注意してください。
サーバーの PostgreSQL バージョンは>= 16である必要があります。
コネクタが必要とする:ref:
パブリケーション <label-postgres_connector_create_a_publication>は、スタンバイサーバーではなく、プライマリサーバーで作成する必要があります。スタンバイサーバーは読み取り専用で、パブリケーションの作成は許可されません。
ホットスタンバイインスタンスに接続して :ui:` 複製スロットを作成しようとすると、'<replication slot>' がタイムアウトします。スタンバイインスタンスに接続する場合は、プライマリ PostgreSQL インスタンスにトラフィックがあることを確認します。そうでなければ、複製スロットを作成するための呼び出しは`Openflow通知でエラーを返さないか、 Read PostgreSQL CDC Stream プロセッサーが起動しません。プライマリ PostgreSQL インスタンスにログインし、次のクエリを実行します。
SELECT pg_log_standby_snapshot();
エラーは、プライマリサーバーにデータ変更がない場合に発生します。そのため、レプリカサーバーで複製スロットを作成しているときに、コネクタが停止する可能性があります。これは、レプリカサーバーが複製スロットを作成できるように、プライマリサーバーから実行中のトランザクションに関する情報を要求するためです。プライマリサーバーはアイドル状態の間、情報を送信しません。pg_log_standby_snapshot() 関数は、プライマリサーバーが実行中のトランザクションに関する情報をレプリカサーバーに送信するように強制します。
テーブルを削除し、複製に再追加する¶
複製からテーブルを削除するには、複製パラメーターコンテキストの Included Table Names または Included Table Regex パラメーターからテーブルが削除されていることを確認します。
後でテーブルを複製に再追加する場合は、まずSnowflakeで対応する宛先テーブルを削除します。その後、 Included Table Names または Included Table Regex パラメーターにテーブルを追加して戻します。これにより、テーブルの複製プロセスが新しく開始されます。
このアプローチは、失敗したテーブル複製シナリオからの復旧にも使用できます。
テーブルの列のサブセットを複製します。¶
コネクタは、テーブルごとに複製されるデータを構成列のサブセットにフィルターできます。
列にフィルターを適用するには、複製パラメーターコンテキストのColumn Filterプロパティを変更し、フィルターを適用したいテーブルごとに1エントリずつ、構成の配列を追加します。
列は、名前またはパターン別に包含したり、除外したりすることができます。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせて適用することもできます。除外は常に包含より優先されます。
以下の例は、利用可能なフィールドを示したものです。schema および table は必須で、 included、 excluded、 includedPattern、 excludedPattern のうちの1つ以上が必要です。
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
パーティショニングされたテーブルを複製する¶
このコネクタは、バージョン15以降の PostgreSQL サーバーでパーティション化されたテーブルの複製をサポートしています。パーティショニングされた PostgreSQL テーブルは、単一の宛先テーブルとしてSnowflakeに複製されます。
たとえば、パーティショニングされたテーブル orders およびサブパーティションの orders_2023 と orders_2024 があって、コネクタが orders.* パターンに一致するすべてのテーブルを取り込むように構成されている場合、Snowflakeには orders テーブルのみが複製され、そこにすべてのサブパーティションのデータが含まれることになります。
パーティショニングされたテーブルの複製をサポートするには、確実に PostgreSQL で作成された パブリケーション の publish_via_partition_root オプションが true に設定されているようにしてください。
パーティショニングされたテーブルの取り込みには、現在のところ以下の制限があります。
取り込み開始後に、パーティショニングされたテーブルにパーティションとしてテーブルがアタッチされると、コネクタはアタッチ前にパーティションテーブルに存在していたデータを取得しません。
取り込み開始後に、サブパーティションテーブルがパーティションテーブルから切り離されると、コネクタはルートパーティションテーブルでこのサブパーティションのデータを削除済みとしてマークしません。
サブパーティションに対する切り捨て操作は、影響を受けるレコードを削除済みとしてマークしません。
テーブルでデータ変更を追跡する¶
コネクタはソーステーブルのデータの現在の状態だけでなく、すべての変更セットのすべての行の状態も複製します。このデータは、宛先テーブルと同じスキーマで作成されたジャーナルテーブルに格納されます。
ジャーナルテーブル名の形式は次のとおりです。<source table name>_JOURNAL_<timestamp><schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> は整数で、ソーステーブルのスキーマが変更されるごとに増加します。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。
テーブルが複製から削除され、その後再び追加されると、 <タイムスタンプ> の値が変更され、 <スキーマ生成> が 1 から再び開始されます。
重要
Snowflakeは、ジャーナルテーブルの構造を一切変更しないことを推奨します。これらは、コネクタによって複製プロセスの一部として宛先テーブルを更新するために使用されます。
コネクタはジャーナルテーブルをドロップすることはありませんが、複製されたすべてのソーステーブルに最新のジャーナルを使用し、ジャーナル上の追加専用ストリームのみを読み取ります。ストレージを回収するには、以下を実行します。
すべてのジャーナルテーブルをいつでも切り捨てます。
複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。
アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。
例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2 を 除いて、それらのすべてをドロップできます。
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
マージタスクのスケジュールを構成する¶
コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。
ウェアハウスのコストを制限し、スケジュールされた時間のみにマージを制限するには、Merge task Schedule CRON パラメーターで CRON 式を使用します。MergeSnowflakeJournalTable プロセッサーに送られてくるフローファイルをスロットルし、マージは専用の期間のみにトリガーされます。スケジュールに関する詳細は、 スケジュールストラテジー をご参照ください。
コネクタを停止または削除する¶
コネクタを停止または削除する場合は、コネクタが使用する 複製スロット を考慮する必要があります。
コネクタは、 snowflake_connector_ で始まる名前にランダムなサフィックスが続く独自の複製スロットを作成します。コネクタは複製ストリームを読み取るとスロットを進め、 PostgreSQL は WAL ログをトリミングしてディスク領域を解放します。
コネクタが一時停止している場合、スロットは進行せず、ソースデータベースへの変更によって WAL ログサイズが増加し続けます。特にトラフィックの多いデータベースでは、コネクタを長期間にわたり一時停止したままにしないでください。
コネクタをOpenflowキャンバスから削除したり、Openflowインスタンス全体を削除するなどの方法で削除しても、複製スロットはそのまま残るため、手動でドロップする必要があります。
同じ PostgreSQL データベースから複製する複数のコネクタインスタンスがある場合、各インスタンスは固有の名前の複製スロットを作成します。複製スロットを手動でドロップする場合は、それが正しいスロットであることを確認してください。CaptureChangePostgreSQL プロセッサーの状態を確認することで、指定されたコネクタインスタンスでどの複製スロットが使用されているかを確認できます。
フローを実行する¶
プレーンを右クリックし、 Enable all Controller Services を選択します。
インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。