Openflow Connector for SQL Server の設定

注釈

このコネクタは、 Snowflakeコネクタ規約 に従うものとします。

このトピックでは、|Sqlserver|を設定する方法について説明します。

増分ロードプロセスについては、 増分複製 をご参照ください。

前提条件

コネクタを設定する前に、以下の前提条件を満たしていることを確認してください。

  1. Openflow Connector for SQL Server について を確認してください。

  2. サポートされている SQL Serverバージョン を確認してください。

  3. ランタイムの展開を設定していることを確認してください。詳細については、次のトピックをご参照ください。

  4. |OFSFSPCS-plural|を使用する場合、 必要なドメインの構成 を精査し、 SQL サーバー コネクタに必要なドメインへのアクセス許可を付与していることを確認してください。

SQL Serverインスタンスの設定

コネクタを設定する前に、SQL Server環境で次のタスクを実行してください。

注釈

これらのタスクはデータベース管理者として実行する必要があります。

  1. 次のSQL Server の例に示すように、複製する予定の データベーステーブル で変更の追跡を有効にします。

    ALTER DATABASE <database>
      SET CHANGE_TRACKING = ON
      (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
    
    ALTER TABLE <schema>.<table>
      ENABLE CHANGE_TRACKING;
    
    Copy

    注釈

    複製する予定のすべてのデータベースとテーブルに対してこれらのコマンドを実行します。

    コネクタは、複製を開始する前に、データベースとテーブルで変更追跡を有効にする必要があります。複製を予定しているすべてのテーブルで、変更の追跡が有効になっていることを確認してください。コネクタの実行中に、追加のテーブルの変更追跡を有効にすることもできます。

  2. Create a login for the SQL Serverインスタンスのログインを作成する:

    CREATE LOGIN <user_name> WITH PASSWORD = '<password>';
    
    Copy

    このログインは、複製する予定のデータベースのユーザーを作成するために使用されます。

  3. 各データベースで次のSQL Serverコマンドを実行して、複製するデータベースごとにユーザーを作成します。

    USE <source_database>;
    CREATE USER <user_name> FOR LOGIN <user_name>;
    
    Copy
  4. 複製する各データベースのユーザーにSELECTおよびVIEW CHANGE TRACKING権限を付与します。

    GRANT SELECT ON <database>.<schema>.<table> TO <user_name>;
    GRANT VIEW CHANGE TRACKING ON <database>.<schema>.<table> TO <user_name>;
    
    Copy

    複製する予定のすべてのテーブルに対して、各データベースでこれらのコマンドを実行します。これらの権限は、前のステップで作成した各データベースのユーザーに付与する必要があります。

  5. (オプション)ユーザー定義データ型(UDDT)に対する VIEW DEFINITION 権限を付与します。

    テーブルにユーザー定義データ型(UDDT)を使用する列が含まれていて、UDDT がコネクタユーザーとは異なるユーザーによって所有されている場合は、次のSQL Serverの例に示すように、コネクタユーザーに VIEW DEFINITION 権限を付与する必要があります。

    GRANT VIEW DEFINITION TO <user_name>;
    
    Copy

    この権限がない場合、UDDT を使用する列は自動的に複製から除外されます。

  6. (オプション)SSL接続を構成します。

    SSL接続を使用してSQL Server に接続する場合は、データベースサーバーのルート証明書を作成します。これはコネクタを構成するときに必要です。

Snowflake環境のセットアップ

Snowflake管理者は以下のタスクを実行します。

  1. 複製されたデータを格納するために、Snowflake内に宛先データベースを作成します。

    CREATE DATABASE <destination_database>;
    
    Copy
  2. Snowflakeの サービスユーザー を作成します。

    CREATE USER <openflow_user>
      TYPE = SERVICE
      COMMENT='Service user for automated access of Openflow';
    
    Copy
  3. コネクタ用Snowflakeロールを作成し、必要な権限を付与します。

    CREATE ROLE <openflow_role>;
    GRANT ROLE <openflow_role> TO USER <openflow_user>;
    GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
    GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
    
    Copy

    このロールを使用して、コネクタのSnowflakeデータベースへのアクセスを管理します。

    宛先データベースにオブジェクトを作成するには、アクセス管理に使用するロールに、データベースに対する USAGE権限およびCREATE SCHEMA権限 を付与する必要があります。

  4. コネクタ用にSnowflakeウェアハウスを作成し、必要な権限を付与します。

    CREATE WAREHOUSE <openflow_warehouse> WITH
      WAREHOUSE_SIZE = 'XSMALL'
      AUTO_SUSPEND = 300
      AUTO_RESUME = TRUE;
    GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
    
    Copy

    Snowflakeは、XSMALLウェアハウスサイズから始めて、複製されるテーブルの数と転送されるデータの量に応じてサイズ変更を試してみることを推奨しています。多数のテーブルを扱う場合、通常はウェアハウスサイズを大きくするよりも、マルチクラスターのウェアハウスを使用した方が適切にスケールできます。詳細については、 マルチクラスターウェアハウス をご参照ください。

  5. キーペア認証用のパブリックキーとプライベートキーを設定します。

    1. 安全なキーのペア(パブリックおよび秘密)を作成します。

    2. ユーザーのプライベートキーをファイルに保存し、コネクタの構成に提供します。

    3. Snowflakeユーザーに公開キーを割り当てます:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      詳細については、 キーペア認証とキーペアローテーション をご参照ください。

コネクタをインストールする

コネクタをインストールするには、データエンジニアとして次を実行します。

  1. Openflow概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. Select runtime ダイアログで、Available runtimes ドロップダウンリストからランタイムを選択して Add をクリックします。

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  4. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  5. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

コネクタを構成するには、データエンジニアとして次の手順を実行します。

  1. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

  2. フローパラメーター の説明に従って、必要なパラメーター値を入力します。

フローパラメーター

まず、SQLServerのソースパラメーターコンテキストのパラメーターを設定し、次にSQLServerの宛先パラメーターコンテキストのパラメーターを設定します。これを完了したら、コネクタを有効にします。コネクタはSQLServerとSnowflakeの両方に接続し、実行を開始します。ただし、複製するテーブルが構成に明示的に追加されるまで、コネクタはデータを複製しません。

複製する特定のテーブルを構成するには、 SQLServer 取り込みパラメーターコンテキストを編集します。SQLServer 取り込みパラメーターコンテキストに変更を適用すると、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。

SQLServer ソースパラメーターコンテキスト

パラメーター

説明

SQLServer 接続 URL

ソースデータベースへの完全な JDBC URL。

例:

  • jdbc:sqlserver://example.com:1433;encrypt=false;

SQLServer JDBC ドライバー

Reference asset チェックボックスを選択し、 SQL サーバー JDBC ドライバー をアップロードします。

SQLServer ユーザー名

コネクタのユーザー名。

SQLServer パスワード

コネクタのパスワード。

SQLServer 宛先パラメーターコンテキスト

パラメーター

説明

必須

宛先データベース

データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

有り

Snowflake認証ストラテジー

以下を使用する場合:

  • Snowflake Openflow Deployment または BYOC:SNOWFLAKE_MANAGED_TOKEN を使用します。このトークンはSnowflakeによって自動的に管理されます。 BYOC デプロイメントでは、 SNOWFLAKE_MANAGED_TOKEN を使用するために、事前に ランタイムロール が構成されている必要があります。

  • BYOC: 代わりに、BYOC では認証戦略の値として KEY_PAIR を使用できます。

有り

Snowflakeアカウント識別子

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR: データが永続化される[organization-name]-[account-name]形式のSnowflakeアカウント名。

有り

Snowflake接続の戦略

KEY_PAIRを使用する場合は、Snowflakeに接続するための戦略を特定します。

  • STANDARD (デフォルト):Snowflakeサービスへの標準パブリックルーティングを使用して接続します。

  • PRIVATE_CONNECTIVITY:AWS PrivateLinkなど、サポート対象のクラウドプラットフォームに関連付けられたプライベートアドレスを使用して接続します。

KEY_PAIRを使用したBYOCのみに必要です。それ以外の場合は無視されます。

Snowflakeオブジェクト識別子の解決

スキーマ名、テーブル名、列名などのソースオブジェクト識別子をSnowflakeに格納し、クエリする方法を指定します。この設定は、SQLクエリで二重引用符を使用する必要があるかどうかを指定します。

オプション1:デフォルトで、大文字と小文字を区別しない(推奨)。

  • 変換:識別子はすべて大文字に変換されます。たとえば、 My_TableMY_TABLE になります。

  • クエリ: SQL クエリは大文字と小文字を区別せず、 SQL 二重引用符は必要ありません。

    たとえば、 SELECT * FROM my_table;SELECT * FROM MY_TABLE; と同じ結果を返します。

注釈

Snowflakeでは、データベースオブジェクトに大文字と小文字が混在する名前が想定されない場合、このオプションを使用することを推奨しています。

重要

コネクタの取り込み開始後は、この設定を変更しないでください。取り込み開始後にこの設定を変更すると、既存の取り込みは中断されます。この設定を変更する必要がある場合は、新しいコネクタインスタンスを作成してください。

オプション2: 大文字と小文字を区別する。

  • 変換:大文字小文字は維持されます。たとえば、 My_TableMy_Table のままです。

  • クエリ: SQL クエリは、データベースオブジェクトの大文字と小文字を正確に一致させるために、二重引用符を使用する必要があります。例: SELECT * FROM "My_Table";

注釈

Snowflakeでは、レガシーまたは互換性の理由からソースの大文字小文字の区別を保持する必要がある場合、このオプションを使用することを推奨しています。たとえば、 MY_TABLEmy_table のように大文字と小文字だけが異なるテーブル名がソースデータベースに含まれている場合、大文字と小文字を区別しない比較を使用すると、名前の衝突が発生します。

有り

Snowflake秘密キー

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:認証に使用される RSA プライベートキーである必要があります。

    その RSA キーは PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを持つ必要があります。SnowflakeプライベートキーファイルまたはSnowflakeプライベートキーのいずれかを定義する必要があることに注意してください。

無し

Snowflake秘密キーファイル

以下を使用する場合:

  • Session token authentication strategy: プライベートキーファイルは空白である必要があります。

  • KEY_PAIR:Snowflakeへの認証に使用される RSA プライベートキーを含むファイルをアップロードします。これは、PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。ヘッダー行は で始まります -----BEGIN PRIVATE.プライベートキーファイルをアップロードするには、Reference asset チェックボックスを選択します。

無し

Snowflake秘密キーパスワード

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeプライベートキーファイルに関連付けられたパスワードを提供します。

無し

Snowflakeロール

以下を使用する場合:

  • Session Token Authentication Strategy:ランタイムに割り当てられたSnowflakeロール、またはこのSnowflakeロールに付与された子ロールを使用します。ランタイムのSnowflakeロールは、Openflow UIでランタイムの:ui:`More Options [⋮]`ボタンを展開し、:ui:`Set Snowflake role`を選択すると確認できます。

  • KEY_PAIR Authentication Strategy:サービスユーザーのために構成された有効なロールを使用します。

有り

Snowflakeのユーザー名

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeインスタンスへの接続に使用するユーザー名を提供します。

有り

Snowflakeウェアハウス

クエリの実行に使用されるSnowflakeウェアハウス。

有り

SQLServer 取り込みパラメーターコンテキスト

パラメーター

説明

含まれるテーブル名

データベースとスキーマを含むソーステーブルパスのコンマ区切りリスト。例:

database_1.public.table_1, database_2.schema_2.table_2

含まれるテーブル正規表現

データベース名とスキーマ名を含むテーブルパスに一致する正規表現。式に一致するすべてのパスが複製され、後で作成されるパターンに一致する新しいテーブルも自動的に含められます。例:

database_name\.public\.auto_.*

フィルター JSON

JSON 完全修飾テーブル名のリストと、複製に含める必要がある列名の正規表現パターンが含まれます。

次の例では、 my_db データベースのパブリックスキーマの table1 にある name で終わるすべての列が含まれます。

[ {"database":"my_db", "schema":"public", "table":"table1", "includedPattern":".*name"} ]

タスクスケジュール CRON をマージする

ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 * * * * * ? に設定します。

例:

  • * 0 * * * ? という文字列は、丸1時間で1分間のマージをスケジュールしたいことを示しています。

  • * 20 14 ? * MON-FRI という文字列は、毎週月曜日から金曜日の2:20 PM にマージをスケジュールしたいことを示しています。

その他の情報と例については、 Quartzドキュメント のcronトリガーチュートリアルをご参照ください。

SQL Serverのレプリカサーバーからテーブルを複製する

コネクタは、プライマリサーバーから、`トランザクション複製<https://learn.microsoft.com/en-us/sql/relational-databases/replication/transactional/transactional-replication>`_を使用するサブスクライバーサーバーから、または`Always On可用性グループ<https://learn.microsoft.com/en-us/sql/database-engine/availability-groups/windows/overview-of-always-on-availability-groups-sql-server>`_のセカンダリレプリカからデータを取り込むことができます。SQL Serverレプリカに接続するコネクタを構成する前に、プライマリノードとレプリカノード間の複製が正しく機能することを確認します。コネクタでのデータの欠落に関する問題を調査する場合は、まずコネクタが使用するレプリカサーバーに欠落している行と変更追跡イベントが存在することを確認します。

継続性を確保するために、同じ接続ユーザーがプライマリサーバーとレプリカサーバーの両方で使用でき、データテーブルと変更追跡テーブルにアクセスできることを確認してください。

トランザクションの複製

`トランザクションの複製<https://learn.microsoft.com/en-us/sql/relational-databases/replication/transactional/transactional-replication>`_は、パブリッシャーからサブスクライバーにデータの変更をコピーするデータ配信メカニズムです。パブリッシャーではなくサブスクライバーサーバーから読み取るようにコネクタを構成するには、:ui:`SQLServer Connection URL`パラメーターにサブスクライバーサーバーURLを指定します。

警告

複製が開始された後は、データベースサーバーを変更しないでください。各データベースは独自の変更追跡状態を独立して維持するため、別のサーバーに切り替えると、コネクタはどの変更が既に処理されているかを知ることができなくなり、データが失われる可能性があります。

Always On可用性グループ

`Always On可用性グループ<https://learn.microsoft.com/en-us/sql/database-engine/availability-groups/windows/overview-of-always-on-availability-groups-sql-server>`_は、フェールオーバーの目的でデータベースの同期されたコピーを保持する、高可用性および障害復旧のソリューションです。コネクタは可用性グループのセカンダリレプリカから読み取ることができます。最適なエクスペリエンスを得るには、`可用性グループリスナー<https://learn.microsoft.com/en-us/sql/database-engine/availability-groups/windows/listeners-client-connectivity-application-failover>`_を構成し、:ui:`SQLServer Connection URL`パラメーターにリスナーDNS名を使用します。

テーブル複製の再開

(主キーが欠落している、またはスキーマの変更がサポートされていないなどの理由で) FAILED 状態のテーブルは、自動的に再開されません。テーブルが FAILED 状態になった場合、または複製を最初から再開する必要がある場合は、次の手順に従ってテーブルを削除し、複製に再度追加します。

注釈

主キーが欠落しているなど、ソーステーブルの問題が原因で障害が発生した場合は、続行する前にソースデータベースでその問題を解決します。

  1. フローパラメーターからテーブルを削除します。取り込みパラメーターのコンテキストで、Included Table Names からテーブルを削除するか、Included Table Regex を変更してテーブルが一致しないようにします。

  2. テーブルが削除されたことを確認します。

    1. Openflowランタイムキャンバスで、プロセッサーグループを右クリックし、Controller Services を選択します。

    2. コントローラーサービスをリストしたテーブルで、Table State Store 行を見つけ、行の右側にある縦3つのドットをクリックして、View State を選択します。

    重要

    続行する前に、テーブルの状態がこのリストから完全に削除されるまで待つ必要があります。この構成変更が完了するまで続行しないでください。

  3. 宛先をクリーンアップする:テーブルの状態が完全に削除されたと表示されたら、Snowflake で宛先テーブルを手動で DROP します。スナップショットフェーズ中に、コネクタは既存の宛先テーブルを上書きしないことに注意してください。テーブルがまだ存在する場合、複製は再度失敗します。オプションで、ジャーナルテーブルとストリームが不要になった場合は削除することもできます。

  4. テーブルを再度追加する: Included Table Names または Included Table Regex パラメーターを更新して、テーブルを再度含めます。

  5. 再開を確認する:前述の指示に従って Table State Store をチェックします。テーブルの状態は、ステータス NEW で表示され、次に SNAPSHOT_REPLICATION に移行し、最後に INCREMENTAL_REPLICATION になります。

テーブルの列のサブセットを複製します。

コネクタは、テーブルごとに複製されたデータを構成された列のサブセットにフィルタリングします。

列にフィルターを適用するには、複製パラメーターコンテキストのColumn Filterプロパティを変更し、フィルターを適用したいテーブルごとに1エントリずつ、構成の配列を追加します。

名前またはパターンで列を含める、または除外します。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせることもできます。その場合、除外は常に包含よりも優先されます。

以下の例は、利用可能なフィールドを示しています。schema および table フィールドは必須です。includedexcludedincludedPatternexcludedPattern のうち1つ以上が必要です。

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

テーブルでデータ変更を追跡する

コネクタは、ソーステーブルからのデータの現在の状態と、すべての変更セットからのすべての行のすべての状態を複製します。このデータは、宛先テーブルと同じスキーマで作成されたジャーナルテーブルに格納されます。

ジャーナルテーブル名の形式は``<source_table_name>_JOURNAL_<timestamp>_<schema_generation>``です。ここで、``<timestamp>``はソーステーブルが複製に追加されたときのエポック秒の値であり、``<schema_generation>``はソーステーブルのスキーマ変更ごとに増加する整数です。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。

複製からテーブルを削除してから再度追加すると、``<timestamp>``値が変更され、``<schema_generation>``は``1``から再度開始されます。

重要

Snowflakeは、ジャーナルテーブルの構造を一切変更しないことを推奨します。コネクタはそれらを使用して、複製プロセスの一部として宛先テーブルを更新します。

コネクタはジャーナルテーブルをドロップすることはありませんが、複製されたソーステーブルごとに最新のジャーナルを使用し、ジャーナル上の追加専用ストリームのみを読み取ります。ストレージを回収するには、以下を実行します。

  • すべてのジャーナルテーブルをいつでも切り捨てます。

  • 複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。

  • アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。

例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2除いて、それらのすべてをドロップできます。

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

マージタスクのスケジュールを構成する

コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。

マージタスクのスケジュールCRONパラメーターでCRON式を使用して、ウェアハウスコストを制限し、マージをスケジュールされた時間のみに制限します。MergeSnowflakeJournalTableプロセッサーに送信されるフローファイルを制限し、マージは指定された期間にのみトリガーされます。スケジューリングの詳細については、 スケジューリング戦略 をご参照ください。

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。