Openflow Connector for SQL Server の設定

注釈

このコネクタは、 Snowflakeコネクタ規約 に従うものとします。

このトピックでは、|Sqlserver|を設定する方法について説明します。

増分ロードプロセスについては、 増分複製 をご参照ください。

前提条件

コネクタを設定する前に、以下の前提条件を満たしていることを確認してください。

  1. Openflow Connector for SQL Server について を確認してください。

  2. サポートされている SQL Serverバージョン を確認してください。

  3. ランタイムの展開を設定していることを確認してください。詳細については、次のトピックをご参照ください。

  4. |OFSFSPCS-plural|を使用する場合、 必要なドメインの構成 を精査し、 SQL サーバー コネクタに必要なドメインへのアクセス許可を付与していることを確認してください。

SQL Serverインスタンスの設定

コネクタを設定する前に、SQL Server環境で次のタスクを実行してください。

注釈

これらのタスクはデータベース管理者として実行する必要があります。

  1. 次のSQL Server の例に示すように、複製する予定の データベーステーブル で変更の追跡を有効にします。

    ALTER DATABASE <database>
      SET CHANGE_TRACKING = ON
      (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
    
    ALTER TABLE <schema>.<table>
      ENABLE CHANGE_TRACKING;
    

    注釈

    複製する予定のすべてのデータベースとテーブルに対してこれらのコマンドを実行します。

    コネクタは、複製を開始する前に、データベースとテーブルで変更追跡を有効にする必要があります。複製を予定しているすべてのテーブルで、変更の追跡が有効になっていることを確認してください。コネクタの実行中に、追加のテーブルの変更追跡を有効にすることもできます。

  2. Create a login for the SQL Serverインスタンスのログインを作成する:

    CREATE LOGIN <user_name> WITH PASSWORD = '<password>';
    

    このログインは、複製する予定のデータベースのユーザーを作成するために使用されます。

  3. 各データベースで次のSQL Serverコマンドを実行して、複製するデータベースごとにユーザーを作成します。

    USE <source_database>;
    CREATE USER <user_name> FOR LOGIN <user_name>;
    
  4. 複製する各データベースのユーザーにSELECTおよびVIEW CHANGE TRACKING権限を付与します。

    GRANT SELECT ON <database>.<schema>.<table> TO <user_name>;
    GRANT VIEW CHANGE TRACKING ON <database>.<schema>.<table> TO <user_name>;
    

    複製する予定のすべてのテーブルに対して、各データベースでこれらのコマンドを実行します。これらの権限は、前のステップで作成した各データベースのユーザーに付与する必要があります。

  5. (オプション)ユーザー定義データ型(UDDT)に対する VIEW DEFINITION 権限を付与します。

    テーブルにユーザー定義データ型(UDDT)を使用する列が含まれていて、UDDT がコネクタユーザーとは異なるユーザーによって所有されている場合は、次のSQL Serverの例に示すように、コネクタユーザーに VIEW DEFINITION 権限を付与する必要があります。

    GRANT VIEW DEFINITION TO <user_name>;
    

    この権限がない場合、UDDT を使用する列は自動的に複製から除外されます。

  6. (オプション)SSL接続を構成します。

    SSL接続を使用してSQL Server に接続する場合は、データベースサーバーのルート証明書を作成します。これはコネクタを構成するときに必要です。

Snowflake環境のセットアップ

Snowflake管理者は以下のタスクを実行します。

  1. 複製されたデータを格納するために、Snowflake内に宛先データベースを作成します。

    CREATE DATABASE <destination_database>;
    
  2. Snowflakeの サービスユーザー を作成します。

    CREATE USER <openflow_user>
      TYPE = SERVICE
      COMMENT='Service user for automated access of Openflow';
    
  3. コネクタ用Snowflakeロールを作成し、必要な権限を付与します。

    CREATE ROLE <openflow_role>;
    GRANT ROLE <openflow_role> TO USER <openflow_user>;
    GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
    GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
    

    このロールを使用して、コネクタのSnowflakeデータベースへのアクセスを管理します。

    宛先データベースにオブジェクトを作成するには、アクセス管理に使用するロールに、データベースに対する USAGE権限およびCREATE SCHEMA権限 を付与する必要があります。

  4. コネクタ用にSnowflakeウェアハウスを作成し、必要な権限を付与します。

    CREATE WAREHOUSE <openflow_warehouse> WITH
      WAREHOUSE_SIZE = 'XSMALL'
      AUTO_SUSPEND = 300
      AUTO_RESUME = TRUE;
    GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
    

    Snowflakeは、XSMALLウェアハウスサイズから始めて、複製されるテーブルの数と転送されるデータの量に応じてサイズ変更を試してみることを推奨しています。多数のテーブルを扱う場合、通常はウェアハウスサイズを大きくするよりも、マルチクラスターのウェアハウスを使用した方が適切にスケールできます。詳細については、 マルチクラスターウェアハウス をご参照ください。

  5. キーペア認証用のパブリックキーとプライベートキーを設定します。

    1. 安全なキーのペア(パブリックおよび秘密)を作成します。

    2. ユーザーの秘密キーをファイルに格納して、コネクタの構成に提供します。

    3. Snowflakeユーザーに公開キーを割り当てます:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      

      詳細については、 キーペア認証とキーペアローテーション をご参照ください。

コネクタをインストールする

コネクタをインストールするには、データエンジニアとして次を実行します。

  1. Openflow概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. Select runtime ダイアログで、Available runtimes ドロップダウンリストからランタイムを選択して Add をクリックします。

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  4. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  5. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

コネクタを構成するには、データエンジニアとして次の手順を実行します。

  1. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

  2. 必要なパラメーター値を入力します。

    必要なパラメーター値について詳しくは、次のセクションを参照してください。

まず、SQLServerのソースパラメーターコンテキストのパラメーターを設定し、次にSQLServerの宛先パラメーターコンテキストのパラメーターを設定します。これを完了したら、コネクタを有効にします。コネクタはSQLServerとSnowflakeの両方に接続し、実行を開始します。ただし、複製するテーブルが構成に明示的に追加されるまで、コネクタはデータを複製しません。

複製する特定のテーブルを構成するには、 SQLServer 取り込みパラメーターコンテキストを編集します。SQLServer 取り込みパラメーターコンテキストに変更を適用すると、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。

SQLServerソースパラメーター

パラメーター

説明

SQLServer 接続 URL

ソースデータベースへの完全な JDBC URL。

例:

  • jdbc:sqlserver://example.com:1433;encrypt=false;

SQLServer JDBC ドライバー

Reference asset チェックボックスを選択し、 SQL サーバー JDBC ドライバー をアップロードします。

SQLServer ユーザー名

コネクタのユーザー名。

SQLServer パスワード

コネクタのパスワード。

SQLServer宛先パラメーター

パラメーター

説明

必須

宛先データベース

データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

有り

宛先スキーマパターン

データが永続化される宛先スキーマの名前のパターン。コネクタはスキーマが存在しない場合にスキーマを作成します。

これらのオプション変数を使用して、取り込まれたテーブルごとにパターンをカスタマイズできます。

  • ${source.database.name}: ソーステーブルのデータベース。

  • ${source.schema.name}: ソーステーブルのスキーマ。

  • ${source.table.name}: ソーステーブルの名前。

たとえば、修飾名 source_db.tenant_a.data を持つテーブルの場合、パターン prefix_${source.database.name}_${source.schema.name}prefix_source_db_tenant_a に評価されます。

すべてのテーブルを1つのスキーマに取り込むには、destination_schema のように変数を設定せずにスキーマ名を指定します。

重要

コネクタがデータの取り込みを開始した後は、この設定を変更しないでください。取り込み開始後にこの設定を変更すると、既存の取り込みは中断されます。この設定を変更する必要がある場合は、新しいコネクタインスタンスを作成してください。

有り

Snowflake認証ストラテジー

以下を使用する場合:

  • Snowflake Openflow Deployment または BYOC:SNOWFLAKE_MANAGED_TOKEN を使用します。このトークンはSnowflakeによって自動的に管理されます。 BYOC デプロイメントでは、 SNOWFLAKE_MANAGED_TOKEN を使用するために、事前に ランタイムロール が構成されている必要があります。

  • BYOC: 代わりに、BYOC では認証戦略の値として KEY_PAIR を使用できます。

有り

Snowflakeアカウント識別子

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR: データが永続化される[organization-name]-[account-name]形式のSnowflakeアカウント名。

有り

Snowflake接続の戦略

KEY_PAIRを使用する場合は、Snowflakeに接続するための戦略を特定します。

  • STANDARD (デフォルト):Snowflakeサービスへの標準パブリックルーティングを使用して接続します。

  • PRIVATE_CONNECTIVITY:AWS PrivateLinkなど、サポート対象のクラウドプラットフォームに関連付けられたプライベートアドレスを使用して接続します。

KEY_PAIRを使用したBYOCのみに必要です。それ以外の場合は無視されます。

Snowflakeオブジェクト識別子の解決

スキーマ名、テーブル名、列名などのソースオブジェクト識別子をSnowflakeに格納し、クエリする方法を指定します。この設定は、SQLクエリで二重引用符を使用する必要があるかどうかを指定します。

オプション1:デフォルトで、大文字と小文字を区別しない(推奨)。

  • 変換:識別子はすべて大文字に変換されます。たとえば、 My_TableMY_TABLE になります。

  • クエリ: SQL クエリは大文字と小文字を区別せず、 SQL 二重引用符は必要ありません。

    たとえば、 SELECT * FROM my_table;SELECT * FROM MY_TABLE; と同じ結果を返します。

注釈

Snowflakeでは、データベースオブジェクトに大文字と小文字が混在する名前が想定されない場合、このオプションを使用することを推奨しています。

重要

コネクタの取り込み開始後は、この設定を変更しないでください。取り込み開始後にこの設定を変更すると、既存の取り込みは中断されます。この設定を変更する必要がある場合は、新しいコネクタインスタンスを作成してください。

オプション2: 大文字と小文字を区別する。

  • 変換:大文字小文字は維持されます。たとえば、 My_TableMy_Table のままです。

  • クエリ: SQL クエリは、データベースオブジェクトの大文字と小文字を正確に一致させるために、二重引用符を使用する必要があります。例: SELECT * FROM "My_Table";

注釈

Snowflakeでは、レガシーまたは互換性の理由からソースの大文字小文字の区別を保持する必要がある場合、このオプションを使用することを推奨しています。たとえば、 MY_TABLEmy_table のように大文字と小文字だけが異なるテーブル名がソースデータベースに含まれている場合、大文字と小文字を区別しない比較を使用すると、名前の衝突が発生します。

有り

Snowflake秘密キー

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:認証に使用される RSA プライベートキーである必要があります。

    その RSA キーは PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを持つ必要があります。SnowflakeプライベートキーファイルまたはSnowflakeプライベートキーのいずれかを定義する必要があることに注意してください。

無し

Snowflake秘密キーファイル

以下を使用する場合:

  • Session token authentication strategy:プライベートキーファイルは空白である必要があります。

  • KEY_PAIR:Snowflakeへの認証に使用される RSA プライベートキーを含むファイルをアップロードします。これは、PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。ヘッダー行は で始まります -----BEGIN PRIVATE.プライベートキーファイルをアップロードするには、Reference asset チェックボックスを選択します。

無し

Snowflake秘密キーパスワード

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeプライベートキーファイルに関連付けられたパスワードを提供します。

無し

Snowflakeロール

以下を使用する場合:

  • Session Token Authentication Strategy :ランタイムに割り当てられたSnowflakeロール、またはこのSnowflakeロールに付与された子ロールを使用します。Openflow UI のランタイムのSnowflakeロールは、ランタイムの More Options [⋮] ボタンを展開して Set Snowflake role を選択することで見つけることができます。

  • KEY_PAIR Authentication Strategy:サービスユーザーのために構成された有効なロールを使用します。

有り

Snowflakeのユーザー名

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeインスタンスへの接続に使用するユーザー名を提供します。

有り

オーバーサイズ値戦略

複製中にコネクタが内部サイズ制限(16MB)を超える値を処理する方法を決定します。可能な値は次のとおりです。

  • **失敗テーブル**(デフォルト):テーブルは永久に失敗したものとしてマークされ、そのテーブルの複製が停止します。

  • Nullの設定:値は宛先テーブルで``NULL``に置き換えられます。オーバーサイズ値を超えるテーブル内のデータが失われても許容できる場合、これを使用してテーブルの失敗を防ぎます。

無し

Snowflakeウェアハウス

クエリの実行に使用されるSnowflakeウェアハウス。

有り

SQLServer取り込みパラメーター

パラメーター

説明

含まれるテーブル名

データベースとスキーマを含むソーステーブルパスのコンマ区切りリスト。例:

database_1.public.table_1, database_2.schema_2.table_2

含まれるテーブル正規表現

データベース名とスキーマ名を含むテーブルパスに一致する正規表現。式に一致するすべてのパスが複製され、後で作成されるパターンに一致する新しいテーブルも自動的に含められます。例:

database_name\.public\.auto_.*

列フィルター JSON

オプション。テーブルごとに含める列または除外する列を指定するフィルターオブジェクトのJSON配列。構文の詳細と例については、`テーブル内の列のサブセットを複製する方法`_を参照してください。

タスクスケジュール CRON をマージする

ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 * * * * * ? に設定します。

例:

  • * 0 * * * ? という文字列は、丸1時間で1分間のマージをスケジュールしたいことを示しています。

  • * 20 14 ? * MON-FRI という文字列は、毎週月曜日から金曜日の2:20 PM にマージをスケジュールしたいことを示しています。

その他の情報と例については、 Quartzドキュメント のcronトリガーチュートリアルをご参照ください。

SQL Serverのレプリカサーバーからテーブルを複製する

このコネクタは、トランザクションの複製<https://learn.microsoft.com/en-us/sql/relational-databases/replication/transactional/transactional-replication>`_を使用して、プライマリサーバーまたはサブスクライバーサーバーからデータを取り込むことができます。SQL Serverレプリカに接続するコネクタを構成する前に、プライマリノードとレプリカノード間の複製が正しく機能することを確認します。トランザクションの複製の設定手順については、`チュートリアル: トランザクションの複製を構成する をご参照ください。コネクタでのデータの欠落に関する問題を調査する場合は、まずコネクタが使用するレプリカサーバーに欠落している行と変更追跡イベントが存在することを確認します。

注釈

レプリカサーバーを使用する場合、コネクタの設定は標準のプライマリサーバーの構成とは異なります。接続ユーザーと変更追跡は、プライマリサーバーで構成する必要はありません。代わりに、接続ユーザーがレプリカサーバーで使用可能であり、そこにあるデータと変更追跡テーブルにアクセスできることを確認してください。

パブリッシャーではなくサブスクライバーサーバーから読み取るようにコネクタを構成するには、:ui:`SQLServer Connection URL`パラメーターにサブスクライバーサーバーURLを指定します。

警告

複製が開始された後は、データベースサーバーを変更しないでください。各データベースは独自の変更追跡状態を独立して維持するため、別のサーバーに切り替えると、コネクタはどの変更が既に処理されているかを知ることができなくなり、データが失われる可能性があります。

テーブル複製の再開

(主キーが欠落している、またはスキーマの変更がサポートされていないなどの理由で) FAILED 状態のテーブルは、自動的に再開されません。テーブルが FAILED 状態になった場合、または複製を最初から再開する必要がある場合は、次の手順に従ってテーブルを削除し、複製に再度追加します。

注釈

主キーが欠落しているなど、ソーステーブルの問題が原因で障害が発生した場合は、続行する前にソースデータベースでその問題を解決します。

  1. 次のいずれかの方法を使用して、複製対象からテーブルを除外します。

    • テーブルを Re-snapshot Table Exclusions パラメーターに追加して、一時的に複製対象から除外します。これは、変更することを回避する必要がある Included Table Regex によってテーブルが一致する場合に簡便な方法です。

    • 取り込みパラメーターのコンテキストで、Included Table Names からテーブルを削除するか、Included Table Regex を変更してテーブルが一致しないようにします。

  2. テーブルが削除されたことを確認します。

    1. Openflowランタイムキャンバスで、プロセッサーグループを右クリックし、Controller Services を選択します。

    2. コントローラーサービスをリストしたテーブルで、Table State Store 行を見つけ、行の右側にある縦3つのドットをクリックして、View State を選択します。

    重要

    続行する前に、テーブルの状態がこのリストから完全に削除されるまで待つ必要があります。この構成変更が完了するまで続行しないでください。

  3. 宛先をクリーンアップする:テーブルの状態が完全に削除されたと表示されたら、Snowflake で宛先テーブルを手動で DROP します。スナップショットフェーズ中に、コネクタは既存の宛先テーブルを上書きしないことに注意してください。テーブルがまだ存在する場合、複製は再度失敗します。オプションで、ジャーナルテーブルとストリームが不要になった場合は削除することもできます。

  4. 最初のステップで行った変更を元に戻して、テーブルを再度追加します。すなわち、Re-snapshot Table Exclusions からテーブルを削除するか、Included Table Names または Included Table Regex に再度追加します。その後、コネクタはテーブルのスナップショットを再度取得します。

  5. 再開を確認する:前述の指示に従って Table State Store をチェックします。テーブルの状態は、ステータス NEW で表示され、次に SNAPSHOT_REPLICATION に移行し、最後に INCREMENTAL_REPLICATION になります。

テーブルの列のサブセットを複製します。

コネクタは、テーブルごとに複製されるデータを構成列のサブセットにフィルターできます。主キー列は除外に関係なく常に含まれます。

列フィルターを適用するには、取り込みパラメーターコンテキストの:ui:`Column Filter JSON`パラメーターを、フィルターしたいテーブルごとに1つ、フィルターオブジェクトのJSON配列に設定します。

列は、名前または正規表現パターンによって含めたり除外したりできます。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせることもできます。その場合、除外は常に包含よりも優先されます。

構文

配列の各オブジェクトはテーブルを識別し、含める列または除外する列を指定します。このコネクタは3部構成の完全修飾名(データベース、スキーマ、テーブル)を使用するため、各オブジェクトには、スキーマおよびテーブルフィールドに加えて``database``または``databasePattern``フィールドを含めることができます。

[
    {
        "database": "<database>" | "databasePattern": "<regex>",
        "schema": "<schema>" | "schemaPattern": "<regex>",
        "table": "<table>" | "tablePattern": "<regex>",
        "included": ["<column>", "<column>"],
        "excluded": ["<column>", "<column>"],
        "includedPattern": "<regex>",
        "excludedPattern": "<regex>"
    }
]

次のルールが適用されます。

  • 完全な名前の一致には``database``、schema、および``table``を使用し、正規表現の一致には``databasePattern``、schemaPattern、および``tablePattern``を使用します。同じオブジェクトで、フィールドとそのパターンバリアントの両方を使用することはできません(たとえば、``schema``と``schemaPattern``の両方を含めることはできません)。

  • includedexcludedincludedPattern、または``excludedPattern``の少なくとも1つを提供する必要があります。

  • 含めるフィルターと除外するフィルターの両方が指定されている場合は、除外が優先されます。

  • 複数のフィルターが同じテーブルに一致する場合、最後に一致するフィルターが使用され、完全一致がパターンベースのフィルターよりも優先されます。

  • 値はオブジェクトの配列にして、異なるテーブルに異なるフィルターを適用することができます。

名前で特定の列を含める:

[
    {
        "database": "my_db",
        "schema": "dbo",
        "table": "orders",
        "included": ["account_id", "status", "created_at"]
    }
]

名前で特定の列を除外する:

[
    {
        "database": "my_db",
        "schema": "dbo",
        "table": "orders",
        "excluded": ["internal_note", "debug_flag"]
    }
]

包含パターンと特定の除外を組み合わせる(たとえば、``admin_email``を除くすべてのメール列を含める):

[
    {
        "database": "my_db",
        "schema": "dbo",
        "table": "contacts",
        "includedPattern": ".*_email",
        "excluded": ["admin_email"]
    }
]

データベースパターンと正確なスキーマおよびテーブル名を組み合わせて、データベース全体にフィルターを適用する:

[
    {
        "databasePattern": "prod_.*",
        "schema": "dbo",
        "table": "customers",
        "excluded": ["internal_note"]
    }
]

異なるテーブルに異なるルールを適用するために複数のフィルターオブジェクトを渡す:

[
    {"database": "my_db", "schema": "dbo", "table": "orders", "included": ["account_id", "status"]},
    {"database": "my_db", "schema": "dbo", "table": "customers", "excludedPattern": ".*_internal"}
]

パーティショニングされたテーブルを複製する

このコネクタは、パーティション化されたテーブルの複製をサポートしています。SQLサーバーのパーティション化されたテーブルは、すべてのパーティションからのデータを含む単一の宛先テーブルとしてSnowflakeに複製されます。

パーティション化されたテーブルを複製するには、:ref:`label-sql_server_connector_setup_instance`で説明されているように、パーティション化されたテーブルで変更追跡が有効になっていることを確認します。

テーブルでデータ変更を追跡する

このコネクタは、ソーステーブルのデータの現在の状態と、各ポーリング間隔から検出された変更を複製します。このデータは、宛先テーブルと同じスキーマで作成されたジャーナルテーブルに格納されます。

注釈

このコネクタはSQLサーバーの変更追跡を使用するため、ポーリングとポーリングの間に同一行に対する更新が複数回行われた場合は、単一の変更にロールアップされます。ジャーナルテーブルには、すべての中間状態ではなく、変更の最終的な結果が反映されます。詳細については、 Openflow Connector for SQL Server について をご参照ください。

ジャーナルテーブル名の形式は``<source_table_name>_JOURNAL_<timestamp>_<schema_generation>``です。ここで、``<timestamp>``はソーステーブルが複製に追加されたときのエポック秒の値であり、``<schema_generation>``はソーステーブルのスキーマ変更ごとに増加する整数です。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。

複製からテーブルを削除してから再度追加すると、``<timestamp>``値が変更され、``<schema_generation>``は``1``から再度開始されます。

重要

Snowflakeは、ジャーナルテーブルの構造を一切変更しないことを推奨します。コネクタはそれらを使用して、複製プロセスの一部として宛先テーブルを更新します。

コネクタはジャーナルテーブルをドロップすることはありませんが、複製されたソーステーブルごとに最新のジャーナルを使用し、ジャーナル上の追加専用ストリームのみを読み取ります。ストレージを回収するには、以下を実行します。

  • すべてのジャーナルテーブルをいつでも切り捨てます。

  • 複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。

  • アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。

例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2除いて、それらのすべてをドロップできます。

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

マージタスクのスケジュールを構成する

コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。

マージタスクのスケジュールCRONパラメーターでCRON式を使用して、ウェアハウスコストを制限し、マージをスケジュールされた時間のみに制限します。MergeSnowflakeJournalTableプロセッサーに送信されるフローファイルを制限し、マージは指定された期間にのみトリガーされます。スケジュールに関する詳細は、 スケジュールストラテジー をご参照ください。

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。