の設定 Openflow Connector for PostgreSQL¶
注釈
コネクタには、 コネクタ利用規約 が適用されます。
このトピックでは、 Openflow Connector for PostgreSQL を設定する手順について説明します。
前提条件¶
Openflow Connector for PostgreSQL について を確認してください。
サポートされている PostgreSQL バージョン を確認してください。
推奨:ランタイムごとにコネクタインスタンスを1つだけ追加するようにします。
Openflowの設定 - BYOC または Openflowの設定 - Snowflakeデプロイメント - タスク概要 があることを確認してください。
データベース管理者として、以下のタスクを実行します。
WAL 用の PostgreSQL サーバーに十分なディスク容量があることを確認します。これは、複製スロットが作成されると、コネクタがその位置を確認して進むまで、 PostgreSQL が複製スロットが保持する位置の WAL データを保持するためです。
複製が有効なすべてのテーブルに主キーがあることを確認します。キーは単一列でも複合列でもかまいません。
テーブルの REPLICA IDENTITY を
DEFAULTに設定します。これにより、主キーが WAL で表現され、コネクタがそれを読み取れるようになります。コネクタのユーザーを作成します。コネクタには、
REPLICATION属性と、複製されたすべてのテーブルから SELECT にアクセスする権限を持つユーザーが必要です。コネクタの構成に入るためのパスワードを持つユーザーを作成します。複製セキュリティの詳細については、 セキュリティ をご参照ください。
Snowflakeアカウント管理者として、以下のタスクを実行します。
タイプを SERVICE としてSnowflakeユーザーを作成します。複製データを格納するデータベースを作成し、Snowflakeユーザーに USAGE および CREATE SCHEMA 権限 を付与して、そのデータベースにオブジェクトを作成する権限を設定します。
CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
安全なキーのペア(パブリックおよび秘密)を作成します。コネクタの構成時に使用するために、ユーザーの秘密キーを ファイルに保存します。Snowflakeユーザーに公開キーを割り当てます:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
詳細については、:doc:` キーペア認証 </user-guide/key-pair-auth>` をご参照ください。
コネクタが使用するウェアハウスを指定します。で開始する
MEDIUMウェアハウスのサイズから、レプリケートされるテーブルの量とデータの転送量に応じてサイズを実験します。多数のテーブルを扱う場合、通常はウェアハウスサイズではなく、 マルチクラスターウェアハウス を使用した方がスケールしやすくなります。
wal_levelの構成¶
Openflow Connector for PostgreSQL では wal_level を logical に設定する必要があります。
PostgreSQL サーバーがホストされている場所に応じて、wal_levelを以下のように構成できます:
オンプレミス |
スーパーユーザーまたは
|
RDS |
エージェントが使用するユーザには、 次も設定する必要があります。
|
AWS Aurora |
|
GCP |
次のフラグを設定します。
|
Azure |
レプリケーションサポートを |
パブリケーションの作成¶
Openflow Connector for PostgreSQL では、複製を開始する前に、 パブリケーション を PostgreSQL で作成し、構成する必要があります。すべてのテーブル、またはテーブルのサブセットに対して作成することも、特定の列のみを持つ特定のテーブルに対して作成することもできます。複製を予定しているすべてのテーブルと列がパブリケーションに含まれていることを確認してください。コネクタの実行中であれば、後でパブリケーションを変更することもできます。パブリケーションを作成および構成するには、以下の手順を実行します。
データベースに対する権限、を持つユーザーとしてログインし、次のクエリを実行します:
CREATE PUBLICATION <publication name>;次を使用して、データベースエージェントが閲覧できるテーブルを定義します。
ALTER PUBLICATION <publication name> ADD TABLE <table name>;重要
PostgreSQL 15以降 では、指定されたテーブル列のサブセットに対するパブリケーションの構成をサポートしています。コネクタがこれを正しくサポートするには、 列フィルタリング設定 を使用して、パブリケーションで設定された列と同じ列を含める必要があります。
この設定がないと、コネクタは以下のように動作します:
宛先データベースでは、フィルターに含まれていない列に
__DELETEDというサフィックスが付きます。スナップショットフェーズ中にレプリケートされたすべてのデータは保持されます。パブリケーションに新しい列を追加すると、テーブルは永久に失敗し、複製を再起動する必要があります。複製。
詳細については、 ALTER PUBLICATION をご参照ください。
コネクタをインストールする¶
Openflowの概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。
Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。
Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。
Add を選択します。
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
以下のユースケースにコネクタを構成できます。
リアルタイムでテーブルのセットを複製する¶
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
フローパラメーター の説明に従って、必要なパラメーター値を入力します。
フローパラメーター¶
まず、 PostgreSQL ソースパラメーターコンテキストのパラメーターを設定し、次に PostgreSQL 宛先パラメーターコンテキストを設定します。これが完了したら、コネクタを有効にして、 PostgreSQL とSnowflakeの両方に接続し、実行を開始します。ただし、明示的にテーブルが構成に追加されるまでは、データの複製は行われません。
複製する特定のテーブルを構成するには、 PostgreSQL 取り込みパラメーターコンテキストを編集します。複製パラメーターコンテキストに変更を適用した直後に、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。
PostgreSQL ソースパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
Postgres 接続 URL |
ソースデータベースへの完全な JDBC URL。例: PostgreSQL レプリカサーバーに接続している場合、PostgreSQL レプリカサーバーからテーブルを複製する をご参照ください。 |
Postgres JDBC ドライバー |
PostgreSQL JDBC ドライバーのjar へのパス。ウェブサイトからjarをダウンロードし、 Reference asset チェックボックスを選択してアップロードして添付します。 |
Postgres SSL モード |
SSL 接続を有効または無効にします。 |
Postgresルート SSL 証明書 |
データベースのルート証明書のフルコンテンツ。SSL が無効の場合はオプション。 |
Postgresユーザー名 |
コネクタのユーザー名。 |
Postgresパスワード |
コネクタのパスワード。 |
PostgreSQL 宛先パラメーターコンテキスト¶
パラメーター |
説明 |
必須 |
|---|---|---|
宛先データベース |
データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 |
有り |
宛先スキーマ |
データが永続化されるスキーマ。これはSnowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 次の例をご参照ください。
|
有り |
Snowflakeアカウント識別子 |
以下を使用する場合:
|
有り |
Snowflake認証ストラテジー |
以下を使用する場合:
|
有り |
Snowflake秘密キー |
以下を使用する場合:
|
無し |
Snowflake秘密キーファイル |
以下を使用する場合:
|
無し |
Snowflake秘密キーパスワード |
以下を使用する場合
|
無し |
Snowflakeロール |
以下を使用する場合
|
有り |
Snowflakeのユーザー名 |
以下を使用する場合
|
有り |
Snowflakeウェアハウス |
クエリの実行に使用されるSnowflakeウェアハウス。 |
有り |
PostgreSQL 取り込みパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
含まれるテーブル名 |
スキーマを含む、テーブルパスのコンマ区切りリスト。例: 名前または正規表現のいずれかでテーブルを選択します。両方を使用すると、いずれかのオプションで一致するすべてのテーブルが含まれます。 |
含まれるテーブル正規表現 |
テーブルパスに一致させる正規表現。式に一致するすべてのパスが複製され、後から作成されたパターンに一致する新しいテーブルも自動的に含まれます。例: 名前または正規表現のいずれかでテーブルを選択します。両方を使用すると、いずれかのオプションで一致するすべてのテーブルが含まれます。 |
列フィルター JSON |
オプション。完全修飾テーブル名のリストと、複製に含める必要がある列名の正規表現パターンが含まれる JSON。例: |
タスクスケジュール CRON をマージする |
ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 例:
その他の情報と例については、 Quartzドキュメント のcronトリガーチュートリアルをご参照ください。 |
PostgreSQL レプリカサーバーからテーブルを複製する¶
コネクタは、論理的複製 を使用してプライマリサーバー、ホットスタンバイレプリカ、またはサブスクライバーサーバーからデータをインジェストできます。PostgreSQL レプリカに接続するコネクタを構成する前に、プライマリノードとレプリカノード間の複製が正しく機能することを確認します。コネクタでデータが欠落している問題を調査する場合は、まずコネクタが使用するレプリカサーバーに欠落している行が存在することを確認します。
スタンバイレプリカに接続する際のその他の考慮事項:
ホットスタンバイレプリカへの接続のみがサポートされています。ウォームスタンバイレプリカは、プライマリインスタンスにプロモートされるまでクライアントからの接続を受け入れることができないことに注意してください。
サーバーの PostgreSQL バージョンは>= 16である必要があります。
コネクタが必要とする:ref:
パブリケーション <label-postgres_connector_create_a_publication>は、スタンバイサーバーではなく、プライマリサーバーで作成する必要があります。スタンバイサーバーは読み取り専用で、パブリケーションの作成は許可されません。
ホットスタンバイインスタンスに接続して :ui:` 複製スロットを作成しようとすると、'<replication slot>' がタイムアウトします。スタンバイインスタンスに接続する場合は、プライマリ PostgreSQL インスタンスにトラフィックがあることを確認します。そうでなければ、複製スロットを作成するための呼び出しは`Openflow通知でエラーを返さないか、 Read PostgreSQL CDC Stream プロセッサーが起動しません。プライマリ PostgreSQL インスタンスにログインし、次のクエリを実行します。
SELECT pg_log_standby_snapshot();
エラーは、プライマリサーバーにデータ変更がない場合に発生します。そのため、レプリカサーバーで複製スロットを作成しているときに、コネクタが停止する可能性があります。これは、レプリカサーバーが複製スロットを作成できるように、プライマリサーバーから実行中のトランザクションに関する情報を要求するためです。プライマリサーバーはアイドル状態の間、情報を送信しません。pg_log_standby_snapshot() 関数は、プライマリサーバーが実行中のトランザクションに関する情報をレプリカサーバーに送信するように強制します。
テーブルを削除し、複製に再追加する¶
複製からテーブルを削除するには、複製パラメーターコンテキストの Included Table Names または Included Table Regex パラメーターからテーブルが削除されていることを確認します。
後でテーブルを複製に再追加する場合は、まずSnowflakeで対応する宛先テーブルを削除します。その後、 Included Table Names または Included Table Regex パラメーターにテーブルを追加して戻します。これにより、テーブルの複製プロセスが新しく開始されます。
このアプローチは、失敗したテーブル複製シナリオからの復旧にも使用できます。
テーブルの列のサブセットを複製します。¶
コネクタは、テーブルごとに複製されるデータを構成列のサブセットにフィルターできます。
列にフィルターを適用するには、複製パラメーターコンテキストのColumn Filterプロパティを変更し、フィルターを適用したいテーブルごとに1エントリずつ、構成の配列を追加します。
列は、名前またはパターン別に包含したり、除外したりすることができます。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせて適用することもできます。除外は常に包含より優先されます。
以下の例は、利用可能なフィールドを示したものです。schema および table は必須で、 included、 excluded、 includedPattern、 excludedPattern のうちの1つ以上が必要です。
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
テーブルでデータ変更を追跡する¶
コネクタはソーステーブルのデータの現在の状態だけでなく、すべての変更セットのすべての行の状態も複製します。このデータは、宛先テーブルと同じスキーマで作成されたジャーナルテーブルに格納されます。
ジャーナルテーブル名の形式は次のとおりです。<source table name>_JOURNAL_<timestamp><schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> は整数で、ソーステーブルのスキーマが変更されるごとに増加します。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。
テーブルが複製から削除され、その後再び追加されると、 <タイムスタンプ> の値が変更され、 <スキーマ生成> が 1 から再び開始されます。
重要
Snowflakeは、ジャーナルテーブルの構造を一切変更しないことを推奨します。これらは、コネクタによって複製プロセスの一部として宛先テーブルを更新するために使用されます。
コネクタはジャーナルテーブルをドロップすることはありませんが、複製されたすべてのソーステーブルに最新のジャーナルを使用し、ジャーナル上の追加専用ストリームのみを読み取ります。ストレージを回収するには、以下を実行します。
すべてのジャーナルテーブルをいつでも切り捨てます。
複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。
アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。
例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2 を 除いて、それらのすべてをドロップできます。
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
マージタスクのスケジュールを構成する¶
コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。
ウェアハウスのコストを制限し、スケジュールされた時間のみにマージを制限するには、Merge task Schedule CRON パラメーターで CRON 式を使用します。MergeSnowflakeJournalTable プロセッサーに送られてくるフローファイルをスロットルし、マージは専用の期間のみにトリガーされます。スケジュールに関する詳細は、 スケジュールストラテジー をご参照ください。
コネクタを停止または削除する¶
コネクタを停止または削除する場合は、コネクタが使用する 複製スロット を考慮する必要があります。
コネクタは、 snowflake_connector_ で始まる名前にランダムなサフィックスが続く独自の複製スロットを作成します。コネクタは複製ストリームを読み取るとスロットを進め、 PostgreSQL は WAL ログをトリミングしてディスク領域を解放します。
コネクタが一時停止している場合、スロットは進行せず、ソースデータベースへの変更によって WAL ログサイズが増加し続けます。特にトラフィックの多いデータベースでは、コネクタを長期間にわたり一時停止したままにしないでください。
コネクタをOpenflowキャンバスから削除したり、Openflowインスタンス全体を削除するなどの方法で削除しても、複製スロットはそのまま残るため、手動でドロップする必要があります。
同じ PostgreSQL データベースから複製する複数のコネクタインスタンスがある場合、各インスタンスは固有の名前の複製スロットを作成します。複製スロットを手動でドロップする場合は、それが正しいスロットであることを確認してください。CaptureChangePostgreSQL プロセッサーの状態を確認することで、指定されたコネクタインスタンスでどの複製スロットが使用されているかを確認できます。
フローを実行する¶
プレーンを右クリックし、 Enable all Controller Services を選択します。
インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。