Openflow Connector for Salesforce Bulk API について¶
注釈
このコネクタは、 Snowflakeコネクタ規約 に従うものとします。
このトピックでは、Openflow Connector for Salesforce Bulk API の基本的な概念、ワークフロー、および制限について説明します。
Salesforce Data Cloudとのゼロコピー統合¶
Snowflakeは、Salesforceとのゼロコピーの双方向共有および統合を提供します。この統合は、Salesforce Data Cloudを使用しており、ほぼリアルタイムの双方向の統合を必要とする場合に推奨されます。
Salesforce Data Cloud とのゼロコピー統合の詳細については、以下のブログ投稿をご参照ください。
Openflow Connector for Salesforce Bulk API について¶
Openflow Connector for Salesforce Bulk API は複製ベースのデータ統合を提供します。このコネクタは、Salesforce Data Cloudを使用せず、完全に管理されたSnowflake Openflowコネクタを希望するユーザー向けに設計されています。コネクタはパブリック Salesforce REST APIs を使用して、ユーザーが定義した頻度でSalesforceからSnowflakeにデータを複製します。コネクタは、変更データキャプチャ(CDC)をサポートし、SnowflakeのデータとSalesforceと同期させて維持します。
特定のユースケースに応じて、どちらか一方または両方のタイプのデータ統合を使用できます。このトピックでは、Openflow Connector for Salesforce Bulk API を設定して使用し、SalesforceからSnowflakeにデータを複製する方法について説明します。
ユースケース¶
Openflow Connector for Salesforce Bulk API を使用すると、ユーザーが指定した頻度で標準オブジェクトまたはカスタムオブジェクトをSalesforceからSnowflakeに複製し、Snowflakeで最新の状態に保つことができます。
ワークフロー¶
以下のワークフローは、|SalesforceBulkAPIOF|をセットアップして使用する手順の詳細を示しています。
Salesforce管理者は、Salesforceで外部クライアントアプリを作成および構成し、特定のユーザーに対して承認します。
Openflow管理者は以下のタスクを実行します。
コネクタのサービスユーザー、コネクタのウェアハウス、複製されたデータとスキーマを格納する宛先データベースを作成します。
コネクタをインストールします。
フローテンプレートに必要なパラメーターを指定します。
データエンジニアはフローを実行して、オブジェクトをSalesforceからSnowflakeに複製します。
制限事項¶
コネクタを使用する場合は、次の制限を考慮してください。
カスタムSalesforceドメインはサポートされていません。
オブジェクト関係のトラバースと関連オブジェクトのフェッチはサポートされていません。
コネクタは、Snowflakeのハード削除をサポートしていません。宛先テーブルでクエリを実行して、
isDeleted列がtrueであるすべての行を削除するか、宛先テーブルの完全更新を実行して「ハード削除」を反映することができます。location、address、または``base64`` 型のフィールドはサポートされておらず、無視されます。複数のSalesforceインスタンスのデータをSnowflakeの単一のデータベースに統合することはできません。単一のSalesforceインスタンス組織からのデータは、Snowflakeの単一のデータベースにインジェストされます。複製されたSalesforceオブジェクトごとに、このデータベースにテーブルが作成されます。
Salesforceの記録に添付されたファイルは無視されます。
数式フィールドは増分的に取り込むことはできません。
認証¶
コネクタは、外部クライアントアプリ経由の JWT 認証メソッドを使用してSalesforceに接続し、データを取得します。Salesforceでクライアントアプリを構成する方法については、Openflow Connector for Salesforce Bulk API:Salesforceの設定 のドキュメントをご参照ください。
複製のライフサイクル¶
コネクタは、初期複製と増分複製の2つのステージでデータを複製します。
初期複製¶
コネクタはSalesforce Bulk API 2.0を呼び出して、コネクタ構成で指定された標準オブジェクトとカスタムオブジェクトを検出します。コネクタは Bulk API 2.0 API の制限を尊重します。
コネクタは、カスタムオブジェクトまたは標準オブジェクトごとに1つのテーブルを作成し、各フィールドに1つの列を使用します。
コネクタは、初期ロードにSnowpipe Streamingを使用して、Salesforceオブジェクトのフィールドの値に基づいてテーブルに行を挿入します。
増分複製¶
増分更新では、コネクタパラメーターで構成できるSnowflakeウェアハウスを使用します。レイテンシとデータ鮮度の要件に応じて、更新の頻度を1分から24時間まで構成できます。これにより、Snowflake内のテーブルが更新される頻度が決まります。
指定した更新頻度を使用して、コネクタはSalesforce Bulk API を呼び出して、以前に取り込まれたオブジェクトの変更を検出します。コネクタは、Salesforceオブジェクトの特定のタイムスタンプフィールドをチェックすることにより、変更された記録を識別します。
ほとんどのオブジェクトに対して、コネクタは SystemModstamp フィールドを使用します。SystemModstamp が使用できない場合、コネクタは優先順位に従って次のフィールドの使用を試行します。
LastModifiedDateCreatedDateLoginTime
注釈
履歴テーブル(履歴追跡が有効になっているオブジェクト)の場合、コネクタは常に CreatedDate フィールドを使用して変更を検出します。
次に、コネクタはSnowpipe Streamingを使用して増分データをステージングテーブルにプッシュし、マージクエリを実行してデータを最終宛先テーブルにロードします。
スキーマの進化¶
コネクタは、Salesforceでソースオブジェクトが変更された場合のスキーマの進化をサポートしています。
- 新しいフィールドがソースオブジェクトに追加されたとき:
コネクタは、Snowflakeの宛先テーブルに新しい列を追加します。
- ソースオブジェクトで既存のフィールドの名前が変更された場合:
コネクタは、名前の変更をフィールドの削除とフィールドの追加の両方として扱います。フィールドの追加により、宛先テーブルに新しい列が追加されます。フィールドの削除は、次に説明するように処理されます。
- ソースオブジェクトで既存のフィールドが削除された場合:
コネクタは、次の3つの戦略をサポートしています。
削除:Snowflakeの宛先テーブルの対応する列を削除します。これがデフォルトの動作です。
無視:ソース内の削除されたフィールドを無視し、将来的にスキップします。
名前変更:宛先テーブルの削除されたフィールドの名前を変更します。
たとえば、削除戦略が Ignore に設定され、Salesforceオブジェクトのフィールドの名前が変更された場合、Snowflakeの既存の列は変更されず、新しいフィールド名を持つ新しい列が追加されます。
オブジェクトの削除方法¶
オブジェクトがSalesforceで削除された場合、コネクタはSnowflakeからオブジェクトを「ハード削除」しません。コネクタは、Salesforceで削除されたオブジェクトに対して「ソフト削除」を実行し、対応するSnowflakeテーブルの isDeleted 列を true に設定することで、ソースオブジェクトが削除されたことを示します。
コネクタは「ハード削除」をサポートしていません。宛先テーブルでクエリを実行して、isDeleted 列が true であるすべての行を削除するか、宛先テーブルの完全更新を実行して「ハード削除」を反映することができます。
コネクタが一時停止または停止された場合など、コネクタが実行されていないときにSalesforceでオブジェクトが削除され、Salesforceのごみ箱から消去された場合、コネクタは削除操作を実行できないことがあります。このような状況で回復するには、宛先テーブルのフルリフレッシュを実行する必要があります。
自動再試行の処理¶
コネクタは、指数バックオフ戦略を使用して、失敗した操作または API エラーを自動的に再試行します。コネクタは最初の再試行の前に1秒間待機し、その後の再試行ごとに待機時間を2倍(2秒、4秒など)にします。失敗が続くと、コネクタは次回のスケジュールされた実行まで再試行を停止します。このアクティビティは :doc:`イベントテーブル </user-guide/data-integration/openflow/monitor>`で監視できます。
複数のコネクタインスタンスを使用して異なる同期スケジュールを処理する¶
異なる頻度で異なるオブジェクトを同期する必要がある場合(例えば、30分ごとのオブジェクトと24時間ごとのオブジェクト、など)、Snowflakeは同じランタイム内に2つの別々のコネクタインスタンスをデプロイすることをお勧めします。その後、インスタンスごとに同期パラメーターを個別に構成できます。
注釈
同じランタイムで複数のコネクタインスタンスをデプロイしても、追加コストは発生しません。
同様に、数式の制限を回避するためなど、コネクタが実行されるたびに一部のオブジェクトを完全にフェッチする必要がある場合、Snowflakeでは、同じランタイム内に2つの個別のコネクタインスタンスをデプロイし、各インスタンスのパラメーターを構成することをお勧めします。
次のステップ¶
コネクタの設定方法については、次のトピックをご参照ください。