Openflow Connector for SQL Server の設定

注釈

This connector is subject to the Snowflake Connector Terms.

This topic describes how to set up the Openflow Connector for SQL Server.

増分ロードプロセスについては、 増分複製 をご参照ください。

前提条件

コネクタを設定する前に、以下の前提条件を満たしていることを確認してください。

  1. Openflow Connector for SQL Server について を確認してください。

  2. サポートされている SQL Serverバージョン を確認してください。

  3. ランタイムの展開を設定していることを確認してください。詳細については、次のトピックをご参照ください。

  4. If you use Openflow - Snowflake Deployments, ensure that you have reviewed configuring required domains and have granted access to the required domains for the SQL サーバー connector.

Set up your SQL Server instance

コネクタを設定する前に、SQL Server環境で次のタスクを実行してください。

注釈

これらのタスクはデータベース管理者として実行する必要があります。

  1. Enable change tracking on the databases and tables that you plan to replicate, as shown in the following SQL Server example:

    ALTER DATABASE <database>
      SET CHANGE_TRACKING = ON
      (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
    
    ALTER TABLE <schema>.<table>
      ENABLE CHANGE_TRACKING
      WITH (TRACK_COLUMNS_UPDATED = ON);
    
    Copy

    注釈

    複製する予定のすべてのデータベースとテーブルに対してこれらのコマンドを実行します。

    コネクタは、複製を開始する前に、データベースとテーブルで変更追跡を有効にする必要があります。複製を予定しているすべてのテーブルで、変更の追跡が有効になっていることを確認してください。コネクタの実行中に、追加のテーブルの変更追跡を有効にすることもできます。

  2. Create a login for the SQL Serverインスタンスのログインを作成する:

    CREATE LOGIN <user_name> WITH PASSWORD = '<password>';
    
    Copy

    このログインは、複製する予定のデータベースのユーザーを作成するために使用されます。

  3. 各データベースで次のSQL Serverコマンドを実行して、複製するデータベースごとにユーザーを作成します。

    USE <source_database>;
    CREATE USER <user_name> FOR LOGIN <user_name>;
    
    Copy
  4. 複製する各データベースのユーザーにSELECTおよびVIEW CHANGE TRACKING権限を付与します。

    GRANT SELECT ON <database>.<schema>.<table> TO <user_name>;
    GRANT VIEW CHANGE TRACKING ON <database>.<schema>.<table> TO <user_name>;
    
    Copy

    複製する予定のすべてのテーブルに対して、各データベースでこれらのコマンドを実行します。これらの権限は、前のステップで作成した各データベースのユーザーに付与する必要があります。

  5. (Optional) Configure SSL connection.

    If you use an SSL connection to connect SQL Server, create the root certificate for your database server. This is required when configuring the connector.

Snowflake環境のセットアップ

As a Snowflake administrator, perform the following tasks:

  1. 複製されたデータを格納するために、Snowflake内に宛先データベースを作成します。

    CREATE DATABASE <destination_database>;
    
    Copy
  2. Snowflakeの サービスユーザー を作成します。

    CREATE USER <openflow_user>
      TYPE = SERVICE
      COMMENT='Service user for automated access of Openflow';
    
    Copy
  3. コネクタ用Snowflakeロールを作成し、必要な権限を付与します。

    CREATE ROLE <openflow_role>;
    GRANT ROLE <openflow_role> TO USER <openflow_user>;
    GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
    GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
    
    Copy

    このロールを使用して、コネクタのSnowflakeデータベースへのアクセスを管理します。

    宛先データベースにオブジェクトを作成するには、アクセス管理に使用するロールに、データベースに対する USAGE権限およびCREATE SCHEMA権限 を付与する必要があります。

  4. コネクタ用にSnowflakeウェアハウスを作成し、必要な権限を付与します。

    CREATE WAREHOUSE <openflow_warehouse> WITH
      WAREHOUSE_SIZE = 'MEDIUM'
      AUTO_SUSPEND = 300
      AUTO_RESUME = TRUE;
    GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
    
    Copy

    Snowflake recommends starting with a MEDIUM warehouse size, then experimenting with size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses, rather than a larger warehouse size. For more information, see multi-cluster warehouses.

  5. Set up the public and private keys for key pair authentication:

    1. 安全なキーのペア(パブリックおよび秘密)を作成します。

    2. Store the private key for the user in a file to supply to the connector's configuration.

    3. Snowflakeユーザーに公開キーを割り当てます:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      詳細については、 キーペア認証とキーペアローテーション をご参照ください。

コネクタを構成する

As a data engineer, install and configure the connector using the following sections.

コネクタをインストールする

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  4. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  5. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

To configure the connector, perform the following steps:

  1. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

  2. Populate the required parameter values as described in フローパラメーター.

フローパラメーター

Start by setting the parameters of the SQLServer Source Parameters context, then the SQLServer Destination Parameters context. After you complete this, enable the connector. The connector connects to both SQLServer and Snowflake and starts running. However, the connector does not replicate any data until any tables to be replicated are explicitly added to its configuration.

複製する特定のテーブルを構成するには、 SQLServer 取り込みパラメーターコンテキストを編集します。SQLServer 取り込みパラメーターコンテキストに変更を適用すると、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。

SQLServer ソースパラメーターコンテキスト

パラメーター

説明

SQL Server接続 URL

ソースデータベースへの完全な JDBC URL。

例:

  • jdbc:sqlserver://example.com:1433;encrypt=false;

SQL Server JDBC ドライバー

Select the Reference asset checkbox to upload the SQL Server JDBC driver.

SQL Serverユーザー名

The user name for the connector.

SQL Serverパスワード

コネクタのパスワード。

SQLServer 宛先パラメーターコンテキスト

パラメーター

説明

必須

宛先データベース

The database where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.

有り

Snowflake認証ストラテジー

以下を使用する場合:

  • Snowflake Openflow Deployment または BYOC:SNOWFLAKE_SESSION_TOKEN を使用します。このトークンはSnowflakeによって自動的に管理されます。BYOC デプロイメントでは、SNOWFLAKE_SESSION_TOKENを使用するために、事前に:ref:ランタイムロール <label-deployment_byoc_setup_runtime_role> が構成されている必要があります。

  • BYOC: Alternatively BYOC can use KEY_PAIR as the value for authentication strategy.

有り

Snowflakeアカウント識別子

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR: Snowflake account name formatted as [organization-name]-[account-name] where data is persisted.

有り

Snowflake Connection Strategy

KEY_PAIRを使用する場合は、Snowflakeに接続するための戦略を特定します。

  • STANDARD (デフォルト):Snowflakeサービスへの標準パブリックルーティングを使用して接続します。

  • PRIVATE_CONNECTIVITY:AWS PrivateLinkなど、サポート対象のクラウドプラットフォームに関連付けられたプライベートアドレスを使用して接続します。

KEY_PAIRを使用したBYOCのみに必要です。それ以外の場合は無視されます。

Snowflake Object Identifier Resolution

スキーマ名、テーブル名、列名などのソースオブジェクト識別子をSnowflakeに格納し、クエリする方法を指定します。この設定は、SQLクエリで二重引用符を使用する必要があるかどうかを指定します。

オプション1:デフォルトで、大文字と小文字を区別しない(推奨)。

  • 変換:識別子はすべて大文字に変換されます。たとえば、 My_TableMY_TABLE になります。

  • クエリ: SQL クエリは大文字と小文字を区別せず、 SQL 二重引用符は必要ありません。

    たとえば、 SELECT * FROM my_table;SELECT * FROM MY_TABLE; と同じ結果を返します。

注釈

Snowflakeでは、データベースオブジェクトに大文字と小文字が混在する名前が想定されない場合、このオプションを使用することを推奨しています。

重要

コネクタの取り込み開始後は、この設定を変更しないでください。取り込み開始後にこの設定を変更すると、既存の取り込みは中断されます。この設定を変更する必要がある場合は、新しいコネクタインスタンスを作成してください。

オプション2: 大文字と小文字を区別する。

  • 変換:大文字小文字は維持されます。たとえば、 My_TableMy_Table のままです。

  • クエリ: SQL クエリは、データベースオブジェクトの大文字と小文字を正確に一致させるために、二重引用符を使用する必要があります。例: SELECT * FROM "My_Table";

注釈

Snowflakeでは、レガシーまたは互換性の理由からソースの大文字小文字の区別を保持する必要がある場合、このオプションを使用することを推奨しています。たとえば、 MY_TABLEmy_table のように大文字と小文字だけが異なるテーブル名がソースデータベースに含まれている場合、大文字と小文字を区別しない比較を使用すると、名前の衝突が発生します。

有り

Snowflake秘密キー

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:認証に使用される RSA プライベートキーである必要があります。

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

無し

Snowflake秘密キーファイル

以下を使用する場合:

  • Session token authentication strategy: The private key file must be blank.

  • KEY_PAIR:Snowflakeへの認証に使用される RSA プライベートキーを含むファイルをアップロードします。これは、PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。ヘッダー行は で始まります -----BEGIN PRIVATE.プライベートキーファイルをアップロードするには、Reference asset チェックボックスを選択します。

無し

Snowflake秘密キーパスワード

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeプライベートキーファイルに関連付けられたパスワードを提供します。

無し

Snowflakeロール

以下を使用する場合:

  • Session Token Authentication Strategy:ランタイムロールを使用します。ランタイムの View Details に移動すると、Openflow UI でランタイムロールを見つけることができます。

  • KEY_PAIR Authentication Strategy:サービスユーザーのために構成された有効なロールを使用します。

有り

Snowflakeのユーザー名

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeインスタンスへの接続に使用するユーザー名を提供します。

有り

Snowflakeウェアハウス

クエリの実行に使用されるSnowflakeウェアハウス。

有り

SQLServer 取り込みパラメーターコンテキスト

パラメーター

説明

含まれるテーブル名

A comma-separated list of source table paths, including their databases and schemas, for example:

database_1.public.table_1, database_2.schema_2.table_2

含まれるテーブル正規表現

A regular expression to match against table paths, including database and schema names. Every path matching the expression is replicated, and new tables matching the pattern that are created later are also included automatically, for example:

database_name\.public\.auto_.*

フィルター JSON

A JSON containing a list of fully-qualified table names and a regex pattern for column names that should be included into replication.

次の例では、 my_db データベースのパブリックスキーマの table1 にある name で終わるすべての列が含まれます。

[ {"database":"my_db", "schema":"public", "table":"table1", "includedPattern":".*name"} ]

タスクスケジュール CRON をマージする

ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 * * * * * ? に設定します。

例:

  • * 0 * * * ? という文字列は、丸1時間で1分間のマージをスケジュールしたいことを示しています。

  • * 20 14 ? * MON-FRI という文字列は、毎週月曜日から金曜日の2:20 PM にマージをスケジュールしたいことを示しています。

その他の情報と例については、 Quartzドキュメント のcronトリガーチュートリアルをご参照ください。

テーブルを削除し、複製に再追加する

To remove a table from replication, remove it from the Included Table Names or Included Table Regex parameters in the Replication Parameters context.

To re-add the table to replication later, first delete the corresponding destination table in Snowflake. Afterward, add the table back to the Included Table Names or Included Table Regex parameters. This ensures that the replication process starts fresh for the table.

このアプローチは、失敗したテーブル複製シナリオからの復旧にも使用できます。

テーブルの列のサブセットを複製します。

The connector filters the data replicated per table to a subset of configured columns.

列にフィルターを適用するには、複製パラメーターコンテキストのColumn Filterプロパティを変更し、フィルターを適用したいテーブルごとに1エントリずつ、構成の配列を追加します。

Include or exclude columns by name or pattern. You can apply a single condition per table, or combine multiple conditions, with exclusions always taking precedence over inclusions.

以下の例は、利用可能なフィールドを示しています。schema および table フィールドは必須です。includedexcludedincludedPatternexcludedPattern のうち1つ以上が必要です。

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

テーブルでデータ変更を追跡する

The connector replicates the current state of data from the source tables, as well as every state of every row from every changeset. This data is stored in journal tables created in the same schema as the destination table.

ジャーナルテーブル名の形式は次のとおりです。<source table name>_JOURNAL_<timestamp><schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> は整数で、ソーステーブルのスキーマが変更されるごとに増加します。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。

When you remove a table from replication, then add it back, the <timestamp> value changes, and <schema generation> starts again from 1.

重要

Snowflake recommends not altering the structure of journal tables in any way. The connector uses them to update the destination table as part of the replication process.

The connector never drops journal tables, but uses the latest journal for every replicated source table, only reading append-only streams on top of journals. To reclaim the storage, you can:

  • すべてのジャーナルテーブルをいつでも切り捨てます。

  • 複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。

  • アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。

例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2除いて、それらのすべてをドロップできます。

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

マージタスクのスケジュールを構成する

コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。

Use the CRON expression in the Merge task Schedule CRON parameter to limit the warehouse cost and limit merges to only scheduled time. It throttles the flow files coming to the MergeSnowflakeJournalTable processor and merges are triggered only in a dedicated period of time. For more information about scheduling, see Scheduling strategy.

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。