Openflow Connector for Oracle のインストールと構成

注釈

このコネクタは、 Snowflakeコネクタ規約 に従うものとします。

注釈

|OracleOFC|には、標準のコネクタ利用規約以外の追加利用規約も適用されます。詳しくは、`Openflow Connector for Oracle Addendum<https://www.snowflake.com/en/legal/optional-offerings/offering-specific-terms/openflow-oracle-terms/>`_を参照してください。

このトピックでは、|OracleOFC|コネクタをインストールして構成する手順について説明します。

データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。

コネクタをインストールする

コネクタをインストールするには、データエンジニアとして次を実行します。

  1. Openflow概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. Select runtime ダイアログで、Available runtimes ドロップダウンリストからランタイムを選択して Add をクリックします。

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  4. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  5. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

コネクタを構成するには、データエンジニアとして次の手順を実行します。

  1. 追加されたランタイムを右クリックして、:ui:`Parameters`を選択します。

  2. 必要なパラメーター値を入力します。

    必要なパラメーター値について詳しくは、次のセクションを参照してください。

Snowflake宛先パラメーター

パラメーター

説明

必須

宛先データベース

The database where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.

有り

Destination Schema Pattern

A pattern for the names of destination schemas where data is persisted. The connector creates the schemas if they don't exist.

You can customize the pattern per ingested table using these optional variables:

  • ${source.database.name}: a source table's database.

  • ${source.schema.name}: a source table's schema.

  • ${source.table.name}: a source table's name.

For example, for a table with the qualified name source_db.tenant_a.data, the pattern prefix_${source.database.name}_${source.schema.name} evaluates to prefix_source_db_tenant_a.

To ingest all tables into a single schema, provide a schema name without any variables, like destination_schema.

重要

Don't change this setting after the connector has begun ingesting data. Changing this setting after ingestion has begun breaks the existing ingestion. If you must change this setting, create a new connector instance.

有り

Snowflake認証ストラテジー

以下を使用する場合:

  • Snowflake Openflow Deployment または BYOC:SNOWFLAKE_MANAGED_TOKEN を使用します。このトークンはSnowflakeによって自動的に管理されます。BYOC デプロイメントでは、 SNOWFLAKE_MANAGED_TOKEN を使用するために、事前に ランタイムロール が構成されている必要があります。

  • BYOC: 代わりに、BYOC では認証戦略の値として KEY_PAIR を使用できます。

有り

Snowflakeアカウント識別子

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR: Snowflake account name formatted as [organization-name]-[account-name] where data is persisted.

有り

Snowflake接続の戦略

KEY_PAIRを使用する場合は、Snowflakeに接続するための戦略を特定します。

  • STANDARD (デフォルト):Snowflakeサービスへの標準パブリックルーティングを使用して接続します。

  • PRIVATE_CONNECTIVITY:AWS PrivateLinkなど、サポート対象のクラウドプラットフォームに関連付けられたプライベートアドレスを使用して接続します。

KEY_PAIRを使用したBYOCのみに必要です。それ以外の場合は無視されます。

Snowflake秘密キー

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:認証に使用される RSA プライベートキーである必要があります。

    その RSA キーは PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを持つ必要があります。SnowflakeプライベートキーファイルまたはSnowflakeプライベートキーのいずれかを定義する必要があることに注意してください。

無し

Snowflake秘密キーファイル

以下を使用する場合:

  • Session token authentication strategy:プライベートキーファイルは空白である必要があります。

  • KEY_PAIR:Snowflakeへの認証に使用される RSA プライベートキーを含むファイルをアップロードします。これは、PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。ヘッダー行は -----BEGIN PRIVATE で始まります。プライベートキーファイルをアップロードするには、Reference asset チェックボックスを選択します。

無し

Snowflake秘密キーパスワード

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeプライベートキーファイルに関連付けられたパスワードを提供します。

無し

Snowflakeロール

以下を使用する場合

  • Session Token Authentication Strategy:ランタイムに割り当てられたSnowflakeロール、またはこのSnowflakeロールに付与された子ロールを使用します。Openflow UIのランタイムのSnowflakeロールは、ランタイムの:ui:`More Options [⋮]`ボタンを展開して:ui:`Set Snowflake role`を選択することで見つけることができます。

  • KEY_PAIR Authentication Strategy:サービスユーザーのために構成された有効なロールを使用します。

有り

Snowflakeのユーザー名

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeインスタンスへの接続に使用するユーザー名を提供します。

有り

オーバーサイズ値戦略

複製中にコネクタが内部サイズ制限(16MB)を超える値を処理する方法を決定します。可能な値は次のとおりです。

  • **失敗テーブル**(デフォルト):テーブルは永久に失敗したものとしてマークされ、そのテーブルの複製が停止します。

  • Nullの設定:値は宛先テーブルで``NULL``に置き換えられます。オーバーサイズ値を超えるテーブル内のデータが失われても許容できる場合、これを使用してテーブルの失敗を防ぎます。

無し

Snowflakeウェアハウス

クエリの実行に使用されるSnowflakeウェアハウス。

有り

Oracle取り込みパラメーター

パラメーター

説明

含まれるテーブル名

完全修飾テーブルパスのコンマ区切りリスト。テーブルは、完全修飾データベース、スキーマ、テーブル名の形式(DATABASE_NAME.SCHEMA_NAME.TABLE_NAME)を使用して指定する必要があります。

例: MYPDB.SALES.CUSTOMERS, MYPDB.SALES.ORDERS

含まれるテーブル正規表現

既存および新しいテーブルを自動的に含めるための、テーブルパスに一致する正規表現。正規表現パターンは、DATABASE_NAME.SCHEMA_NAME.TABLE_NAMEの3部構成の命名規則に一致する必要があります。

例: MYPDBデータベース内のSALESスキーマにあるすべてのテーブルに一致させるために:code:`MYPDB\.SALES\..*`とします。

列フィルター JSON

オプション。テーブルごとに含める列または除外する列を指定するフィルターオブジェクトのJSON配列。構文の詳細と例については、`テーブル内の列のサブセットを複製する方法`_を参照してください。

タスクスケジュール CRON をマージする

ジャーナルから宛先テーブルへのマージ操作がトリガーされるタイミングを定義するCRON式。たとえば、継続的なマージの場合は * * * * * ?とします。

オブジェクト識別子の解決

スキーマ名、テーブル名、列名などのソースオブジェクト識別子をSnowflakeに格納し、クエリする方法を指定します。この設定では、SQLクエリで二重引用符を使用する必要があるかどうかを指定します。

オプション1:デフォルトで、大文字と小文字を区別しない(推奨)。

  • 変換:識別子はすべて大文字に変換されます。たとえば、 My_TableMY_TABLE になります。

  • クエリ: SQL クエリは大文字と小文字を区別せず、 SQL 二重引用符は必要ありません。

    たとえば、 SELECT * FROM my_table;SELECT * FROM MY_TABLE; と同じ結果を返します。

注釈

Snowflakeでは、データベースオブジェクトに大文字と小文字が混在する名前が想定されない場合、このオプションを使用することを推奨しています。

オプション2: 大文字と小文字を区別する。

  • 変換:大文字小文字は維持されます。たとえば、 My_TableMy_Table のままです。

  • クエリ: SQL クエリは、データベースオブジェクトの大文字と小文字を正確に一致させるために、二重引用符を使用する必要があります。例: SELECT * FROM "My_Table";

重要

コネクタの取り込み開始後は、この設定を変更しないでください。取り込み開始後にこの設定を変更すると、既存の取り込みは中断されます。この設定を変更する必要がある場合は、新しいコネクタインスタンスを作成してください。

スナップショットのフェッチ戦略

スナップショットロードのフェッチ戦略を決定します。

  • SEQUENTIAL_BY_PRIMARY_KEY (デフォルト):主キーによって順次取得される固定サイズのバッチを使用します。

  • CONCURRENT_BY_ROWID:物理行IDの範囲によって境界されたチャンクにテーブルを分割し、各チャンクを並行して取得します。

Oracleソースパラメーター

パラメーター

説明

必須

Oracle接続URL

DBへのデータベース接続のJDBC URL。URLは、複製するデータを含むターゲットコンテナ(PDBまたはCDB)を指定する必要があります。たとえば、YOUR_DB_NAMEがPDBまたはCDBの名前である場合、:code:`jdbc:oracle:thin@<host>:<port>/YOUR_DB_NAME`とします。

SSLが有効な場合は、TCPSプロトコルを使用します(例:jdbc:oracle:thin:@tcps://<host>:<tcps_port>/YOUR_DB_NAME)。

注釈

コネクタは単一のデータベース/コンテナ内で機能します。JDBCURLが、複製するテーブルを保持するコンテナを直接指していることを確認してください。

有り

Oracleユーザー名

XStreamサーバーにアクセスできる接続ユーザーのユーザー名。

有り

Oracleパスワード

XStreamサーバーにアクセスできる接続ユーザーのパスワード。

有り

OracleSSLモード

Oracleデータベースへの接続に対するSSL暗号化を制御します。

  • **DISABLED**(デフォルト):SSLなしで接続します。

  • VERIFY_CA:SSLを使用して接続します。信頼できる認証機関がサーバー証明書を発行したことを検証します。

  • VERIFY_IDENTITY:SSLを使用して接続します。CA証明書を検証し、サーバーのホスト名が証明書のサブジェクトと一致することを確認します。

VERIFY_CAまたはVERIFY_IDENTITYに設定した場合は、Oracle Walletファイル名パラメーターも指定する必要があります。

有り

Oracle Walletファイル名

Oracle自動ログインウォレットファイル(cwallet.sso)を含むファイルをアップロードします。ウォレットには、SSL接続用の信頼できるサーバー証明書が含まれている必要があります。

ウォレットの作成について詳しくは、:ref:`label-configure_ssl_connections`を参照してください。

SSLモードがDISABLEDでない場合に必要です。

Oracleデータベースプロセッサ乗数

`Oracle Processor Core Factor Table<https://www.oracle.com/contracts/docs/processor-core-factor-table-070634.pdf>`_で説明されているコアプロセッサーライセンス要素

組み込みライセンスにのみ必要

Oracleデータベースプロセッサコア

Oracleデータベースのプロセッサコアの数。

組み込みライセンスにのみ必要

XStream請求の承認

ライセンス契約の確認

組み込みライセンスにのみ必要

XStreamアウトサーバー名

Oracleに既に存在する必要があるXStreamサーバーの名前。

有り

XStreamアウトサーバーURL

XStreamのデータベース接続のJDBC URLでは、OCIドライバーを使用する必要があります。例: jdbc:oracle:oci:@<host>:<port>/SID

SSLが有効な場合は、TCPSプロトコルを使用します(例:jdbc:oracle:oci:@tcps://<host>:<tcps_port>/SID)。

注釈

SSLモードが有効な場合、コネクタは``SSL_SERVER_DN_MATCH``および``MY_WALLET_DIRECTORY``をXStreamURLに自動的に追加します。これらを手動で含める必要はありません。

有り

テーブル複製の再開

(主キーが欠落している、またはスキーマの変更がサポートされていないなどの理由で) FAILED 状態のテーブルは、自動的に再開されません。テーブルが FAILED 状態になった場合、または複製を最初から再開する必要がある場合は、次の手順に従ってテーブルを削除し、複製に再度追加します。

注釈

主キーが欠落しているなど、ソーステーブルの問題が原因で障害が発生した場合は、続行する前にソースデータベースでその問題を解決します。

  1. Remove the table from replication, using one of the following methods:

    • Add the table to the Re-snapshot Table Exclusions parameter to temporarily exclude it from replication. This is convenient when the table is matched by an Included Table Regex that you don't want to change.

    • In the Ingestion Parameters context, either remove the table from Included Table Names or modify the Included Table Regex so the table is no longer matched.

  2. テーブルが削除されたことを確認します。

    1. Openflowランタイムキャンバスで、プロセッサーグループを右クリックし、Controller Services を選択します。

    2. コントローラーサービスをリストしたテーブルで、Table State Store 行を見つけ、行の右側にある縦3つのドットをクリックして、View State を選択します。

    重要

    続行する前に、テーブルの状態がこのリストから完全に削除されるまで待つ必要があります。この構成変更が完了するまで続行しないでください。

  3. 宛先をクリーンアップする:テーブルの状態が完全に削除されたと表示されたら、Snowflake で宛先テーブルを手動で DROP します。スナップショットフェーズ中に、コネクタは既存の宛先テーブルを上書きしないことに注意してください。テーブルがまだ存在する場合、複製は再度失敗します。オプションで、ジャーナルテーブルとストリームが不要になった場合は削除することもできます。

  4. Re-add the table by reversing the change you made in the first step: either remove the table from Re-snapshot Table Exclusions, or add it back to Included Table Names or Included Table Regex. The connector then re-snapshots the table.

  5. 再開を確認する:前述の指示に従って Table State Store をチェックします。テーブルの状態は、ステータス NEW で表示され、次に SNAPSHOT_REPLICATION に移行し、最後に INCREMENTAL_REPLICATION になります。

テーブルの列のサブセットを複製します。

コネクタは、テーブルごとに複製されるデータを構成列のサブセットにフィルターできます。主キー列は除外に関係なく常に含まれます。

列フィルターを適用するには、取り込みパラメーターコンテキストの:ui:`Column Filter JSON`パラメーターを、フィルターしたいテーブルごとに1つ、フィルターオブジェクトのJSON配列に設定します。

列は、名前または正規表現パターンによって含めたり除外したりできます。テーブルごとに単一の条件を適用することも、複数の条件を組み合わせることもできます。その場合、除外は常に包含よりも優先されます。

構文

配列の各オブジェクトはテーブルを識別し、含める列または除外する列を指定します。このコネクタは3部構成の完全修飾名(データベース、スキーマ、テーブル)を使用するため、各オブジェクトには、スキーマおよびテーブルフィールドに加えて``database``または``databasePattern``フィールドを含めることができます。

[
    {
        "database": "<database>" | "databasePattern": "<regex>",
        "schema": "<schema>" | "schemaPattern": "<regex>",
        "table": "<table>" | "tablePattern": "<regex>",
        "included": ["<column>", "<column>"],
        "excluded": ["<column>", "<column>"],
        "includedPattern": "<regex>",
        "excludedPattern": "<regex>"
    }
]

次のルールが適用されます。

  • 完全な名前の一致には``database``、schema、および``table``を使用し、正規表現の一致には``databasePattern``、schemaPattern、および``tablePattern``を使用します。同じオブジェクトで、フィールドとそのパターンバリアントの両方を使用することはできません(たとえば、``schema``と``schemaPattern``の両方を含めることはできません)。

  • includedexcludedincludedPattern、または``excludedPattern``の少なくとも1つを提供する必要があります。

  • 含めるフィルターと除外するフィルターの両方が指定されている場合は、除外が優先されます。

  • 複数のフィルターが同じテーブルに一致する場合、最後に一致するフィルターが使用され、完全一致がパターンベースのフィルターよりも優先されます。

  • 値はオブジェクトの配列にして、異なるテーブルに異なるフィルターを適用することができます。

名前で特定の列を含める:

[
    {
        "database": "my_db",
        "schema": "dbo",
        "table": "orders",
        "included": ["account_id", "status", "created_at"]
    }
]

名前で特定の列を除外する:

[
    {
        "database": "my_db",
        "schema": "dbo",
        "table": "orders",
        "excluded": ["internal_note", "debug_flag"]
    }
]

包含パターンと特定の除外を組み合わせる(たとえば、``admin_email``を除くすべてのメール列を含める):

[
    {
        "database": "my_db",
        "schema": "dbo",
        "table": "contacts",
        "includedPattern": ".*_email",
        "excluded": ["admin_email"]
    }
]

データベースパターンと正確なスキーマおよびテーブル名を組み合わせて、データベース全体にフィルターを適用する:

[
    {
        "databasePattern": "prod_.*",
        "schema": "dbo",
        "table": "customers",
        "excluded": ["internal_note"]
    }
]

異なるテーブルに異なるルールを適用するために複数のフィルターオブジェクトを渡す:

[
    {"database": "my_db", "schema": "dbo", "table": "orders", "included": ["account_id", "status"]},
    {"database": "my_db", "schema": "dbo", "table": "customers", "excludedPattern": ".*_internal"}
]

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. インポートしたプロセスグループを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。

次のステップ

  • (オプション):doc:スナップショットなしの増分複製を設定する<incremental-replication>

  • フローをモニターする