Openflow Connector for PostgreSQL の設定

注釈

このコネクタは、 Snowflakeコネクタ規約 に従うものとします。

このトピックでは、 Openflow Connector for PostgreSQL を設定する手順について説明します。

注釈

このコネクタは、スナップショットのロードフェーズをバイパスして、新しく追加されたテーブルの増分変更の複製をすぐに開始するように構成できます。このオプションは、以前に複製されたデータが存在し、テーブルを再スナップショットせずに複製を続行するアカウントにコネクタを再インストールする場合に、よく役立ちます。

増分ロードプロセスの詳細については、 増分複製 をご参照ください。

失敗したテーブルの複製の再開については、:ref:`テーブル複製の再開<label-of_postgres_restart_table_replication>`を参照してください。

前提条件

  1. Openflow Connector for PostgreSQL について を確認してください。

  2. サポートされている PostgreSQL バージョン を確認してください。

  3. 推奨:ランタイムごとにコネクタインスタンスを1つだけ追加するようにします。

  4. :doc:`/user-guide/data-integration/openflow/setup-openflow-byoc`または:doc:`Openflow のセットアップ - Snowflake デプロイメント</user-guide/data-integration/openflow/setup-openflow-spcs>`があることを確認してください。

  5. |OFSFSPCS-plural|を使用する場合、:doc:`必要なドメインの構成</user-guide/data-integration/openflow/setup-openflow-spcs-sf-allow-list>`を精査し、:ref:`label-openflow_domains_used_by_openflow_connectors_postgresql`コネクタに必要なドメインへのアクセス許可を付与していることを確認してください。

  6. データベース管理者として、以下のタスクを実行します。

    1. wal_levelの構成

    2. パブリケーションの作成

    3. WAL 用の PostgreSQL サーバーに十分なディスク容量があることを確認します。これは、複製スロットが作成されると、コネクタがその位置を確認して進むまで、 PostgreSQL が複製スロットが保持する位置の WAL データを保持するためです。

    4. サーバー上の各 Openflow Connector for PostgreSQL コネクタインスタンスにつき、少なくとも 1 つの論理複製スロットと 2 つの WAL 送信者を確保してください。max_replication_slots および max_wal_senders を、そのトラフィックおよびインスタンスの他のすべての複製トラフィックをカバーするのに十分な高さに設定してください。

    5. 複製が有効なすべてのテーブルに主キーがあることを確認します。キーは単一列でも複合列でもかまいません。

    6. テーブルの REPLICA IDENTITYDEFAULT に設定します。これにより、主キーが WAL で表現され、コネクタがそれを読み取れるようになります。

    7. コネクタのユーザーを作成します。コネクタには、 REPLICATION 属性と、複製されたすべてのテーブルから SELECT にアクセスする権限を持つユーザーが必要です。コネクタの構成に入るためのパスワードを持つユーザーを作成します。複製セキュリティの詳細については、 セキュリティ をご参照ください。

  7. Snowflakeアカウント管理者として、以下のタスクを実行します。

    1. タイプを SERVICE としてSnowflakeユーザーを作成します。複製データを格納するデータベースを作成し、Snowflakeユーザーに USAGE および CREATE SCHEMA 権限 を付与して、そのデータベースにオブジェクトを作成する権限を設定します。

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
        WITH
          WAREHOUSE_SIZE = 'XSMALL'
          AUTO_SUSPEND = 300
          AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
    2. 安全なキーのペア(パブリックおよび秘密)を作成します。コネクタの構成時に使用するために、ユーザーの秘密キーを ファイルに保存します。Snowflakeユーザーに公開キーを割り当てます:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      

      詳細については、:doc:` キーペア認証 </user-guide/key-pair-auth>` をご参照ください。

    3. コネクタが使用するウェアハウスを指定します。で開始する XSMALL ウェアハウスのサイズから、レプリケートされるテーブルの量とデータの転送量に応じてサイズを実験します。多数のテーブルを扱う場合、通常はウェアハウスサイズではなく、 マルチクラスターウェアハウス を使用した方がスケールしやすくなります。

wal_levelの構成

Openflow Connector for PostgreSQL では wal_levellogical に設定する必要があります。

PostgreSQL サーバーがホストされている場所に応じて、wal_levelを以下のように構成できます:

オンプレミス

スーパーユーザーまたは ALTER SYSTEM 権限を持つユーザーで、次のクエリを実行します。

ALTER SYSTEM SET wal_level = logical;

RDS

エージェントが使用するユーザには、 rds_superuser または rds_replication ロールが割り当てられている必要があります。

次も設定する必要があります。

  • rds.logical_replication 静的パラメーターを1に設定します。

  • データベースとレプリケーションの設定に応じて、 max_replication_slotsmax_connectionsmax_wal_senders パラメーターを設定します。

AWS Aurora

rds.logical_replication 情的パラメーターを1に設定します。

GCP

次のフラグを設定します。

  • cloudsql.logical_decoding=on

  • cloudsql.enable_pglogical=on

詳細については、 Google Cloudドキュメント をご参照ください。

Azure

レプリケーションサポートを Logical に設定します。詳細については、 Azureドキュメンテーション をご参照ください。

パブリケーションの作成

Openflow Connector for PostgreSQL では、複製を開始する前に、 パブリケーション を PostgreSQL で作成し、構成する必要があります。すべてのテーブル、またはテーブルのサブセットに対して作成することも、特定の列のみを持つ特定のテーブルに対して作成することもできます。複製を予定しているすべてのテーブルと列がパブリケーションに含まれていることを確認してください。コネクタの実行中であれば、後でパブリケーションを変更することもできます。パブリケーションを作成および構成するには、以下の手順を実行します。

  1. データベースに対する権限、を持つユーザーとしてログインし、次のクエリを実行します:

    • PostgreSQL 13以降の場合

      CREATE PUBLICATION <publication name> WITH (publish_via_partition_root = true);
      

      パーティション分割されたテーブルを正しく複製するためには、 publish_via_partition_root を追加する必要があります。パーティション分割されたテーブルの取り込みについては、 パーティショニングされたテーブルを複製する をご参照ください。

    • PostgreSQL 13より前のバージョンの場合

      CREATE PUBLICATION <publication name>;
      
  2. 次を使用して、データベースエージェントが閲覧できるテーブルを定義します。

ALTER PUBLICATION <publication name> ADD TABLE <table name>;

パーティション分割されたテーブルの場合、ルートパーティションテーブルをパブリケーションに追加するだけで十分です。詳細については、 パーティショニングされたテーブルを複製する をご参照ください。

重要

PostgreSQL 15以降 では、指定されたテーブル列のサブセットに対するパブリケーションの構成をサポートしています。コネクタがこれを正しくサポートするには、 列フィルタリング設定 を使用して、パブリケーションで設定された列と同じ列を含める必要があります。

この設定がないと、コネクタは以下のように動作します:

  • 宛先データベースでは、フィルターに含まれていない列に __DELETED というサフィックスが付きます。スナップショットフェーズ中にレプリケートされたすべてのデータは保持されます。

  • パブリケーションに新しい列を追加すると、テーブルは永久に失敗し、複製を再起動する必要があります。

詳細については、 ALTER PUBLICATION をご参照ください。

コネクタをインストールする

コネクタをインストールするには、データエンジニアとして次を実行します。

  1. Openflow概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. Select runtime ダイアログで、Available runtimes ドロップダウンリストからランタイムを選択して Add をクリックします。

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  4. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  5. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

コネクタを構成するには、データエンジニアとして次の手順を実行します。

  1. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

  2. 必要なパラメーター値を入力します。

    必要なパラメーター値について詳しくは、次のセクションを参照してください。

まず、 PostgreSQL ソースパラメーターコンテキストのパラメーターを設定し、次に PostgreSQL 宛先パラメーターコンテキストのパラメーターを設定します。これが完了したら、コネクタを有効にすることができ、 PostgreSQL とSnowflakeの両方に接続して、実行が開始されることになります。しかし、明示的にテーブルが構成に追加されるまでは、いかなるデータも複製されません。

複製する特定のテーブルを構成するには、 PostgreSQL 取り込みパラメーターコンテキストを編集します。複製パラメーターコンテキストに変更を適用した直後に、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。

PostgreSQLソースパラメーター

パラメーター

説明

PostgreSQL 接続 URL

ソースデータベースへの完全な JDBC URL。例: jdbc:postgresql://example.com:5432/public

PostgreSQL レプリカサーバーに接続している場合、PostgreSQL レプリカサーバーからテーブルを複製する をご参照ください。

PostgreSQL JDBC ドライバー

PostgreSQL JDBC ドライバーのjar へのパス。ウェブサイトからjarをダウンロードし、 Reference asset チェックボックスを選択してアップロードして添付します。

PostgreSQL ユーザー名

コネクタのユーザー名。

PostgreSQL パスワード

コネクタのパスワード。

パブリケーション名

以前に作成したパブリケーションの名前。

複製スロット名

オプション。値が指定されていない場合、コネクタは新しい一意の名前のスロットを作成します。値が指定されている場合、コネクタは既存のスロットを使用するか、指定された名前で新しいスロットを作成します。

実行中のコネクタの値を変更すると、更新されたスロットの位置から増分変更データキャプチャ(CDC)ストリームの読み取りが再開されます。

PostgreSQL宛先パラメーター

パラメーター

説明

必須

宛先データベース

データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

有り

Snowflake認証ストラテジー

以下を使用する場合:

  • Snowflake Openflow Deployment または BYOC:SNOWFLAKE_MANAGED_TOKEN を使用します。このトークンはSnowflakeによって自動的に管理されます。 BYOC デプロイメントでは、 SNOWFLAKE_MANAGED_TOKEN を使用するために、事前に ランタイムロール が構成されている必要があります。

  • BYOC: 代わりに、BYOC では認証戦略の値として KEY_PAIR を使用できます。

有り

Snowflakeアカウント識別子

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR: データが永続化される[organization-name]-[account-name]形式のSnowflakeアカウント名。

有り

Snowflake接続の戦略

KEY_PAIRを使用する場合は、Snowflakeに接続するための戦略を特定します。

  • STANDARD (デフォルト):Snowflakeサービスへの標準パブリックルーティングを使用して接続します。

  • PRIVATE_CONNECTIVITY:AWS PrivateLinkなど、サポート対象のクラウドプラットフォームに関連付けられたプライベートアドレスを使用して接続します。

KEY_PAIRを使用したBYOCのみに必要です。それ以外の場合は無視されます。

Snowflake秘密キー

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:認証に使用される RSA プライベートキーである必要があります。

    その RSA キーは PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを持つ必要があります。SnowflakeプライベートキーファイルまたはSnowflakeプライベートキーのいずれかを定義する必要があることに注意してください。

無し

Snowflake秘密キーファイル

以下を使用する場合:

  • Session token authentication strategy:プライベートキーファイルは空白である必要があります。

  • KEY_PAIR:Snowflakeへの認証に使用される RSA プライベートキーを含むファイルをアップロードします。これは、PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。ヘッダー行は -----BEGIN PRIVATE で始まります。プライベートキーファイルをアップロードするには、Reference asset チェックボックスを選択します。

無し

Snowflake秘密キーパスワード

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeプライベートキーファイルに関連付けられたパスワードを提供します。

無し

Snowflakeロール

以下を使用する場合

  • Session Token Authentication Strategy :ランタイムに割り当てられたSnowflakeロール、またはこのSnowflakeロールに付与された子ロールを使用します。Openflow UI のランタイムのSnowflakeロールは、ランタイムの More Options [⋮] ボタンを展開して Set Snowflake role を選択することで見つけることができます。

  • KEY_PAIR Authentication Strategy:サービスユーザーのために構成された有効なロールを使用します。

有り

Snowflakeのユーザー名

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeインスタンスへの接続に使用するユーザー名を提供します。

有り

オーバーサイズ値戦略

複製中にコネクタが内部サイズ制限(16MB)を超える値を処理する方法を決定します。可能な値は次のとおりです。

  • **失敗テーブル**(デフォルト):テーブルは永久に失敗したものとしてマークされ、そのテーブルの複製が停止します。

  • Nullの設定:値は宛先テーブルで``NULL``に置き換えられます。オーバーサイズ値を超えるテーブル内のデータが失われても許容できる場合、これを使用してテーブルの失敗を防ぎます。

無し

Snowflakeウェアハウス

クエリの実行に使用されるSnowflakeウェアハウス。

有り

PostgreSQL取り込みパラメーター

パラメーター

説明

含まれるテーブル名

スキーマを含む、テーブルパスのコンマ区切りリスト。例: public.my_table, other_schema.other_table

名前または正規表現のいずれかでテーブルを選択します。両方を使用すると、いずれかのオプションで一致するすべてのテーブルが含まれます。

サブパーティションであるテーブルは常に取り込みから除外されます。詳細については、 パーティショニングされたテーブルを複製する をご参照ください。

含まれるテーブル正規表現

テーブルパスに一致させる正規表現。式に一致するすべてのパスが複製され、後から作成されたパターンに一致する新しいテーブルも自動的に含まれます。例: public\.auto_.*

名前または正規表現のいずれかでテーブルを選択します。両方を使用すると、いずれかのオプションで一致するすべてのテーブルが含まれます。

サブパーティションであるテーブルは常に取り込みから除外されます。詳細については、 パーティショニングされたテーブルを複製する をご参照ください。

列フィルター JSON

オプション。テーブルごとに含める列または除外する列を指定するフィルターオブジェクトのJSON配列。構文の詳細と例については、:ref:`label-postgres_connector_replication_subset_of_columns`を参照してください。

タスクスケジュール CRON をマージする

ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 * * * * * ? に設定します。

例:

  • * 0 * * * ? という文字列は、丸1時間で1分間のマージをスケジュールしたいことを示しています。

  • * 20 14 ? * MON-FRI という文字列は、毎週月曜日から金曜日の2:20 PM にマージをスケジュールしたいことを示しています。

その他の情報と例については、 Quartzドキュメント のcronトリガーチュートリアルをご参照ください。

オブジェクト識別子の解決

スキーマ名、テーブル名、列名などのソースオブジェクト識別子がSnowflakeにどのように保存され、クエリされるかを指定します。この設定は、 SQL クエリで二重引用符を使用する必要があることを指定します。

:emph:`オプション1:(デフォルト)大文字と小文字を区別する。 `下位互換性のため。

  • 変換:大文字小文字は維持されます。たとえば、 My_TableMy_Table のままです。

  • クエリ: SQL クエリは、データベースオブジェクトの大文字と小文字を正確に一致させるために、二重引用符を使用する必要があります。例: SELECT * FROM "My_Table";

注釈

Snowflakeでは、レガシーまたは互換性の理由からソースの大文字小文字の区別を保持する必要がある場合、このオプションを使用することを推奨しています。たとえば、 MY_TABLEmy_table のように大文字と小文字だけが異なるテーブル名がソースデータベースに含まれている場合、大文字と小文字を区別しない比較を使用すると、名前の衝突が発生します。

:emph:`オプション 2:(推奨)大文字と小文字を区別しない `

  • 変換:識別子はすべて大文字に変換されます。たとえば、 My_TableMY_TABLE になります。

  • クエリ: SQL クエリは大文字と小文字を区別せず、 SQL 二重引用符は必要ありません。たとえば、 SELECT * FROM my_table;SELECT * FROM MY_TABLE; と同じ結果を返します。

注釈

Snowflakeでは、データベースオブジェクトに大文字と小文字が混在する名前が想定されない場合、このオプションを使用することを推奨しています。

重要

コネクタがデータのインジェスチョンを開始した後は、この設定を変更しないでください。取り込み開始後にこの設定を変更すると、既存の取り込みは中断されます。この設定を変更する必要がある場合は、新しいコネクタインスタンスを作成してください。

PostgreSQL レプリカサーバーからテーブルを複製する

コネクタは、論理的複製 を使用してプライマリサーバー、ホットスタンバイレプリカ、またはサブスクライバーサーバーからデータをインジェストできます。PostgreSQL レプリカに接続するコネクタを構成する前に、プライマリノードとレプリカノード間の複製が正しく機能することを確認します。コネクタでデータが欠落している問題を調査する場合は、まずコネクタが使用するレプリカサーバーに欠落している行が存在することを確認します。

スタンバイレプリカに接続する際のその他の考慮事項:

  • ホットスタンバイレプリカへの接続のみがサポートされています。ウォームスタンバイレプリカは、プライマリインスタンスにプロモートされるまでクライアントからの接続を受け入れることができないことに注意してください。

  • サーバーの PostgreSQL バージョンは>= 16である必要があります。

  • コネクタが必要とする:ref:パブリケーション <label-postgres_connector_create_a_publication> は、スタンバイサーバーではなく、プライマリサーバーで作成する必要があります。スタンバイサーバーは読み取り専用で、パブリケーションの作成は許可されません。

ホットスタンバイインスタンスに接続して :ui:` 複製スロットを作成しようとすると、'<replication slot>' がタイムアウトします。スタンバイインスタンスに接続する場合は、プライマリ PostgreSQL インスタンスにトラフィックがあることを確認します。そうでなければ、複製スロットを作成するための呼び出しは`Openflow通知でエラーを返さないか、 Read PostgreSQL CDC Stream プロセッサーが起動しません。プライマリ PostgreSQL インスタンスにログインし、次のクエリを実行します。

SELECT pg_log_standby_snapshot();

エラーは、プライマリサーバーにデータ変更がない場合に発生します。そのため、レプリカサーバーで複製スロットを作成しているときに、コネクタが停止する可能性があります。これは、レプリカサーバーが複製スロットを作成できるように、プライマリサーバーから実行中のトランザクションに関する情報を要求するためです。プライマリサーバーはアイドル状態の間、情報を送信しません。pg_log_standby_snapshot() 関数は、プライマリサーバーが実行中のトランザクションに関する情報をレプリカサーバーに送信するように強制します。

テーブルの列のサブセットを複製します。

コネクタは、テーブルごとに複製されるデータを構成列のサブセットにフィルターできます。主キー列は除外に関係なく常に含まれます。

列フィルターを適用するには、取り込みパラメーターコンテキストの:ui:`Column Filter JSON`パラメーターを、フィルターしたいテーブルごとに1つ、フィルターオブジェクトのJSON配列に設定します。

列は、名前または正規表現パターンによって含めたり除外したりできます。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせることもできます。その場合、除外は常に包含よりも優先されます。

構文

配列の各オブジェクトはテーブルを識別し、含める列または除外する列を指定します。

[
    {
        "schema": "<schema>" | "schemaPattern": "<regex>",
        "table": "<table>" | "tablePattern": "<regex>",
        "included": ["<column>", "<column>"],
        "excluded": ["<column>", "<column>"],
        "includedPattern": "<regex>",
        "excludedPattern": "<regex>"
    }
]

次のルールが適用されます。

  • 完全な名前の一致には``schema``および``table``を、正規表現の一致には``schemaPattern``および``tablePattern``を使用します。同じオブジェクトで、フィールドとそのパターンバリアントの両方を使用することはできません(たとえば、``schema``と``schemaPattern``の両方を含めることはできません)。

  • includedexcludedincludedPattern、または``excludedPattern``の少なくとも1つを提供する必要があります。

  • 含めるフィルターと除外するフィルターの両方が指定されている場合は、除外が優先されます。

  • 複数のフィルターが同じテーブルに一致する場合、最後に一致するフィルターが使用され、完全一致がパターンベースのフィルターよりも優先されます。

  • 値はオブジェクトの配列にして、異なるテーブルに異なるフィルターを適用することができます。

名前で特定の列を含める:

[
    {
        "schema": "public",
        "table": "orders",
        "included": ["account_id", "status", "created_at"]
    }
]

名前で特定の列を除外する:

[
    {
        "schema": "public",
        "table": "orders",
        "excluded": ["internal_note", "debug_flag"]
    }
]

包含パターンと特定の除外を組み合わせる(たとえば、``admin_email``を除くすべてのメール列を含める):

[
    {
        "schema": "public",
        "table": "contacts",
        "includedPattern": ".*_email",
        "excluded": ["admin_email"]
    }
]

スキーマパターンと正確なテーブル名を組み合わせて、スキーマ全体にフィルターを適用します。

[
    {
        "schemaPattern": "data_.*",
        "table": "customers",
        "excluded": ["internal_note"]
    }
]

異なるテーブルに異なるルールを適用するために複数のフィルターオブジェクトを渡す:

[
    {"schema": "public", "table": "orders", "included": ["account_id", "status"]},
    {"schema": "public", "table": "customers", "excludedPattern": ".*_internal"}
]

パーティショニングされたテーブルを複製する

このコネクタは、バージョン15以降の PostgreSQL サーバーでパーティション化されたテーブルの複製をサポートしています。パーティショニングされた PostgreSQL テーブルは、単一の宛先テーブルとしてSnowflakeに複製されます。

たとえば、パーティショニングされたテーブル orders およびサブパーティションの orders_2023orders_2024 があって、コネクタが orders.* パターンに一致するすべてのテーブルを取り込むように構成されている場合、Snowflakeには orders テーブルのみが複製され、そこにすべてのサブパーティションのデータが含まれることになります。

パーティショニングされたテーブルの複製をサポートするには、確実に PostgreSQL で作成された パブリケーションpublish_via_partition_root オプションが true に設定されているようにしてください。

パーティショニングされたテーブルの取り込みには、現在のところ以下の制限があります。

  • 取り込み開始後に、パーティショニングされたテーブルにパーティションとしてテーブルがアタッチされると、コネクタはアタッチ前にパーティションテーブルに存在していたデータを取得しません。

  • 取り込み開始後に、サブパーティションテーブルがパーティションテーブルから切り離されると、コネクタはルートパーティションテーブルでこのサブパーティションのデータを削除済みとしてマークしません。

  • サブパーティションに対する切り捨て操作は、影響を受けるレコードを削除済みとしてマークしません。

テーブルでデータ変更を追跡する

コネクタはソーステーブルのデータの現在の状態だけでなく、すべての変更セットのすべての行の状態も複製します。このデータは、宛先テーブルと同じスキーマで作成されたジャーナルテーブルに格納されます。

ジャーナルテーブル名の形式は``<source_table_name>_JOURNAL_<timestamp>_<schema_generation>``です。ここで、``<timestamp>``はソーステーブルが複製に追加されたときのエポック秒の値であり、``<schema_generation>``はソーステーブルのスキーマ変更ごとに増加する整数です。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。

テーブルが複製から削除され、再度追加されると、``<timestamp>``値が変更され、``<schema_generation>``は``1``から再開します。

重要

Snowflakeは、ジャーナルテーブルの構造を一切変更しないことを推奨します。これらは、コネクタによって複製プロセスの一部として宛先テーブルを更新するために使用されます。

コネクタはジャーナルテーブルをドロップすることはありませんが、複製されたすべてのソーステーブルに最新のジャーナルを使用し、ジャーナル上の追加専用ストリームのみを読み取ります。ストレージを回収するには、以下を実行します。

  • すべてのジャーナルテーブルをいつでも切り捨てます。

  • 複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。

  • アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。

例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2除いて、それらのすべてをドロップできます。

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

マージタスクのスケジュールを構成する

コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。

ウェアハウスのコストを制限し、スケジュールされた時間のみにマージを制限するには、Merge task Schedule CRON パラメーターで CRON 式を使用します。MergeSnowflakeJournalTable プロセッサーに送られてくるフローファイルをスロットルし、マージは専用の期間のみにトリガーされます。スケジュールに関する詳細は、 スケジュールストラテジー をご参照ください。

コネクタを停止または削除する

コネクタを停止または削除する場合は、コネクタが使用する 複製スロット を考慮する必要があります。

コネクタは、 snowflake_connector_ で始まる名前にランダムなサフィックスが続く独自の複製スロットを作成します。コネクタは複製ストリームを読み取るとスロットを進め、 PostgreSQL は WAL ログをトリミングしてディスク領域を解放します。

コネクタが一時停止している場合、スロットは進行せず、ソースデータベースへの変更によって WAL ログサイズが増加し続けます。特にトラフィックの多いデータベースでは、コネクタを長期間にわたり一時停止したままにしないでください。

コネクタをOpenflowキャンバスから削除したり、Openflowインスタンス全体を削除するなどの方法で削除しても、複製スロットはそのまま残るため、手動でドロップする必要があります。

同じ PostgreSQL データベースから複製する複数のコネクタインスタンスがある場合、各インスタンスは固有の名前の複製スロットを作成します。複製スロットを手動でドロップする場合は、それが正しいスロットであることを確認してください。CaptureChangePostgreSQL プロセッサーの状態を確認することで、指定されたコネクタインスタンスでどの複製スロットが使用されているかを確認できます。

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。