Openflow Connector for SQL Server の設定¶
注釈
このコネクタは、 Snowflakeコネクタ規約 に従うものとします。
このトピックでは、|Sqlserver|を設定する方法について説明します。
増分ロードプロセスについては、 増分複製 をご参照ください。
前提条件¶
コネクタを設定する前に、以下の前提条件を満たしていることを確認してください。
Openflow Connector for SQL Server について を確認してください。
サポートされている SQL Serverバージョン を確認してください。
ランタイムの展開を設定していることを確認してください。詳細については、次のトピックをご参照ください。
|OFSFSPCS-plural|を使用する場合、 必要なドメインの構成 を精査し、 SQL サーバー コネクタに必要なドメインへのアクセス許可を付与していることを確認してください。
SQL Serverインスタンスの設定¶
コネクタを設定する前に、SQL Server環境で次のタスクを実行してください。
注釈
これらのタスクはデータベース管理者として実行する必要があります。
次のSQL Server の例に示すように、複製する予定の データベース と テーブル で変更の追跡を有効にします。
ALTER DATABASE <database> SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON); ALTER TABLE <schema>.<table> ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON);
注釈
複製する予定のすべてのデータベースとテーブルに対してこれらのコマンドを実行します。
コネクタは、複製を開始する前に、データベースとテーブルで変更追跡を有効にする必要があります。複製を予定しているすべてのテーブルで、変更の追跡が有効になっていることを確認してください。コネクタの実行中に、追加のテーブルの変更追跡を有効にすることもできます。
Create a login for the SQL Serverインスタンスのログインを作成する:
CREATE LOGIN <user_name> WITH PASSWORD = '<password>';
このログインは、複製する予定のデータベースのユーザーを作成するために使用されます。
各データベースで次のSQL Serverコマンドを実行して、複製するデータベースごとにユーザーを作成します。
USE <source_database>; CREATE USER <user_name> FOR LOGIN <user_name>;
複製する各データベースのユーザーにSELECTおよびVIEW CHANGE TRACKING権限を付与します。
GRANT SELECT ON <database>.<schema>.<table> TO <user_name>; GRANT VIEW CHANGE TRACKING ON <database>.<schema>.<table> TO <user_name>;
複製する予定のすべてのテーブルに対して、各データベースでこれらのコマンドを実行します。これらの権限は、前のステップで作成した各データベースのユーザーに付与する必要があります。
(オプション)SSL接続を構成します。
SSL接続を使用してSQL Server に接続する場合は、データベースサーバーのルート証明書を作成します。これはコネクタを構成するときに必要です。
Snowflake環境のセットアップ¶
Snowflake管理者は以下のタスクを実行します。
複製されたデータを格納するために、Snowflake内に宛先データベースを作成します。
CREATE DATABASE <destination_database>;
Snowflakeの サービスユーザー を作成します。
CREATE USER <openflow_user> TYPE = SERVICE COMMENT='Service user for automated access of Openflow';
コネクタ用Snowflakeロールを作成し、必要な権限を付与します。
CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
このロールを使用して、コネクタのSnowflakeデータベースへのアクセスを管理します。
宛先データベースにオブジェクトを作成するには、アクセス管理に使用するロールに、データベースに対する USAGE権限およびCREATE SCHEMA権限 を付与する必要があります。
コネクタ用にSnowflakeウェアハウスを作成し、必要な権限を付与します。
CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
Snowflake recommends starting with a XSMALL warehouse size, then experimenting with size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses, rather than a larger warehouse size. For more information, see multi-cluster warehouses.
キーペア認証用のパブリックキーとプライベートキーを設定します。
安全なキーのペア(パブリックおよび秘密)を作成します。
ユーザーのプライベートキーをファイルに保存し、コネクタの構成に提供します。
Snowflakeユーザーに公開キーを割り当てます:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
詳細については、 キーペア認証とキーペアローテーション をご参照ください。
コネクタを構成する¶
データエンジニアは、次のセクションを使用してコネクタをインストールおよび構成します。
コネクタをインストールする¶
Openflow概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。
Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。
Select runtime ダイアログで、Available runtimes ドロップダウンリストからランタイムを選択して Add をクリックします。
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
コネクタを構成するには、次のステップを実行します。
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
フローパラメーター の説明に従って、必要なパラメーター値を入力します。
フローパラメーター¶
まず、SQLServerのソースパラメーターコンテキストのパラメーターを設定し、次にSQLServerの宛先パラメーターコンテキストのパラメーターを設定します。これを完了したら、コネクタを有効にします。コネクタはSQLServerとSnowflakeの両方に接続し、実行を開始します。ただし、複製するテーブルが構成に明示的に追加されるまで、コネクタはデータを複製しません。
複製する特定のテーブルを構成するには、 SQLServer 取り込みパラメーターコンテキストを編集します。SQLServer 取り込みパラメーターコンテキストに変更を適用すると、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。
SQLServer ソースパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
SQL Server接続 URL |
ソースデータベースへの完全な JDBC URL。 例:
|
SQL Server JDBC ドライバー |
Reference asset チェックボックスを選択し、 SQL サーバー JDBC ドライバー をアップロードします。 |
SQL Serverユーザー名 |
コネクタのユーザー名。 |
SQL Serverパスワード |
コネクタのパスワード。 |
SQLServer 宛先パラメーターコンテキスト¶
パラメーター |
説明 |
必須 |
|---|---|---|
宛先データベース |
データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 |
有り |
Snowflake認証ストラテジー |
以下を使用する場合:
|
有り |
Snowflakeアカウント識別子 |
以下を使用する場合:
|
有り |
Snowflake接続の戦略 |
KEY_PAIRを使用する場合は、Snowflakeに接続するための戦略を特定します。
|
KEY_PAIRを使用したBYOCのみに必要です。それ以外の場合は無視されます。 |
Snowflakeオブジェクト識別子の解決 |
スキーマ名、テーブル名、列名などのソースオブジェクト識別子をSnowflakeに格納し、クエリする方法を指定します。この設定は、SQLクエリで二重引用符を使用する必要があるかどうかを指定します。 オプション1:デフォルトで、大文字と小文字を区別しない(推奨)。
注釈 Snowflakeでは、データベースオブジェクトに大文字と小文字が混在する名前が想定されない場合、このオプションを使用することを推奨しています。 重要 コネクタの取り込み開始後は、この設定を変更しないでください。取り込み開始後にこの設定を変更すると、既存の取り込みは中断されます。この設定を変更する必要がある場合は、新しいコネクタインスタンスを作成してください。 オプション2: 大文字と小文字を区別する。
注釈 Snowflakeでは、レガシーまたは互換性の理由からソースの大文字小文字の区別を保持する必要がある場合、このオプションを使用することを推奨しています。たとえば、 |
有り |
Snowflake秘密キー |
以下を使用する場合:
|
無し |
Snowflake秘密キーファイル |
以下を使用する場合:
|
無し |
Snowflake秘密キーパスワード |
以下を使用する場合:
|
無し |
Snowflakeロール |
以下を使用する場合:
|
有り |
Snowflakeのユーザー名 |
以下を使用する場合:
|
有り |
Snowflakeウェアハウス |
クエリの実行に使用されるSnowflakeウェアハウス。 |
有り |
SQLServer 取り込みパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
含まれるテーブル名 |
データベースとスキーマを含むソーステーブルパスのコンマ区切りリスト。例:
|
含まれるテーブル正規表現 |
データベース名とスキーマ名を含むテーブルパスに一致する正規表現。式に一致するすべてのパスが複製され、後で作成されるパターンに一致する新しいテーブルも自動的に含められます。例:
|
フィルター JSON |
JSON 完全修飾テーブル名のリストと、複製に含める必要がある列名の正規表現パターンが含まれます。 次の例では、
|
タスクスケジュール CRON をマージする |
ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 例:
その他の情報と例については、 Quartzドキュメント のcronトリガーチュートリアルをご参照ください。 |
テーブルを削除し、複製に再追加する¶
複製からテーブルを削除するには、複製パラメーターコンテキストの Included Table Names または Included Table Regex パラメーターからテーブルを削除します。
後でテーブルを複製に再度追加するには、まずはSnowflake内の対応する宛先テーブルを削除します。その後、テーブルを Included Table Names または Included Table Regex パラメーターに再度追加します。これにより、テーブルの複製プロセスが新しく開始されるようになります。
このアプローチは、失敗したテーブル複製シナリオからの復旧にも使用できます。
テーブルの列のサブセットを複製します。¶
コネクタは、テーブルごとに複製されたデータを構成された列のサブセットにフィルタリングします。
列にフィルターを適用するには、複製パラメーターコンテキストのColumn Filterプロパティを変更し、フィルターを適用したいテーブルごとに1エントリずつ、構成の配列を追加します。
名前またはパターンで列を含める、または除外します。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせることもできます。その場合、除外は常に包含よりも優先されます。
以下の例は、利用可能なフィールドを示しています。schema および table フィールドは必須です。included、 excluded、 includedPattern、 excludedPattern のうち1つ以上が必要です。
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
テーブルでデータ変更を追跡する¶
コネクタは、ソーステーブルからのデータの現在の状態と、すべての変更セットからのすべての行のすべての状態を複製します。このデータは、宛先テーブルと同じスキーマで作成されたジャーナルテーブルに格納されます。
ジャーナルテーブル名の形式は次のとおりです。<source table name>_JOURNAL_<timestamp><schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> は整数で、ソーステーブルのスキーマが変更されるごとに増加します。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。
レプリケーションからテーブルを削除し、再度追加すると、 <タイムスタンプ> の値が変更され、 <スキーマ生成> が 1 から再び開始されます。
重要
Snowflakeは、ジャーナルテーブルの構造を一切変更しないことを推奨します。コネクタはそれらを使用して、複製プロセスの一部として宛先テーブルを更新します。
コネクタはジャーナルテーブルをドロップすることはありませんが、複製されたソーステーブルごとに最新のジャーナルを使用し、ジャーナル上の追加専用ストリームのみを読み取ります。ストレージを回収するには、以下を実行します。
すべてのジャーナルテーブルをいつでも切り捨てます。
複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。
アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。
例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2 を 除いて、それらのすべてをドロップできます。
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
マージタスクのスケジュールを構成する¶
コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。
マージタスクのスケジュールCRONパラメーターでCRON式を使用して、ウェアハウスコストを制限し、マージをスケジュールされた時間のみに制限します。MergeSnowflakeJournalTableプロセッサーに送信されるフローファイルを制限し、マージは指定された期間にのみトリガーされます。スケジューリングの詳細については、 スケジューリング戦略 をご参照ください。
フローを実行する¶
プレーンを右クリックし、 Enable all Controller Services を選択します。
インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。