の設定 Openflow Connector for SQL Server¶
注釈
コネクタには、 コネクタ利用規約 が適用されます。
このトピックでは、 Openflow Connector for SQL Server を設定する手順について説明します。
前提条件¶
Openflow Connector for SQL Server について を確認してください。
サポートされている SQL Serverバージョン を確認してください。
推奨:ランタイムごとにコネクタインスタンスを1つだけ追加するようにします。
Openflowの設定 - BYOC または Openflowの設定 - Snowflakeデプロイメント - タスク概要 があることを確認してください。
データベース管理者として、以下のタスクを実行します。
データベース と テーブル の変更追跡を有効にします。コネクタでは、複製を開始する前にデータベースとテーブルで変更追跡が有効になっている必要があります。複製するすべてのテーブルで変更追跡が有効になっていることを確認してください。コネクタの実行中に、さらに多くのテーブルの変更追跡を有効にすることもできます。以下のコードスニペットをご参照ください。
ALTER DATABASE <database> SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON); ALTER TABLE <schema>.<table> ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON);
コネクタのユーザーを作成します。コネクタでは、複製テーブルに対する VIEW CHANGE TRACKING 権限が付与されたユーザーが必要です。そのユーザーに、コネクタの構成にアクセスするためのパスワードを与えます。
CREATE LOGIN <user_name> WITH PASSWORD = <password>; CREATE USER <user_name> FOR LOGIN <user_name>; GRANT SELECT ON <schema>.<table> TO <user_name>; GRANT VIEW CHANGE TRACKING ON <schema>.<table> TO <user_name>;
SSL 経由で接続します。SQL Serverへの SSL 接続を使用する場合は、データベースサーバーのルート証明書を準備します。これは、構成時に必要です。
Snowflakeアカウント管理者として、以下のタスクを実行します。
タイプを SERVICE としてSnowflakeユーザーを作成します。複製データを格納するデータベースを作成し、Snowflakeユーザーに USAGE および CREATE SCHEMA 権限 を付与して、そのデータベースにオブジェクトを作成する権限を設定します。
CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
セキュアキーのペア(公開キーと秘密キー)を作成します。ユーザーの秘密キーをファイルに格納して、コネクタの構成に提供します。Snowflakeサービスユーザーに公開キーを割り当てます:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
詳細については、 キーペア認証とキーペアローテーション をご参照ください。
コネクタが使用するウェアハウスを指定します。で開始する
MEDIUMウェアハウスのサイズから、レプリケートされるテーブルの量とデータの転送量に応じてサイズを実験します。テーブル数が大きい場合は、通常、ウェアハウスのサイズよりも、 マルチクラスターウェアハウス を使用した方がスケーリングが向上します。
コネクタを設定する¶
データエンジニアとして、以下のタスクを実行してコネクタを構成します。
コネクタをインストールする¶
Openflowの概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。
Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。
Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。
Add を選択します。
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
以下のユースケースにコネクタを構成できます。
リアルタイムでテーブルのセットを複製する¶
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
フローパラメーター の説明に従って、必要なパラメーター値を入力します。
フローパラメーター¶
まず、 SQLServer ソースパラメーターコンテキストのパラメーターを設定し、次に SQLServer 宛先パラメーターコンテキストを設定します。これが完了したら、コネクタを有効にします。コネクタを SQLServer とSnowflakeの両方に接続し、実行を開始する必要があります。ただし、複製するテーブルが構成に明示的に追加されるまでは、コネクタはデータを複製しません。
複製する特定のテーブルを構成するには、 SQLServer 取り込みパラメーターコンテキストを編集します。SQLServer 取り込みパラメーターコンテキストに変更を適用すると、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。
SQLServer ソースパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
SQL Server接続 URL |
ソースデータベースへの完全な JDBC URL。 例:
|
SQL Server JDBC ドライバー |
Reference asset チェックボックスを選択し、SQL サーバー JDBC ドライバー をアップロードします。 |
SQL Server SSL モード |
SSL 接続を有効または無効にします。 |
SQL Serverルート SSL 証明書 |
データベースのルート証明書のフルコンテンツ。SSL が無効の場合はオプション。 |
SQL Serverユーザー名 |
コネクタのユーザー名。 |
SQL Serverパスワード |
コネクタのパスワード。 |
SQLServer 宛先パラメーターコンテキスト¶
パラメーター |
説明 |
必須 |
|---|---|---|
宛先データベース |
データが永続化されるデータベース。既にSnowflakeに存在している必要があります |
有り |
Snowflakeアカウント識別子 |
以下を使用する場合:
|
有り |
Snowflake認証ストラテジー |
以下を使用する場合:
|
有り |
Snowflake秘密キー |
以下を使用する場合:
|
無し |
Snowflake秘密キーファイル |
以下を使用する場合:
|
無し |
Snowflake秘密キーパスワード |
以下を使用する場合:
|
無し |
Snowflakeロール |
以下を使用する場合:
|
有り |
Snowflakeのユーザー名 |
以下を使用する場合:
|
有り |
Snowflakeウェアハウス |
クエリの実行に使用されるSnowflakeウェアハウス。 |
有り |
SQLServer 取り込みパラメーターコンテキスト¶
パラメーター |
説明 |
|---|---|
含まれるテーブル名 |
スキーマを含む、テーブルパスのコンマ区切りリスト。例: |
含まれるテーブル正規表現 |
テーブルパスに一致する正規表現。式に一致するすべてのパスがレプリケートされ、後で作成されるパターンに一致する新しいテーブルも自動的に含まれます。例: |
フィルター JSON |
JSON 完全修飾テーブル名のリストと、複製に含める必要がある列名の正規表現パターンが含まれます。例: |
タスクスケジュール CRON をマージする |
ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 例:
その他の情報と例については、 Quartzドキュメント のcronトリガーチュートリアルをご参照ください。 |
テーブルを削除し、複製に再追加する¶
複製からテーブルを削除するには、複製パラメーターコンテキストの Included Table Names または Included Table Regex パラメーターからテーブルが削除されていることを確認します。
後でテーブルを複製に再追加する場合は、まずSnowflakeで対応する宛先テーブルを削除します。その後、 Included Table Names または Included Table Regex パラメーターにテーブルを追加して戻します。これにより、テーブルの複製プロセスが新しく開始されます。
このアプローチは、失敗したテーブル複製シナリオからの復旧にも使用できます。
テーブルの列のサブセットを複製します。¶
コネクタは、テーブルごとに複製されるデータを構成列のサブセットにフィルターできます。
列にフィルターを適用するには、複製パラメーターコンテキストのColumn Filterプロパティを変更し、フィルターを適用したいテーブルごとに1エントリずつ、構成の配列を追加します。
列は、名前またはパターン別に包含したり、除外したりすることができます。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせて適用することもできます。除外は常に包含より優先されます。
以下の例は、利用可能なフィールドを示しています。schema および table フィールドは必須です。included、 excluded、 includedPattern、 excludedPattern のうち1つ以上が必要です。
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
テーブルでデータ変更を追跡する¶
コネクタはソーステーブルのデータの現在の状態だけでなく、すべての変更セットのすべての行の状態も複製します。このデータは、宛先テーブルと同じスキーマで作成されたジャーナルテーブルに格納されます。
ジャーナルテーブル名の形式は次のとおりです。<source table name>_JOURNAL_<timestamp><schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> は整数で、ソーステーブルのスキーマが変更されるごとに増加します。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。
テーブルが複製から削除され、その後再び追加されると、 <タイムスタンプ> の値が変更され、 <スキーマ生成> が 1 から再び開始されます。
重要
Snowflakeは、ジャーナルテーブルの構造を一切変更しないことを推奨します。これらは、コネクタによって複製プロセスの一部として宛先テーブルを更新するために使用されます。
コネクタはジャーナルテーブルをドロップすることはありませんが、複製されたすべてのソーステーブルに最新のジャーナルを使用し、ジャーナル上の追加専用ストリームのみを読み取ります。ストレージを回収するには、以下を実行します。
すべてのジャーナルテーブルをいつでも切り捨てます。
複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。
アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。
例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2 を 除いて、それらのすべてをドロップできます。
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
マージタスクのスケジュールを構成する¶
コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。
ウェアハウスのコストを制限し、スケジュールされた時間のみにマージを制限するには、Merge task Schedule CRON パラメーターで CRON 式を使用します。MergeSnowflakeJournalTable プロセッサーに送られてくるフローファイルをスロットルし、マージは専用の期間のみにトリガーされます。スケジュールに関する詳細は、 スケジュールストラテジー をご参照ください。
フローを実行する¶
プレーンを右クリックし、 Enable all Controller Services を選択します。
インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。