Openflow Connector for SQL Server の設定

注釈

このコネクタは、 Snowflakeコネクタ規約 に従うものとします。

このトピックでは、|Sqlserver|を設定する方法について説明します。

増分ロードプロセスについては、 増分複製 をご参照ください。

前提条件

コネクタを設定する前に、以下の前提条件を満たしていることを確認してください。

  1. Openflow Connector for SQL Server について を確認してください。

  2. サポートされている SQL Serverバージョン を確認してください。

  3. ランタイムの展開を設定していることを確認してください。詳細については、次のトピックをご参照ください。

  4. |OFSFSPCS-plural|を使用する場合、 必要なドメインの構成 を精査し、 SQL サーバー コネクタに必要なドメインへのアクセス許可を付与していることを確認してください。

SQL Serverインスタンスの設定

コネクタを設定する前に、SQL Server環境で次のタスクを実行してください。

注釈

これらのタスクはデータベース管理者として実行する必要があります。

  1. 次のSQL Server の例に示すように、複製する予定の データベーステーブル で変更の追跡を有効にします。

    ALTER DATABASE <database>
      SET CHANGE_TRACKING = ON
      (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
    
    ALTER TABLE <schema>.<table>
      ENABLE CHANGE_TRACKING
      WITH (TRACK_COLUMNS_UPDATED = ON);
    
    Copy

    注釈

    複製する予定のすべてのデータベースとテーブルに対してこれらのコマンドを実行します。

    コネクタは、複製を開始する前に、データベースとテーブルで変更追跡を有効にする必要があります。複製を予定しているすべてのテーブルで、変更の追跡が有効になっていることを確認してください。コネクタの実行中に、追加のテーブルの変更追跡を有効にすることもできます。

  2. Create a login for the SQL Serverインスタンスのログインを作成する:

    CREATE LOGIN <user_name> WITH PASSWORD = '<password>';
    
    Copy

    このログインは、複製する予定のデータベースのユーザーを作成するために使用されます。

  3. 各データベースで次のSQL Serverコマンドを実行して、複製するデータベースごとにユーザーを作成します。

    USE <source_database>;
    CREATE USER <user_name> FOR LOGIN <user_name>;
    
    Copy
  4. 複製する各データベースのユーザーにSELECTおよびVIEW CHANGE TRACKING権限を付与します。

    GRANT SELECT ON <database>.<schema>.<table> TO <user_name>;
    GRANT VIEW CHANGE TRACKING ON <database>.<schema>.<table> TO <user_name>;
    
    Copy

    複製する予定のすべてのテーブルに対して、各データベースでこれらのコマンドを実行します。これらの権限は、前のステップで作成した各データベースのユーザーに付与する必要があります。

  5. (オプション)SSL接続を構成します。

    SSL接続を使用してSQL Server に接続する場合は、データベースサーバーのルート証明書を作成します。これはコネクタを構成するときに必要です。

Snowflake環境のセットアップ

Snowflake管理者は以下のタスクを実行します。

  1. 複製されたデータを格納するために、Snowflake内に宛先データベースを作成します。

    CREATE DATABASE <destination_database>;
    
    Copy
  2. Snowflakeの サービスユーザー を作成します。

    CREATE USER <openflow_user>
      TYPE = SERVICE
      COMMENT='Service user for automated access of Openflow';
    
    Copy
  3. コネクタ用Snowflakeロールを作成し、必要な権限を付与します。

    CREATE ROLE <openflow_role>;
    GRANT ROLE <openflow_role> TO USER <openflow_user>;
    GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
    GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
    
    Copy

    このロールを使用して、コネクタのSnowflakeデータベースへのアクセスを管理します。

    宛先データベースにオブジェクトを作成するには、アクセス管理に使用するロールに、データベースに対する USAGE権限およびCREATE SCHEMA権限 を付与する必要があります。

  4. コネクタ用にSnowflakeウェアハウスを作成し、必要な権限を付与します。

    CREATE WAREHOUSE <openflow_warehouse> WITH
      WAREHOUSE_SIZE = 'XSMALL'
      AUTO_SUSPEND = 300
      AUTO_RESUME = TRUE;
    GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
    
    Copy

    Snowflake recommends starting with a XSMALL warehouse size, then experimenting with size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses, rather than a larger warehouse size. For more information, see multi-cluster warehouses.

  5. キーペア認証用のパブリックキーとプライベートキーを設定します。

    1. 安全なキーのペア(パブリックおよび秘密)を作成します。

    2. ユーザーのプライベートキーをファイルに保存し、コネクタの構成に提供します。

    3. Snowflakeユーザーに公開キーを割り当てます:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      詳細については、 キーペア認証とキーペアローテーション をご参照ください。

コネクタを構成する

データエンジニアは、次のセクションを使用してコネクタをインストールおよび構成します。

コネクタをインストールする

  1. Openflow概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. Select runtime ダイアログで、Available runtimes ドロップダウンリストからランタイムを選択して Add をクリックします。

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  4. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  5. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

コネクタを構成するには、次のステップを実行します。

  1. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

  2. フローパラメーター の説明に従って、必要なパラメーター値を入力します。

フローパラメーター

まず、SQLServerのソースパラメーターコンテキストのパラメーターを設定し、次にSQLServerの宛先パラメーターコンテキストのパラメーターを設定します。これを完了したら、コネクタを有効にします。コネクタはSQLServerとSnowflakeの両方に接続し、実行を開始します。ただし、複製するテーブルが構成に明示的に追加されるまで、コネクタはデータを複製しません。

複製する特定のテーブルを構成するには、 SQLServer 取り込みパラメーターコンテキストを編集します。SQLServer 取り込みパラメーターコンテキストに変更を適用すると、コネクタが構成を取得し、各テーブルの複製ライフサイクルが開始します。

SQLServer ソースパラメーターコンテキスト

パラメーター

説明

SQL Server接続 URL

ソースデータベースへの完全な JDBC URL。

例:

  • jdbc:sqlserver://example.com:1433;encrypt=false;

SQL Server JDBC ドライバー

Reference asset チェックボックスを選択し、 SQL サーバー JDBC ドライバー をアップロードします。

SQL Serverユーザー名

コネクタのユーザー名。

SQL Serverパスワード

コネクタのパスワード。

SQLServer 宛先パラメーターコンテキスト

パラメーター

説明

必須

宛先データベース

データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

有り

Snowflake認証ストラテジー

以下を使用する場合:

  • Snowflake Openflow Deployment or BYOC: Use SNOWFLAKE_MANAGED_TOKEN. This token is managed automatically by Snowflake. BYOC deployments must have previously configured runtime roles to use SNOWFLAKE_MANAGED_TOKEN.

  • BYOC: 代わりに、BYOC では認証戦略の値として KEY_PAIR を使用できます。

有り

Snowflakeアカウント識別子

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR: データが永続化される[organization-name]-[account-name]形式のSnowflakeアカウント名。

有り

Snowflake接続の戦略

KEY_PAIRを使用する場合は、Snowflakeに接続するための戦略を特定します。

  • STANDARD (デフォルト):Snowflakeサービスへの標準パブリックルーティングを使用して接続します。

  • PRIVATE_CONNECTIVITY:AWS PrivateLinkなど、サポート対象のクラウドプラットフォームに関連付けられたプライベートアドレスを使用して接続します。

KEY_PAIRを使用したBYOCのみに必要です。それ以外の場合は無視されます。

Snowflakeオブジェクト識別子の解決

スキーマ名、テーブル名、列名などのソースオブジェクト識別子をSnowflakeに格納し、クエリする方法を指定します。この設定は、SQLクエリで二重引用符を使用する必要があるかどうかを指定します。

オプション1:デフォルトで、大文字と小文字を区別しない(推奨)。

  • 変換:識別子はすべて大文字に変換されます。たとえば、 My_TableMY_TABLE になります。

  • クエリ: SQL クエリは大文字と小文字を区別せず、 SQL 二重引用符は必要ありません。

    たとえば、 SELECT * FROM my_table;SELECT * FROM MY_TABLE; と同じ結果を返します。

注釈

Snowflakeでは、データベースオブジェクトに大文字と小文字が混在する名前が想定されない場合、このオプションを使用することを推奨しています。

重要

コネクタの取り込み開始後は、この設定を変更しないでください。取り込み開始後にこの設定を変更すると、既存の取り込みは中断されます。この設定を変更する必要がある場合は、新しいコネクタインスタンスを作成してください。

オプション2: 大文字と小文字を区別する。

  • 変換:大文字小文字は維持されます。たとえば、 My_TableMy_Table のままです。

  • クエリ: SQL クエリは、データベースオブジェクトの大文字と小文字を正確に一致させるために、二重引用符を使用する必要があります。例: SELECT * FROM "My_Table";

注釈

Snowflakeでは、レガシーまたは互換性の理由からソースの大文字小文字の区別を保持する必要がある場合、このオプションを使用することを推奨しています。たとえば、 MY_TABLEmy_table のように大文字と小文字だけが異なるテーブル名がソースデータベースに含まれている場合、大文字と小文字を区別しない比較を使用すると、名前の衝突が発生します。

有り

Snowflake秘密キー

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:認証に使用される RSA プライベートキーである必要があります。

    その RSA キーは PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを持つ必要があります。SnowflakeプライベートキーファイルまたはSnowflakeプライベートキーのいずれかを定義する必要があることに注意してください。

無し

Snowflake秘密キーファイル

以下を使用する場合:

  • Session token authentication strategy: プライベートキーファイルは空白である必要があります。

  • KEY_PAIR:Snowflakeへの認証に使用される RSA プライベートキーを含むファイルをアップロードします。これは、PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。ヘッダー行は で始まります -----BEGIN PRIVATE.プライベートキーファイルをアップロードするには、Reference asset チェックボックスを選択します。

無し

Snowflake秘密キーパスワード

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeプライベートキーファイルに関連付けられたパスワードを提供します。

無し

Snowflakeロール

以下を使用する場合:

  • Session Token Authentication Strategy:ランタイムロールを使用します。ランタイムの View Details に移動すると、Openflow UI でランタイムロールを見つけることができます。

  • KEY_PAIR Authentication Strategy:サービスユーザーのために構成された有効なロールを使用します。

有り

Snowflakeのユーザー名

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeインスタンスへの接続に使用するユーザー名を提供します。

有り

Snowflakeウェアハウス

クエリの実行に使用されるSnowflakeウェアハウス。

有り

SQLServer 取り込みパラメーターコンテキスト

パラメーター

説明

含まれるテーブル名

データベースとスキーマを含むソーステーブルパスのコンマ区切りリスト。例:

database_1.public.table_1, database_2.schema_2.table_2

含まれるテーブル正規表現

データベース名とスキーマ名を含むテーブルパスに一致する正規表現。式に一致するすべてのパスが複製され、後で作成されるパターンに一致する新しいテーブルも自動的に含められます。例:

database_name\.public\.auto_.*

フィルター JSON

JSON 完全修飾テーブル名のリストと、複製に含める必要がある列名の正規表現パターンが含まれます。

次の例では、 my_db データベースのパブリックスキーマの table1 にある name で終わるすべての列が含まれます。

[ {"database":"my_db", "schema":"public", "table":"table1", "includedPattern":".*name"} ]

タスクスケジュール CRON をマージする

ジャーナルから宛先テーブルへのマージ操作がトリガーされる期間を定義する CRON 式。連続的なマージやタイムスケジュールでウェアハウスの実行時間を制限したい場合は、 * * * * * ? に設定します。

例:

  • * 0 * * * ? という文字列は、丸1時間で1分間のマージをスケジュールしたいことを示しています。

  • * 20 14 ? * MON-FRI という文字列は、毎週月曜日から金曜日の2:20 PM にマージをスケジュールしたいことを示しています。

その他の情報と例については、 Quartzドキュメント のcronトリガーチュートリアルをご参照ください。

テーブルを削除し、複製に再追加する

複製からテーブルを削除するには、複製パラメーターコンテキストの Included Table Names または Included Table Regex パラメーターからテーブルを削除します。

後でテーブルを複製に再度追加するには、まずはSnowflake内の対応する宛先テーブルを削除します。その後、テーブルを Included Table Names または Included Table Regex パラメーターに再度追加します。これにより、テーブルの複製プロセスが新しく開始されるようになります。

このアプローチは、失敗したテーブル複製シナリオからの復旧にも使用できます。

テーブルの列のサブセットを複製します。

コネクタは、テーブルごとに複製されたデータを構成された列のサブセットにフィルタリングします。

列にフィルターを適用するには、複製パラメーターコンテキストのColumn Filterプロパティを変更し、フィルターを適用したいテーブルごとに1エントリずつ、構成の配列を追加します。

名前またはパターンで列を含める、または除外します。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせることもできます。その場合、除外は常に包含よりも優先されます。

以下の例は、利用可能なフィールドを示しています。schema および table フィールドは必須です。includedexcludedincludedPatternexcludedPattern のうち1つ以上が必要です。

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

テーブルでデータ変更を追跡する

コネクタは、ソーステーブルからのデータの現在の状態と、すべての変更セットからのすべての行のすべての状態を複製します。このデータは、宛先テーブルと同じスキーマで作成されたジャーナルテーブルに格納されます。

ジャーナルテーブル名の形式は次のとおりです。<source table name>_JOURNAL_<timestamp><schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> は整数で、ソーステーブルのスキーマが変更されるごとに増加します。その結果、スキーマが変更されるソーステーブルには、複数のジャーナルテーブルがあります。

レプリケーションからテーブルを削除し、再度追加すると、 <タイムスタンプ> の値が変更され、 <スキーマ生成>1 から再び開始されます。

重要

Snowflakeは、ジャーナルテーブルの構造を一切変更しないことを推奨します。コネクタはそれらを使用して、複製プロセスの一部として宛先テーブルを更新します。

コネクタはジャーナルテーブルをドロップすることはありませんが、複製されたソーステーブルごとに最新のジャーナルを使用し、ジャーナル上の追加専用ストリームのみを読み取ります。ストレージを回収するには、以下を実行します。

  • すべてのジャーナルテーブルをいつでも切り捨てます。

  • 複製から削除されたソーステーブルに関連するジャーナルテーブルをドロップします。

  • アクティブに複製されたテーブルの最新の生成ジャーナルテーブルを除いて、すべてをドロップします。

例えば、コネクタがソーステーブル orders をアクティブに複製するように設定されており、以前にテーブル customers を複製から削除した場合、以下のようなジャーナルテーブルが存在する可能性があります。この場合、 orders_5678_2除いて、それらのすべてをドロップできます。

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

マージタスクのスケジュールを構成する

コネクタはウェアハウスを使用して、変更データキャプチャ(CDC)データを宛先テーブルにマージします。この操作は、 MergeSnowflakeJournalTable プロセッサーによってトリガーされます。新しい変更がない場合、または MergeSnowflakeJournalTable キューで待機する新しいフローファイルがない場合、マージはトリガーされず、ウェアハウスは自動サスペンドします。

マージタスクのスケジュールCRONパラメーターでCRON式を使用して、ウェアハウスコストを制限し、マージをスケジュールされた時間のみに制限します。MergeSnowflakeJournalTableプロセッサーに送信されるフローファイルを制限し、マージは指定された期間にのみトリガーされます。スケジューリングの詳細については、 スケジューリング戦略 をご参照ください。

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。