Openflow Connector for SQL Server の設定¶
注釈
This connector is subject to the Snowflake Connector Terms.
This topic describes how to set up the Openflow Connector for SQL Server.
増分ロードプロセスについては、 増分複製 をご参照ください。
前提条件¶
コネクタを設定する前に、以下の前提条件を満たしていることを確認してください。
Openflow Connector for SQL Server について を確認してください。
サポートされている SQL Serverバージョン を確認してください。
ランタイムの展開を設定していることを確認してください。詳細については、次のトピックをご参照ください。
If you use Openflow - Snowflake Deployments, ensure that you have reviewed configuring required domains and have granted access to the required domains for the SQL サーバー connector.
Set up your SQL Server instance¶
コネクタを設定する前に、SQL Server環境で次のタスクを実行してください。
注釈
これらのタスクはデータベース管理者として実行する必要があります。
Enable change tracking on the databases and tables that you plan to replicate, as shown in the following SQL Server example:
ALTER DATABASE <database> SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON); ALTER TABLE <schema>.<table> ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON);
注釈
複製する予定のすべてのデータベースとテーブルに対してこれらのコマンドを実行します。
コネクタは、複製を開始する前に、データベースとテーブルで変更追跡を有効にする必要があります。複製を予定しているすべてのテーブルで、変更の追跡が有効になっていることを確認してください。コネクタの実行中に、追加のテーブルの変更追跡を有効にすることもできます。
Create a login for the SQL Serverインスタンスのログインを作成する:
CREATE LOGIN <user_name> WITH PASSWORD = '<password>';
このログインは、複製する予定のデータベースのユーザーを作成するために使用されます。
各データベースで次のSQL Serverコマンドを実行して、複製するデータベースごとにユーザーを作成します。
USE <source_database>; CREATE USER <user_name> FOR LOGIN <user_name>;
複製する各データベースのユーザーにSELECTおよびVIEW CHANGE TRACKING権限を付与します。
GRANT SELECT ON <database>.<schema>.<table> TO <user_name>; GRANT VIEW CHANGE TRACKING ON <database>.<schema>.<table> TO <user_name>;
複製する予定のすべてのテーブルに対して、各データベースでこれらのコマンドを実行します。これらの権限は、前のステップで作成した各データベースのユーザーに付与する必要があります。
(Optional) Configure SSL connection.
If you use an SSL connection to connect SQL Server, create the root certificate for your database server. This is required when configuring the connector.
Snowflake環境のセットアップ¶
As a Snowflake administrator, perform the following tasks:
複製されたデータを格納するために、Snowflake内に宛先データベースを作成します。
CREATE DATABASE <destination_database>;
Snowflakeの サービスユーザー を作成します。
CREATE USER <openflow_user> TYPE = SERVICE COMMENT='Service user for automated access of Openflow';
コネクタ用Snowflakeロールを作成し、必要な権限を付与します。
CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
このロールを使用して、コネクタのSnowflakeデータベースへのアクセスを管理します。
宛先データベースにオブジェクトを作成するには、アクセス管理に使用するロールに、データベースに対する USAGE権限およびCREATE SCHEMA権限 を付与する必要があります。
コネクタ用にSnowflakeウェアハウスを作成し、必要な権限を付与します。
CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
Snowflake recommends starting with a MEDIUM warehouse size, then experimenting with size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses, rather than a larger warehouse size. For more information, see multi-cluster warehouses.
Set up the public and private keys for key pair authentication:
安全なキーのペア(パブリックおよび秘密)を作成します。
Store the private key for the user in a file to supply to the connector's configuration.
Snowflakeユーザーに公開キーを割り当てます:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
詳細については、 キーペア認証とキーペアローテーション をご参照ください。
コネクタを構成する¶
As a data engineer, install and configure the connector using the following sections.
コネクタをインストールする¶
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
To configure the connector, perform the following steps:
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
Populate the required parameter values as described in フローパラメーター.
フローパラメーター¶
Start by setting the parameters of the SQLServer Source Parameters context, then the SQLServer Destination Parameters context. After you complete this, enable the connector. The connector connects to both SQLServer and Snowflake and starts running. However, the connector does not replicate any data until any tables to be replicated are explicitly added to its configuration.
複製する特定のテーブルを構成するには、 SQLServer 取り込みパラメーターコンテキストを編集します。SQLServer 取り込みパラメーターコンテキストに変更を適用すると、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。
SQLServer ソースパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
SQL Server接続 URL |
ソースデータベースへの完全な JDBC URL。 例:
|
SQL Server JDBC ドライバー |
Select the Reference asset checkbox to upload the SQL Server JDBC driver. |
SQL Serverユーザー名 |
The user name for the connector. |
SQL Serverパスワード |
コネクタのパスワード。 |
SQLServer 宛先パラメーターコンテキスト¶
パラメーター |
説明 |
必須 |
|---|---|---|
宛先データベース |
The database where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase. |
有り |
Snowflake認証ストラテジー |
以下を使用する場合:
|
有り |
Snowflakeアカウント識別子 |
以下を使用する場合:
|
有り |
Snowflake Connection Strategy |
KEY_PAIRを使用する場合は、Snowflakeに接続するための戦略を特定します。
|
KEY_PAIRを使用したBYOCのみに必要です。それ以外の場合は無視されます。 |
Snowflake Object Identifier Resolution |
スキーマ名、テーブル名、列名などのソースオブジェクト識別子をSnowflakeに格納し、クエリする方法を指定します。この設定は、SQLクエリで二重引用符を使用する必要があるかどうかを指定します。 オプション1:デフォルトで、大文字と小文字を区別しない(推奨)。
注釈 Snowflakeでは、データベースオブジェクトに大文字と小文字が混在する名前が想定されない場合、このオプションを使用することを推奨しています。 重要 コネクタの取り込み開始後は、この設定を変更しないでください。取り込み開始後にこの設定を変更すると、既存の取り込みは中断されます。この設定を変更する必要がある場合は、新しいコネクタインスタンスを作成してください。 オプション2: 大文字と小文字を区別する。
注釈 Snowflakeでは、レガシーまたは互換性の理由からソースの大文字小文字の区別を保持する必要がある場合、このオプションを使用することを推奨しています。たとえば、 |
有り |
Snowflake秘密キー |
以下を使用する場合:
|
無し |
Snowflake秘密キーファイル |
以下を使用する場合:
|
無し |
Snowflake秘密キーパスワード |
以下を使用する場合:
|
無し |
Snowflakeロール |
以下を使用する場合:
|
有り |
Snowflakeのユーザー名 |
以下を使用する場合:
|
有り |
Snowflakeウェアハウス |
クエリの実行に使用されるSnowflakeウェアハウス。 |
有り |
SQLServer 取り込みパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
含まれるテーブル名 |
A comma-separated list of source table paths, including their databases and schemas, for example:
|
含まれるテーブル正規表現 |
A regular expression to match against table paths, including database and schema names. Every path matching the expression is replicated, and new tables matching the pattern that are created later are also included automatically, for example:
|
フィルター JSON |
A JSON containing a list of fully-qualified table names and a regex pattern for column names that should be included into replication. 次の例では、
|
タスクスケジュール CRON をマージする |
ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 例:
その他の情報と例については、 Quartzドキュメント のcronトリガーチュートリアルをご参照ください。 |
テーブルを削除し、複製に再追加する¶
To remove a table from replication, remove it from the Included Table Names or Included Table Regex parameters in the Replication Parameters context.
To re-add the table to replication later, first delete the corresponding destination table in Snowflake.
Afterward, add the table back to the Included Table Names or Included Table Regex parameters.
This ensures that the replication process starts fresh for the table.
このアプローチは、失敗したテーブル複製シナリオからの復旧にも使用できます。
テーブルの列のサブセットを複製します。¶
The connector filters the data replicated per table to a subset of configured columns.
列にフィルターを適用するには、複製パラメーターコンテキストのColumn Filterプロパティを変更し、フィルターを適用したいテーブルごとに1エントリずつ、構成の配列を追加します。
Include or exclude columns by name or pattern. You can apply a single condition per table, or combine multiple conditions, with exclusions always taking precedence over inclusions.
以下の例は、利用可能なフィールドを示しています。schema および table フィールドは必須です。included、 excluded、 includedPattern、 excludedPattern のうち1つ以上が必要です。
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
テーブルでデータ変更を追跡する¶
The connector replicates the current state of data from the source tables, as well as every state of every row from every changeset. This data is stored in journal tables created in the same schema as the destination table.
ジャーナルテーブル名の形式は次のとおりです。<source table name>_JOURNAL_<timestamp><schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> は整数で、ソーステーブルのスキーマが変更されるごとに増加します。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。
When you remove a table from replication, then add it back, the <timestamp> value changes, and <schema generation> starts again from 1.
重要
Snowflake recommends not altering the structure of journal tables in any way. The connector uses them to update the destination table as part of the replication process.
The connector never drops journal tables, but uses the latest journal for every replicated source table, only reading append-only streams on top of journals. To reclaim the storage, you can:
すべてのジャーナルテーブルをいつでも切り捨てます。
複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。
アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。
例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2 を 除いて、それらのすべてをドロップできます。
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
マージタスクのスケジュールを構成する¶
コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。
Use the CRON expression in the Merge task Schedule CRON parameter to limit the warehouse cost and limit merges to only scheduled time. It throttles the flow files coming to the MergeSnowflakeJournalTable processor and merges are triggered only in a dedicated period of time. For more information about scheduling, see Scheduling strategy.
フローを実行する¶
プレーンを右クリックし、 Enable all Controller Services を選択します。
インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。