の設定 Openflow Connector for SQL Server¶
注釈
このコネクタは、 Snowflakeコネクタ規約 に従うものとします。
このトピックでは、 Openflow Connector for SQL Server を設定する手順について説明します。
前提条件¶
Openflow Connector for SQL Server について を確認してください。
サポートされている SQL Serverバージョン を確認してください。
推奨:ランタイムごとにコネクタインスタンスを1つだけ追加するようにします。
Ensure that you have Openflowの設定 - BYOC or Set up Openflow - Snowflake Deployments.
If using Openflow - Snowflake Deployments, ensure that you've reviewed configuring required domains and have granted access to the required domains for the SQL Server connector.
データベース管理者として、以下のタスクを実行します。
データベース と テーブル の変更追跡を有効にします。コネクタでは、複製を開始する前にデータベースとテーブルで変更追跡が有効になっている必要があります。複製するすべてのテーブルで変更追跡が有効になっていることを確認してください。コネクタの実行中に、さらに多くのテーブルの変更追跡を有効にすることもできます。以下のコードスニペットをご参照ください。
ALTER DATABASE <database> SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON); ALTER TABLE <schema>.<table> ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON);
コネクタのユーザーを作成します。コネクタでは、複製テーブルに対する VIEW CHANGE TRACKING 権限が付与されたユーザーが必要です。そのユーザーに、コネクタの構成にアクセスするためのパスワードを与えます。
CREATE LOGIN <user_name> WITH PASSWORD = <password>; CREATE USER <user_name> FOR LOGIN <user_name>; GRANT SELECT ON <schema>.<table> TO <user_name>; GRANT VIEW CHANGE TRACKING ON <schema>.<table> TO <user_name>;
SSL 経由で接続します。SQL Serverへの SSL 接続を使用する場合は、データベースサーバーのルート証明書を準備します。これは、構成時に必要です。
Snowflakeアカウント管理者として、以下のタスクを実行します。
タイプを SERVICE としてSnowflakeユーザーを作成します。複製データを格納するデータベースを作成し、Snowflakeユーザーに USAGE および CREATE SCHEMA 権限 を付与して、そのデータベースにオブジェクトを作成する権限を設定します。
CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
セキュアキーのペア(公開キーと秘密キー)を作成します。ユーザーの秘密キーをファイルに格納して、コネクタの構成に提供します。Snowflakeサービスユーザーに公開キーを割り当てます:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
詳細については、 キーペア認証とキーペアローテーション をご参照ください。
コネクタが使用するウェアハウスを指定します。で開始する
MEDIUMウェアハウスのサイズから、レプリケートされるテーブルの量とデータの転送量に応じてサイズを実験します。テーブル数が大きい場合は、通常、ウェアハウスのサイズよりも、 マルチクラスターウェアハウス を使用した方がスケーリングが向上します。
コネクタを設定する¶
データエンジニアとして、以下のタスクを実行してコネクタを構成します。
コネクタをインストールする¶
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
以下のユースケースにコネクタを構成できます。
リアルタイムでテーブルのセットを複製する¶
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
フローパラメーター の説明に従って、必要なパラメーター値を入力します。
フローパラメーター¶
まず、 SQLServer ソースパラメーターコンテキストのパラメーターを設定し、次に SQLServer 宛先パラメーターコンテキストを設定します。これが完了したら、コネクタを有効にします。コネクタを SQLServer とSnowflakeの両方に接続し、実行を開始する必要があります。ただし、複製するテーブルが構成に明示的に追加されるまでは、コネクタはデータを複製しません。
複製する特定のテーブルを構成するには、 SQLServer 取り込みパラメーターコンテキストを編集します。SQLServer 取り込みパラメーターコンテキストに変更を適用すると、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。
SQLServer ソースパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
SQL Server接続 URL |
ソースデータベースへの完全な JDBC URL。 例:
|
SQL Server JDBC ドライバー |
Reference asset チェックボックスを選択し、SQL サーバー JDBC ドライバー をアップロードします。 |
SQL Serverユーザー名 |
コネクタのユーザー名。 |
SQL Serverパスワード |
コネクタのパスワード。 |
SQLServer 宛先パラメーターコンテキスト¶
パラメーター |
説明 |
必須 |
|---|---|---|
宛先データベース |
データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 |
有り |
Snowflake認証ストラテジー |
以下を使用する場合:
|
有り |
Snowflakeアカウント識別子 |
以下を使用する場合:
|
有り |
Snowflake秘密キー |
以下を使用する場合:
|
無し |
Snowflake秘密キーファイル |
以下を使用する場合:
|
無し |
Snowflake秘密キーパスワード |
以下を使用する場合
|
無し |
Snowflakeロール |
以下を使用する場合
|
有り |
Snowflakeのユーザー名 |
以下を使用する場合
|
有り |
Snowflakeウェアハウス |
クエリの実行に使用されるSnowflakeウェアハウス。 |
有り |
SQLServer 取り込みパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
含まれるテーブル名 |
スキーマを含む、テーブルパスのコンマ区切りリスト。例: |
含まれるテーブル正規表現 |
テーブルパスに一致する正規表現。式に一致するすべてのパスがレプリケートされ、後で作成されるパターンに一致する新しいテーブルも自動的に含まれます。例: |
フィルター JSON |
JSON 完全修飾テーブル名のリストと、複製に含める必要がある列名の正規表現パターンが含まれます。例: |
タスクスケジュール CRON をマージする |
ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 例:
その他の情報と例については、 Quartzドキュメント のcronトリガーチュートリアルをご参照ください。 |
テーブルを削除し、複製に再追加する¶
複製からテーブルを削除するには、複製パラメーターコンテキストの Included Table Names または Included Table Regex パラメーターからテーブルが削除されていることを確認します。
後でテーブルを複製に再追加する場合は、まずSnowflakeで対応する宛先テーブルを削除します。その後、 Included Table Names または Included Table Regex パラメーターにテーブルを追加して戻します。これにより、テーブルの複製プロセスが新しく開始されます。
このアプローチは、失敗したテーブル複製シナリオからの復旧にも使用できます。
テーブルの列のサブセットを複製します。¶
コネクタは、テーブルごとに複製されるデータを構成列のサブセットにフィルターできます。
列にフィルターを適用するには、複製パラメーターコンテキストのColumn Filterプロパティを変更し、フィルターを適用したいテーブルごとに1エントリずつ、構成の配列を追加します。
列は、名前またはパターン別に包含したり、除外したりすることができます。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせて適用することもできます。除外は常に包含より優先されます。
以下の例は、利用可能なフィールドを示しています。schema および table フィールドは必須です。included、 excluded、 includedPattern、 excludedPattern のうち1つ以上が必要です。
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
テーブルでデータ変更を追跡する¶
コネクタはソーステーブルのデータの現在の状態だけでなく、すべての変更セットのすべての行の状態も複製します。このデータは、宛先テーブルと同じスキーマで作成されたジャーナルテーブルに格納されます。
ジャーナルテーブル名の形式は次のとおりです。<source table name>_JOURNAL_<timestamp><schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> は整数で、ソーステーブルのスキーマが変更されるごとに増加します。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。
テーブルが複製から削除され、その後再び追加されると、 <タイムスタンプ> の値が変更され、 <スキーマ生成> が 1 から再び開始されます。
重要
Snowflakeは、ジャーナルテーブルの構造を一切変更しないことを推奨します。これらは、コネクタによって複製プロセスの一部として宛先テーブルを更新するために使用されます。
コネクタはジャーナルテーブルをドロップすることはありませんが、複製されたすべてのソーステーブルに最新のジャーナルを使用し、ジャーナル上の追加専用ストリームのみを読み取ります。ストレージを回収するには、以下を実行します。
すべてのジャーナルテーブルをいつでも切り捨てます。
複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。
アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。
例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2 を 除いて、それらのすべてをドロップできます。
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
マージタスクのスケジュールを構成する¶
コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。
ウェアハウスのコストを制限し、スケジュールされた時間のみにマージを制限するには、Merge task Schedule CRON パラメーターで CRON 式を使用します。MergeSnowflakeJournalTable プロセッサーに送られてくるフローファイルをスロットルし、マージは専用の期間のみにトリガーされます。スケジュールに関する詳細は、 スケジュールストラテジー をご参照ください。
フローを実行する¶
プレーンを右クリックし、 Enable all Controller Services を選択します。
インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。