Openflow Connector for PostgreSQL について

注釈

このコネクタは、 Snowflakeコネクタ規約 に従うものとします。

このトピックでは、 Openflow Connector for PostgreSQL の基本概念、ワークフロー、および制限事項について説明します。

Openflow Connector for PostgreSQL について

Openflow Connector for PostgreSQL は、 PostgreSQL データベースインスタンスをSnowflakeに接続し、選択したテーブルのデータをほぼリアルタイムで、または指定したスケジュールで複製します。コネクタは、複製されたテーブルの現在の状態とともに利用可能なすべてのデータ変更のログも作成します。

ユースケース

このコネクタは、以下を実行する場合に使用します。

  • 包括的な一元化されたレポートのための、Snowflakeを使った PostgreSQL データの CDC 複製。

サポートされている PostgreSQL バージョン

サポートされている PostgreSQL バージョンは以下のとおりです。

サポートされている PostgreSQL バージョン

11

12

13

14

15

16

17

18

標準

有り

有り

有り

有り

有り

有り

有り

有り

AWS RDS

有り

有り

有り

有り

有り

有り

有り

有り

Amazon Aurora

有り

有り

有り

有り

有り

有り

有り

GCP Cloud SQL

有り

有り

有り

有り

有り

有り

有り

Azure Database

有り

有り

有り

有り

有り

有り

有り

Openflow要件

  • ランタイムのサイズはM以上である必要があります。大量のデータを複製する場合、特に行サイズが大きい場合は、より大きなランタイムを使用します。

  • コネクタは、マルチノードのOpenflowランタイムをサポートしていません。このコネクタのランタイムを、 Min nodes および Max nodes1 に設定して構成します。

制限事項

  • コネクタは PostgreSQL バージョン11以降をサポートしています。

  • コネクタは、 PostgreSQL を使用したユーザー名とパスワードによる認証のみをサポートしています。

  • コネクタは、16MBより大きい個々の値を複製しません。デフォルトでは、このような値を処理すると、関連するテーブルは永続的な失敗としてマークされます。テーブルの失敗を防ぐには、**オーバーサイズ値戦略**の宛先パラメーターを変更します。

  • コネクタは、Snowflakeのタイプ制限 を超えるデータが含まれるテーブルを複製しません。このルールの例外は、データのタイプが日付と時刻で範囲外の値を含む列です。詳細については、範囲外の値のサポート をご参照ください。

  • コネクタでは、複製されるすべてのテーブルに主キーが必要であり、そのテーブルの複製アイデンティティが主キーと同じである必要があります。

  • コネクタは、主キーの定義の変更、数値列の精度やスケールの変更を除いて、ソーステーブルのスキーマの変更をサポートしています。

  • コネクタは、ドロップされた列の再追加をサポートしていません。

注釈

特定の列を複製から除外することで、特定のテーブル列に影響する制限を回避できます。

ワークフロー

  1. データベース管理者 は、 PostgreSQL 複製設定を構成し、パブリケーションおよびコネクタの認証情報を作成します。オプションとして、 SSL 証明書を提供します。

  2. Snowflakeアカウント管理者 は以下のタスクを実行します。

    1. コネクタのサービスユーザー、コネクタのウェアハウス、複製する宛先データベースを作成します。

    2. コネクタをインストールします。

    3. フローテンプレートに必要なパラメーターを指定します。

    4. フローを実行します。コネクタは、Openflowの実行時に以下のタスクを実行します。

      1. ジャーナルテーブルのスキーマを作成します。

      2. 複製用に構成されたソーステーブルと一致するスキーマと宛先テーブルを作成します。

      3. テーブル複製のライフサイクルに従って複製を開始します。

コネクタの仕組み

以下のセクションでは、複製、スキーマの変更、データ保持など、さまざまなシナリオでコネクタがどのように機能するかについて説明します。

テーブルの複製方法

  1. スキーマのイントロスペクション: コネクタはソーステーブルの列、名前、タイプを検出し、Snowflakeとコネクタの制限事項に対してそれらを検証します。検証に失敗するとこのステージは失敗し、サイクルが完了します。スキーマイントロスペクションが正常に完了すると、コネクタは空の宛先テーブルを作成します。

  2. スナップショットロード: コネクタは、ソーステーブルで利用可能なすべてのデータを宛先テーブルにコピーします。このステージに失敗するとサイクルは終了し、それ以上のデータは複製されません。正常に完了すると、ソーステーブルのデータセット全体が宛先テーブルで利用可能になります。

  3. 増分ロード: コネクタはソーステーブルの変更の追跡と宛先テーブルへのコピーを続けます。これは、テーブルがレプリケーションから削除されるまで続きます。このステージで失敗すると、問題が解決するまでソーステーブルの複製が永久的に停止します。

    注釈

    このコネクタは、スナップショットのロードフェーズをバイパスして、新しく追加されたテーブルの増分変更の複製をすぐに開始するように構成できます。このオプションは、以前に複製されたデータが存在し、テーブルを再スナップショットせずに複製を続行するアカウントにコネクタを再インストールする場合に、よく役立ちます。

    スナップショットロードのバイパスと増分ロードプロセスの使用の詳細については、 増分複製 をご参照ください。

重要

接続エラーなどの一時的な障害は、テーブルの複製を妨げません。サポートされていないデータ型などの永続的な障害は、テーブルの複製を妨げます。永続的な障害によりテーブルの複製が妨げられた場合は、複製されたテーブルのリストからテーブルが削除されます。失敗の原因となった問題に対処した後、複製されたテーブルのリストにテーブルを追加し直すことができます。

TOASTed 値のサポート

コネクタは、タイプが arraybyteajsonjsonbtextvarcharxml である列について TOAST 値 を使用したテーブルの複製をサポートします。

コネクタは、CDC ストリームで TOASTed 値に遭遇するたびに、所定の列タイプ用にフォーマットされたデフォルトのプレースホルダー __previous_value_unchanged を代用し、これをジャーナルテーブルに保存します。その後、MERGE クエリがプレースホルダーの値を考慮して、宛先テーブルが常に最後の TOASTed ではない値を含むようにします。

範囲外の値のサポート

コネクタは、タイプが datetimestamptimestamptz であり範囲外の値を含む列のあるテーブルの複製をサポートします。コネクタが CDC ストリームで範囲外の値を検出した場合、列のタイプに基づいてデフォルトのプレースホルダーを代用します。

範囲外の値のプレースホルダー値

列タイプ

プレースホルダー値

date

-9999-01-01 から 9999-12-31

timestamp

0001-01-01 00:00:00 から 9999-12-31 23:59:59.999999999

timestamptz

0001-01-01 00:00:00+00 から 9999-12-31 23:59:59.999999999+00

注釈

-Infinity 値と Infinity 値も、3つの型すべてのそれぞれに対応するプレースホルダーに置き換えられます。

テーブル複製ステータス

接続エラーなどの一時的な障害は、テーブルの複製を妨げません。ただし、サポートされていないデータ型などの永続的な障害はテーブルの複製を妨げます。

複製の問題をトラブルシューティングする、または複製フローからテーブルが正常に削除されたことを確認するには、テーブル状態ストアをチェックします。

  1. Openflowランタイムキャンバスで、プロセッサーグループを右クリックし、Controller Services を選択します。コントローラーサービスをリストしたテーブルが表示されます。

  2. Table State Store というラベルの行を見つけて、行の右側にある More 他のオプションを示す3つの垂直の点 ボタンをクリックし、View State を選択します。

テーブルとその現在の状態が書かれたリストが表示されます。検索ボックスに入力して、テーブル名でリストをフィルタリングします。可能な状態は次のとおりです。

  • NEW:テーブルは複製がスケジュールされていますが、複製は開始されていません。

  • SNAPSHOT_REPLICATION:コネクタは既存のデータをコピーしています。このステータスは、すべての記録が宛先テーブルに保存されるまで表示されます。

  • INCREMENTAL_REPLICATION:コネクタは変更をアクティブに複製しています。このステータスは、スナップショット複製の終了後に表示され、テーブルが複製から削除されるか、複製が失敗するまで無期限に表示され続けます。

  • FAILED:エラーのため、複製が完全に停止しています。

注釈

Openflowランタイムキャンバスにはテーブルステータスの変更は表示されず、現在のテーブルステータスのみが表示されます。ただし、テーブルステータスの変更は発生するとログに記録されます。次のログメッセージを探します。

Replication state for table <database_name>.<schema_name>.<table_name> changed from <old_state> to <new_state>
Copy

永続的な障害によりテーブルの複製が妨げられた場合は、複製からテーブルが削除されます。失敗の原因となった問題に対処した後、複製にテーブルを追加し直すことができます。詳細については、:ref:`テーブル複製の再開 <label-of_postgres_restart_table_replication>`をご参照ください。

データ保持について

コネクタは、顧客データが自動的に削除されることのないデータ保持原則に従います。お客様は複製されたデータに対する完全な所有権と制御を維持し、コネクタは履歴情報を完全に削除するのではなく保持します。

このアプローチには次のような意味があります。

  • ソーステーブルから削除された行は、物理的に削除されるのではなく、宛先テーブルでソフト削除されます。

  • ソーステーブルからドロップされた列は、宛先テーブルではドロップされるのではなく名前が変更されます。

  • ジャーナルテーブルは無期限に保持され、自動的にクリーンアップされません。

宛先テーブルのメタデータ列

各宛先テーブルには、複製情報を追跡する次のメタデータ列が含まれます。

列名

説明

_SNOWFLAKE_INSERTED_AT

TIMESTAMP_NTZ

行が最初に宛先テーブルに挿入されたときのタイムスタンプ。

_SNOWFLAKE_UPDATED_AT

TIMESTAMP_NTZ

宛先テーブルで行が最後に更新されたときのタイムスタンプ。

_SNOWFLAKE_DELETED

BOOLEAN

行がソーステーブルから削除されたかどうかを示します。``true``の場合、行はソフト削除されており、ソースには存在しなくなります。

ソフト削除された行

ソーステーブルから行が削除されても、コネクタはそれを宛先テーブルから物理的に削除しません。代わりに、``_SNOWFLAKE_DELETED``メタデータ列を``true``に設定することにより、行は削除済みとしてマークされます。

このアプローチにより、以下が可能になります。

  • 監査またはコンプライアンスの目的で履歴データを保持する。

  • 必要に応じて削除されたレコードをクエリする。

  • 要件に基づいて、データを完全に削除するタイミングと方法を決定する。

アクティブな(削除されていない)行のみをクエリするには、``_SNOWFLAKE_DELETED``列でフィルターします。

SELECT * FROM my_table WHERE _SNOWFLAKE_DELETED = FALSE;
Copy

削除された行をクエリするには、以下のようにします。

SELECT * FROM my_table WHERE _SNOWFLAKE_DELETED = TRUE;
Copy

ドロップされた列

ソーステーブルから列がドロップされても、コネクタは宛先テーブルから対応する列をドロップしません。代わりに、履歴値を保持するために``__SNOWFLAKE_DELETED``サフィックスを追加して列の名前が変更されます。

たとえば、``EMAIL``という名前の列がソーステーブルからドロップされた場合、宛先テーブルでは``EMAIL__SNOWFLAKE_DELETED``に名前が変更されます。列がドロップされる前に存在していた行は元の値を保持しますが、ドロップ後に追加された行はこの列に``NULL``を持ちます。

名前が変更された列から履歴値を引き続きクエリできます。

SELECT EMAIL__SNOWFLAKE_DELETED FROM my_table;
Copy

列の名前の変更

CDC(変更データキャプチャ)メカニズムの制限により、コネクタは、列の名前変更と、列のドロップおよびその後の新しい列の追加を区別できません。その結果、ソーステーブルで列の名前を変更すると、コネクタはこれを元の列のドロップと、新しい名前の新しい列の追加という2つの別個の操作として扱います。

たとえば、ソーステーブルで列の名前を``A``から``B``に変更した場合、宛先テーブルには次が含まれます。

  • A__SNOWFLAKE_DELETED:名前変更前の値が含まれます。名前変更後に追加された行は、この列が``NULL``です。

  • B:名前変更後の値が含まれます。名前変更前に存在していた行は、この列が``NULL``です。

名前を変更した列のクエリ

元の列と名前が変更された列の両方からデータを、単一の統合された列として取得するには、``COALESCE``または``CASE``式を使用します。

SELECT
    COALESCE(B, A__SNOWFLAKE_DELETED) AS A_RENAMED_TO_B
FROM my_table;
Copy

または、``CASE``式を使用します。

SELECT
    CASE
        WHEN B IS NOT NULL THEN B
        ELSE A__SNOWFLAKE_DELETED
    END AS A_RENAMED_TO_B
FROM my_table;
Copy

名前を変更した列のビューの作成

宛先テーブルを手動で変更するのではなく、名前が変更された列を単一の統合された列として表示するビューを作成できます。このアプローチは、元のデータを保持し、進行中の複製に関する潜在的な問題を回避するために、推奨されます。

CREATE VIEW my_table_unified AS
SELECT
    *,
    COALESCE(B, A__SNOWFLAKE_DELETED) AS A_RENAMED_TO_B
FROM my_table;
Copy

重要

宛先テーブルの構造(列のドロップや名前変更など)を手動で変更することは推奨されません。進行中の複製に干渉して、データの不整合を引き起こす可能性があるためです。

ジャーナルテーブル

増分複製において、ソースデータベースからの変更は、宛先テーブルにマージされる前に、最初にジャーナルテーブルに書き込まれます。コネクタは、ジャーナルテーブルからデータを自動的に削除しません。このデータは、監査、デバッグ、再処理の目的で役立つ場合があります。

ジャーナルテーブルは、対応する宛先テーブルと同じスキーマで作成され、次の命名規則に従います。

<TABLE_NAME>_JOURNAL_<timestamp>_<number>

条件:

  • ``<TABLE_NAME>``は宛先テーブルの名前である。

  • ``<timestamp>``はUnixエポック形式(1970年1月1日からの秒数)の作成タイムスタンプであり、一意性を確保する。

  • ``<number>``は1から始まり、ソーステーブルのスキーマ変更または列フィルターの変更により宛先テーブルスキーマが変更されるたびに増加します。

たとえば、宛先テーブルが``SALES.ORDERS``の場合、ジャーナルテーブルの名前は``SALES.ORDERS_JOURNAL_1705320000_1``になる可能性があります。

重要

複製の進行中はジャーナルテーブルをドロップしないでください。アクティブなジャーナルテーブルを削除すると、データが失われたり、複製に失敗したりする可能性があります。対応するソーステーブルが複製から完全に削除された後にのみ、ジャーナルテーブルをドロップします。

ジャーナルテーブルストレージの管理

古いジャーナルデータを削除してストレージコストを管理する必要がある場合は、複製されなくなったテーブルのジャーナルテーブルを定期的にクリーンアップするSnowflakeタスクを作成できます。

ジャーナルのクリーンアップを実装する前に、次を確認してください。

  • 対応するソーステーブルが複製から完全に削除されていること。

  • 監査や処理の目的でジャーナルデータが不要であること。

自動クリーンアップのためのタスクの作成と管理について詳しくは、:doc:`タスクの紹介</user-guide/tasks-intro>`を参照してください。

次のステップ

コネクタがデータ型をSnowflakeのデータ型にどのようにマップするかを理解するには、:doc:`data-mapping`を確認してください。コネクタを設定するには、:doc:`setup`を確認してください。