チュートリアル: Snowflake Native SDK for Connectors のJavaコネクタテンプレート¶
概要¶
Snowflake Native SDK for Connectors を使用したコネクタテンプレートのチュートリアルへようこそ。このガイドでは、シンプルなConnector Native Applicationの設定について説明します。
このチュートリアルでは、次の方法を学習します。
Connector Native Applicationのデプロイ
データを取り込むためのテンプレートコネクタを構成する
テンプレートコネクタを自分のニーズに合わせてカスタマイズする
テンプレートにはコード内にさまざまな役立つコメントが含まれており、変更が必要な特定のファイルを簡単に見つけることができます。次のキーワードを含むコメントを探します。そのキーワードを使用すると、独自のコネクタを実装するのに役立ちます。
TODO
TODO: HINT
TODO: IMPLEMENT ME
このチュートリアルを始める前に、次の推奨コンテンツを確認して準備してください。
前提条件¶
開始する前に、次の要件を満たしていることを確認してください。
ACCOUNTADMIN
ロールを持つ Snowflake アカウントへのアクセスこのチュートリアルに従う間、 コネクタ用Snowflake Native SDK を確認し、開いておいてください。
レビュー チュートリアル: Snowflake Native SDK for Connectors のJavaコネクタの例
このチュートリアルでは、このテンプレートに基づいたコネクタの例を使用しており、様々なコンポーネントの実装例を確認するためにリファレンスすることができます。
ローカル環境の準備¶
クローニングを進める前に、必要なソフトウェアがすべてマシンにインストールされていることを確認し、コネクタテンプレートをクローンする必要があります。
Javaのインストール¶
Snowflake Native SDK for Connectors Java LTS (Long-Term Support) バージョン11以上が必要です。お使いのマシンにJavaの最低必要バージョンがインストールされていない場合は、Oracle Javaまたは OpenJDK のいずれかをインストールする必要があります。
Oracle Java¶
JDK の最新リリース LTS は、Oracle NFTC のもと、無料でダウンロードし、コストなしで使用することができます。ダウンロードとインストール方法については、 Oracle のページ をご覧ください。
OpenJDK¶
OpenJDK はJavaのオープンソース実装です。ダウンロードおよびインストール方法については、 openjdk.org および jdk.java.net をご参照ください。
Eclipse Temurin や Amazon Corretto など、サードパーティのOpenJDK バージョンを使用することもできます。
Snowflake CLI の構成¶
コネクタのビルド、デプロイ、インストールには、 Snowflake CLI ツールが必要です。お使いのマシンに Snowflake CLI がインストールされていない場合は、 こちら の指示に従ってインストールしてください。
ツールのインストール後、 構成ファイル で Snowflake への接続を構成する必要があります。
接続が構成されていない場合は、 native_sdk_connection
という名前の接続を新規に作成してください。 deployment/snowflake.toml
ファイルに接続例があります。
すでに接続が構成されていて、それをコネクタで使用したい場合は、このチュートリアルでこの接続を使用するときは、 native_sdk_connection
の代わりにその名前を使用してください。
テンプレートクローニング¶
コネクタ・テンプレートをクローンするには、次のコマンドを使用します。
snow init <project_dir> \
--template-source https://github.com/snowflakedb/connectors-native-sdk \
--template templates/connectors-native-sdk-template
<project_dir>
の代わりに、コネクタの Java プロジェクトを作成するディレクトリ名(存在してはなりません)を入力します。
コマンド実行後、アプリケーション・インスタンスとステージ名構成のための追加情報の入力を求められます。有効な引用符で囲まれていないSnowflake識別子であれば、どのような名前を指定してもかまいません。また、Enterをクリックすると、角括弧で囲まれたデフォルト値が使用されます。
カスタムアプリケーション名とステージ名を提供するコマンド実行例:
$ snow init my_connector \
--template-source https://github.com/snowflakedb/connectors-native-sdk \
--template templates/connectors-native-sdk-template
Name of the application instance which will be created in Snowflake [connectors-native-sdk-template]: MY_CONNECTOR
Name of the schema in which the connector files stage will be created [TEST_SCHEMA]:
Name of the stage used to store connector files in the application package [TEST_STAGE]: CUSTOM_STAGE_NAME
Initialized the new project in my_connector
コネクタのビルド、展開、クリーンアップ¶
このテンプレートは、修正前であっても、すぐに導入することができます。以下のセクションでは、コネクタのビルド、デプロイ、インストール方法を説明します。
コネクタの作成¶
Snowflake Native SDK for Connectors を使用して作成されたコネクタの構築は、一般的な Java アプリケーションの構築とは少し異なります。ソースから.jarアーカイブをビルドするだけでなく、やらなければならないことがいくつかあります。アプリケーションの構築は以下の手順で行います。
カスタム内部コンポーネントのビルドディレクトリへのコピー
コンポーネントのビルドディレクトリへ SDK をコピーする
内部コンポーネントのコピー¶
このステップでは、コネクタ .jar ファイルをビルドし、それを(UI、マニフェストおよびセットアップ ファイルとともに) sf_build
ディレクトリにコピーします。
このステップを実行するには、コマンドを実行します: ./gradlew copyInternalComponents
.
SDK コンポーネントのコピー¶
このステップでは、 SDK .jarファイル(コネクタGradleモジュールの依存関係として追加)を sf_build
ディレクトリにコピーし、.jarアーカイブからバンドルされた.sqlファイルを抽出します。
これらの.sqlファイルによって、アプリケーションのインストール中にどのプロバイダーオブジェクトが作成されるかをカスタマイズすることができます。オブジェクトの省略は、間違って実行するといくつかの機能を失敗させる可能性があるため、初めてのユーザーにはカスタマイズをお勧めしません。テンプレート・コネクタ・アプリケーションは all.sql
ファイルを使用し、推奨される SDK オブジェクトをすべて作成します。
このステップを実行するには、コマンドを実行します: ./gradlew copySdkComponents
.
コネクタの展開¶
Native Appsをデプロイするには、Snowflake内でアプリケーションパッケージを作成する必要があります。その後、 sf_build
ディレクトリのすべてのファイルを Snowflake にアップロードする必要があります。
アプリケーションインスタンスはステージングされたファイルから直接作成できます。このアプローチでは、バージョンとアプリケーションのインスタンスを再作成することなく、ほとんどのコネクタファイルの変更を確認できます。
以下の演算子を行います。
アプリケーションパッケージがまだ存在しない場合は、新しいアプリケーションパッケージを作成します。
パッケージ内にスキーマとステージングされたファイルを作成します。
sf_build
ディレクトリからステージにファイルをアップロードします(このステップには時間がかかる場合があります)。
コネクタをデプロイするには、コマンドを実行します: snow app deploy --connection=native_sdk_connection
.
snow app deploy
コマンドの詳細については、 snow app deploy をご参照ください。
作成されたアプリケーションパッケージは、アカウントの Snowflake UI の App packages
タブの Data products
カテゴリに表示されます。
コネクタをインストールする¶
アプリケーションのインストールはプロセスの最後のステップです。先に作成したアプリケーションパッケージからアプリケーションを作成します。
コネクタをインストールするには、コマンドを実行します: snow app run --connection=native_sdk_connection
.
snow app run
コマンドの詳細については、 snow app run をご参照ください。
インストールされたアプリケーションは、アカウントの Snowflake UI の Data products
カテゴリにある Installed apps
タブに表示されます。
コネクタファイルの更新¶
コネクタファイルを変更したい場合は、変更したファイルをアプリケーションパッケージのステージに簡単にアップロードできます。アップロードコマンドは、どのファイルが更新されたかに依存します。
アップデートコマンドを実行する前に、 sf_build
ディレクトリにコネクタの新しいファイルをコピーする必要があります。 ./gradlew copyInternalComponents
UI .pyファイルまたはコネクタ.javaファイル¶
snow app deploy --connection=native_sdk_connection
コマンドを使用すると、現在のアプリケーション・インスタンスは再インストールせずに新しいファイルを使用します。
setup.sql または manifest.yml ファイル¶
snow app run --connection=native_sdk_connection
コマンドを使用すると、新しいファイルがステージにアップロードされた後、現在のアプリケーションインスタンスが再インストールされます。
クリーンアップ¶
チュートリアルが終了した後、あるいは何らかの理由でアプリケーションとそのパッケージを削除したい場合は、コマンドを使ってアカウントから完全に削除することができます。
snow app teardown --connection=native_sdk_connection --cascade --force
--cascade
オプションは、アカウント管理者に所有権を移さずに移行先データベースを削除するために必要です。実際のコネクタでは、取り込まれたデータを保持するためにデータベースを削除すべきではありません。データベースは、アカウント管理者が所有するか、アンインストール前に所有権を譲渡する必要があります。
インジェストが構成されていなくても、一時停止または削除されるまで、コネクタはクレジットを消費します!
前提条件のステップ¶
インストール直後、コネクタはウィザード段階にあります。このフェーズは、エンドユーザーに必要なすべての構成をガイドするいくつかのステップで構成されます。
最初のステップは前提条件のステップです。これはオプションであり、すべてのコネクタに必要なわけではありません。前提条件は通常、アプリケーション外部のユーザーから要求されるアクションで、例えば SQL ワークシートでクエリを実行したり、ソースシステム側で構成を行うなどです。
前提条件の詳細については、次をご参照ください。 前提条件
各前提条件の内容は、コネクタ内にある STATE.PREREQUISITES
テーブルから直接取得されます。 setup.sql
スクリプトでカスタマイズできます。ただし、 setup.sql
スクリプトはアプリケーションのインストール、アップグレード、ダウングレードのたびに実行されることに注意してください。このため、以下の例のようにマージクエリを使用することをお勧めします。
MERGE INTO STATE.PREREQUISITES AS dest
USING (SELECT * FROM VALUES
('1',
'Sample prerequisite',
'Prerequisites can be used to notice the end user of the connector about external configurations. Read more in the SDK documentation below. This content can be modified inside `setup.sql` script',
'https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/prerequisites',
NULL,
NULL,
1
)
) AS src (id, title, description, documentation_url, learnmore_url, guide_url, position)
ON dest.id = src.id
WHEN NOT MATCHED THEN
INSERT (id, title, description, documentation_url, learnmore_url, guide_url, position)
VALUES (src.id, src.title, src.description, src.documentation_url, src.learnmore_url, src.guide_url, src.position);
コネクタ構成のステップ¶
ウィザードフェーズの次のステップは、コネクタ構成のステップです。このステップでは、コネクタに必要なデータベースオブジェクトと権限を構成できます。このステップでは、次の構成プロパティを指定できます。
warehouse
operational_warehouse
cortex_warehouse
destination_database
destination_schema
global_schedule
data_owner_role
cortex_user_role
agent_username
agent_role
その他のカスタムプロパティが必要な場合は、ウィザードフェーズの次のステップのいずれかで構成できます。各プロパティの詳細については、次をご参照ください。 コネクタ構成
さらに、テンプレートで提供される Streamlit コンポーネント(streamlit/wizard/connector_config.py
)は、 Native Apps Permission SDK をトリガーし、エンドユーザーに権限付与をリクエストする方法を示しています。使用可能なプロパティがコネクタのニーズを満たしている限り、バックエンドクラスを上書きする必要はありませんが、構成のステップを追加してコンポーネントと同じ方法で上書きすることもできます。
内部プロシージャとJavaオブジェクトの詳細については、次をご参照ください。 コネクタ構成リファレンス
Streamlitの例では、 manifest.yml
ファイル(CREATE DATABASE
および EXECUTE TASKS
)で設定されたアカウントレベルの権限をリクエストすることができます。また、ユーザーはPermission SDK ポップアップからウェアハウスリファレンスを指定することができます。
テンプレートでは、ユーザーは destination_database
および destination_schema
のみを入力するように求められます。しかし、 streamlit/wizard/connector_configuration.py
の TODO
のコメントには、Streamlit UI でより多くの入力フィールドを表示するために再利用できるコメント付きコードが含まれています。
# TODO: Here you can add additional fields in connector configuration.
# For example:
st.subheader("Operational warehouse")
input_col, _ = st.columns([2, 1])
with input_col:
st.text_input("", key="operational_warehouse", label_visibility="collapsed")
st.caption("Name of the operational warehouse to be used")
接続構成のステップ¶
ウィザードフェーズの次のステップは、接続構成のステップです。このステップでは、エンドユーザーはコネクタの外部接続パラメーターを構成できます。この構成には、シークレットや統合などのオブジェクトの識別子が含まれる場合があります。
この情報はコネクタが取り込むデータのソースシステムによって異なるため、ソースコードでより大きなカスタマイズを行わなければならない最初の場所です。
接続構成の詳細については、次をご参照ください。
Streamlit UI 側(streamlit/wizard/connection_config.py
)から、必要なパラメーターのテキスト入力を追加する必要があります。テキスト入力の例が実装されています。このファイルのコードを検索すると、新しいフィールドのためのコメント付きコード TODO
が見つかります。
# TODO: Additional configuration properties can be added to the UI like this:
st.subheader("Additional connection parameter")
input_col, _ = st.columns([2, 1])
with input_col:
st.text_input("", key="additional_connection_property", label_visibility="collapsed")
st.caption("Some description of the additional property")
プロパティをフォームに追加したら、コネクタのバックエンドレイヤーに渡す必要があります。そのためには、Streamlitファイルの2か所を追加で変更する必要があります。1つ目は、 streamlit/wizard/connection_config.py
ファイルの finish_config
関数です。新しく追加されたテキスト入力の状態は、ここで読む必要があります。さらに、必要に応じて検証し、 set_connection_configuration
関数に渡すこともできます。
たとえば、 additional_connection_property
が追加された場合、編集後は次のように表示されます。
def finish_config():
try:
# TODO: If some additional properties were specified they need to be passed to the set_connection_configuration function.
# The properties can also be validated, for example, check whether they are not blank strings etc.
response = set_connection_configuration(
custom_connection_property=st.session_state["custom_connection_property"],
additional_connection_property=st.session_state["additional_connection_property"],
)
# rest of the method without changes
2つ目は、 streamlit/native_sdk_api/connection_config.py
ファイルにある set_connection_configuration
関数を編集する必要があります。この関数は、Streamlit UI と、コネクタのバックエンドへのエントリポイントである基になる SQL プロシージャ間のプロキシです。
def set_connection_configuration(custom_connection_property: str, additional_connection_property: str):
# TODO: this part of the code sends the config to the backend so all custom properties need to be added here
config = {
"custom_connection_property": escape_identifier(custom_connection_property),
"additional_connection_property": escape_identifier(additional_connection_property),
}
return call_procedure(
"PUBLIC.SET_CONNECTION_CONFIGURATION",
[variant_argument(config)]
)
これを行うと、新しいプロパティが、構成を含む内部コネクタテーブルに保存されます。しかし、可能なカスタマイズはこれで終わりではありません。一部のバックエンドコンポーネントはカスタマイズできます。コード内で次のコメントを探して見つけてください。
TODO: IMPLEMENT ME connection configuration validate
TODO: IMPLEMENT ME connection callback
TODO: IMPLEMENT ME test connection
検証部分では、 UI から受信したデータについて、追加の検証を実行できます。また、大文字と小文字を変更したり、提供されたデータをトリミングしたり、提供された名前のオブジェクトが実際にSnowflake内に存在するかどうかをチェックするなど、データを変換することもできます。
接続コールバックは、 外部統合セットアップリファレンス で説明されているソリューションを使用して、外部アクセス統合を使用する必要があるプロシージャを変更するなど、設定に基づいて追加の操作を実行できるようにする部分です。
接続テストは接続構成の最終コンポーネントで、コネクタとソース・システム間で接続が確立できるかどうかをチェックします。
内部コンポーネントの詳細については、次をご参照ください。
実装例は次のようになります。
public class TemplateConfigurationInputValidator implements ConnectionConfigurationInputValidator {
private static final String ERROR_CODE = "INVALID_CONNECTION_CONFIGURATION";
@Override
public ConnectorResponse validate(Variant config) {
// TODO: IMPLEMENT ME connection configuration validate: If the connection configuration input
// requires some additional validation this is the place to implement this logic.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
var integrationCheck = checkParameter(config, INTEGRATION_PARAM, false);
if (!integrationCheck.isOk()) {
return integrationCheck;
}
var secretCheck = checkParameter(config, SECRET_PARAM, true);
if (!secretCheck.isOk()) {
return ConnectorResponse.error(ERROR_CODE);
}
return ConnectorResponse.success();
}
}
public class TemplateConnectionConfigurationCallback implements ConnectionConfigurationCallback {
private static final String[] EXTERNAL_SOURCE_PROCEDURE_SIGNATURES = {
asVarchar(format("%s.%s()", PUBLIC_SCHEMA, TEST_CONNECTION_PROCEDURE)),
asVarchar(format("%s.%s(VARIANT)", PUBLIC_SCHEMA, FINALIZE_CONNECTOR_CONFIGURATION_PROCEDURE)),
asVarchar(format("%s.%s(NUMBER, STRING)", PUBLIC_SCHEMA, WORKER_PROCEDURE))
};
private final Session session;
public TemplateConnectionConfigurationCallback(Session session) {
this.session = session;
}
@Override
public ConnectorResponse execute(Variant config) {
// TODO: If you need to alter some procedures with external access you can use
// configureProcedure method or implement a similar method on your own.
// TODO: IMPLEMENT ME connection callback: Implement the custom logic of changes in application
// to be done after connection configuration, like altering procedures with external access.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
var response = configureProceduresWithReferences();
if (response.isNotOk()) {
return response;
}
return ConnectorResponse.success();
}
private ConnectorResponse configureProceduresWithReferences() {
return callProcedure(
session,
PUBLIC_SCHEMA,
SETUP_EXTERNAL_INTEGRATION_WITH_NAMES_PROCEDURE,
EXTERNAL_SOURCE_PROCEDURE_SIGNATURES);
}
}
public class TemplateConnectionValidator {
private static final String ERROR_CODE = "TEST_CONNECTION_FAILED";
public static Variant testConnection(Session session) {
// TODO: IMPLEMENT ME test connection: Implement the custom logic of testing the connection to
// the source system here. This usually requires connection to some webservice or other external
// system. It is suggested to perform only the basic connectivity validation here.
// If that's the case then this procedure must be altered in TemplateConnectionConfigurationCallback first.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
return test().toVariant();
}
private static ConnectorResponse test() {
try {
var response = SourceSystemHttpHelper.testEndpoint();
if (isSuccessful(response.statusCode())) {
return ConnectorResponse.success();
} else {
return ConnectorResponse.error(ERROR_CODE, "Connection to source system failed");
}
} catch (Exception exception) {
return ConnectorResponse.error(ERROR_CODE, "Test connection failed");
}
}
}
構成ステップをファイナライズする¶
コネクタ構成を確定するステップは、ウィザード・フェーズの最終ステップです。このステップには複数の責任があります。
ユーザーがコネクタに必要な追加構成を指定できるようにします。
必要に応じて、シンクデータベース、スキーマ、取り込まれたデータ用の追加テーブルとビューを作成します。
スケジューラやタスクリアクタなどの内部コンポーネントの初期化
構成確定に関する詳細情報はこちらをご覧ください。
タスク・リアクターとスケジュールに関する詳しい情報はこちらをご覧ください。
接続構成ステップと同様に、カスタマイズは Streamlit UI で開始できます。 streamlit/wizard/finalize_config.py
ファイルには、 プロパティ例を持つフォームが含まれてい ます。コネクタのニーズに応じて、さらにプロパティを追加できます。別のプロパティを追加するには、上記のファイルに新しいプロパティを追加するサンプルコードを含む TODO
コメントを探します。
# TODO: Here you can add additional fields in finalize connector configuration.
# For example:
st.subheader("Some additional property")
input_col, _ = st.columns([2, 1])
with input_col:
st.text_input("", key="some_additional_property", label_visibility="collapsed")
st.caption("Description of some new additional property")
新しいプロパティのテキスト入力を追加した後、それをバックエンド側に渡す必要があります。実行するには、同じファイル内の finalize_configuration
関数を変更します。
def finalize_configuration():
try:
st.session_state["show_main_error"] = False
# TODO: If some additional properties were introduced, they need to be passed to the finalize_connector_configuration function.
response = finalize_connector_configuration(
st.session_state.get("custom_property"),
st.session_state.get("some_additional_property")
)
次に、 streamlit/native_sdk_api/finalize_config.py
ファイルを開き、以下の関数に新しいプロパティを追加します:
def finalize_connector_configuration(custom_property: str, some_additional_property: str):
# TODO: If some custom properties were configured, then they need to be specified here and passed to the FINALIZE_CONNECTOR_CONFIGURATION procedure.
config = {
"custom_property": custom_property,
"some_additional_property": some_additional_property,
}
return call_procedure(
"PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION",
[variant_argument(config)]
)
接続構成ステップと同様に、このステップでもさまざまなバックエンドコンポーネントをカスタマイズすることができます。
TODO: IMPLEMENT ME validate source
TODO: IMPLEMENT ME finalize internal
ソースの検証では、ソースシステムでより高度な検証を実行します。前のテスト接続が接続が確立できることをチェックするだけであった場合、検証ソースはシステム内の特定のデータへのアクセスをチェックすることができます。
Finalize internal は、タスクリアクタとスケジューラを初期化し、シンクデータベースと必要なネストされたオブジェクトを作成する内部プロシージャです。また、ファイナライズステップ中に提供された構成を保存するためにも使用できます(この構成はデフォルトでは保存されません)。
内部コンポーネントの詳細については、次をご参照ください。
さらに、 FinalizeConnectorInputValidator
インターフェイスを使用して入力を検証し、それを finalize ハンドラーに提供することができます - TemplateFinalizeConnectorConfigurationCustomHandler
ファイルを確認してください。ビルダーの使用に関する詳細については、次をご参照ください。 ストアドプロシージャとハンドラーのカスタマイズ
検証ソースの実装例は次のようになります。
public class SourceSystemAccessValidator implements SourceValidator {
@Override
public ConnectorResponse validate(Variant variant) {
// TODO: IMPLEMENT ME validate source: Implement the custom logic of validating the source
// system. In some cases this can be the same validation that happened in
// TemplateConnectionValidator.
// However, it is suggested to perform more complex validations, like specific access rights to
// some specific resources here.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/finalize_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/finalize_configuration
var finalizeProperties = Configuration.fromCustomConfig(variant);
var httpResponse = SourceSystemHttpHelper.validateSource(finalizeProperties.get("custom_property"));
return prepareConnectorResponse(httpResponse.statusCode());
}
private ConnectorResponse prepareConnectorResponse(int statusCode) {
switch (statusCode) {
case 200:
return ConnectorResponse.success();
case 401:
return ConnectorResponse.error("Unauthorized error");
case 404:
return ConnectorResponse.error("Not found error");
default:
return ConnectorResponse.error("Unknown error");
}
}
}
リソースを作成する¶
ウィザードフェーズが完了すると、コネクタはデータの取り込みを開始する準備が整います。しかしその前に、リソースの実装と構成が必要です。リソースとは、ソース・システム内の特定のデータ・セット(テーブル、エンドポイント、ファイルなど)を記述する抽象化です。
異なるリソースシステムはリソースに関する異なる情報を必要とする場合があります。そのため、リソース定義は特定のニーズに応じてカスタマイズする必要があります。実行するには、 streamlit/daily_use/data_sync_page.py
ファイルに移動します。リソースパラメーターのテキスト入力の追加については、 TODO
コメントを確認できます。リソースパラメーターにより、ソースシステムからのデータの識別と取得が可能になる必要があります。これらのパラメーターは、インジェスチョン中に抽出できます。
# TODO: specify all the properties needed to define a resource in the source system. A subset of those properties should allow for a identification of a single resource, be it a table, endpoint, repository or some other data storage abstraction
st.text_input(
"Resource name",
key="resource_name",
)
st.text_input(
"Some resource parameter",
key="some_resource_parameter"
)
すべての必要なプロパティがフォームに追加されると、バックエンド側に渡すことができます。まず、テキストフィールドの状態を抽出し、 streamlit/daily_use/data_sync_page.py
ファイル内の API レベル queue_resource
メソッドに渡す必要があります。
def queue_resource():
# TODO: add additional properties here and pass them to create_resource function
resource_name = st.session_state.get("resource_name")
some_resource_parameter = st.session_state.get("some_resource_parameter")
if not resource_name:
st.error("Resource name cannot be empty")
return
result = create_resource(resource_name, some_resource_parameter)
if result.is_ok():
st.success("Resource created")
else:
st.error(result.get_message())
それから、 streamlit/native_sdk_api/resource_management.py
ファイルの create_resource
関数を更新する必要があります:
def create_resource(resource_name, some_resource_parameter):
ingestion_config = [{
"id": "ingestionConfig",
"ingestionStrategy": "INCREMENTAL",
# TODO: HINT: scheduleType and scheduleDefinition are currently not supported out of the box, due to globalSchedule being used. However, a custom implementation of the scheduler can use those fields. They need to be provided becuase they are mandatory in the resourceDefinition.
"scheduleType": "INTERVAL",
"scheduleDefinition": "60m"
}]
# TODO: HINT: resource_id should allow identification of a table, endpoint etc. in the source system. It should be unique.
resource_id = {
"resource_name": resource_name,
}
id = f"{resource_name}_{random_suffix()}"
# TODO: if you specified some additional resource parameters then you need to put them inside resource metadata:
resource_metadata = {
"some_resource_parameter": some_resource_parameter
}
return call_procedure("PUBLIC.CREATE_RESOURCE",
[
varchar_argument(id),
variant_argument(resource_id),
variant_list_argument(ingestion_config),
varchar_argument(id),
"true",
variant_argument(resource_metadata)
])
CREATE_RESOURCE() プロシージャ・ロジックのカスタマイズ¶
PUBLIC.CREATE_RESOURCE()
プロシージャは、メイン実行フローのいくつかの場所にプラグインされるカスタムロジックを実装することによって、開発者がその実行をカスタマイズすることができます。SDK によって、開発者は次のことができます。
リソースを作成する前に検証します。このロジックは、
PUBLIC.CREATE_RESOURCE_VALIDATE()
プロシージャに実装する必要があります。リソースが作成される前にカスタムオペレーションを実行します。このロジックは、
PUBLIC.PRE_CREATE_RESOURCE()
プロシージャに実装する必要があります。リソース作成後にカスタムオペレーションを実行します。このロジックは、
PUBLIC.POST_CREATE_RESOURCE()
プロシージャに実装する必要があります。
PUBLIC.CREATE_RESOURCE()
プロシージャのカスタマイズについての詳細は、こちらをご覧ください。
TemplateCreateResourceHandler.java¶
このクラスは PUBLIC.CREATE_RESOURCE()
プロシージャのハンドラーです。ここでは、前述のコールバック・プロシージャのハンドラのJava実装を注入することができます。デフォルトでは、プロシージャの実行時間を長くしてしまう SQL プロシージャの呼び出しから解放されるように、テンプレートはコールバックハンドラーのモックされた Java 実装を提供します。これらのモックされた実装は、成功レスポンスを返す以外には何もしません。テンプレートによって準備されたコールバック・クラスにカスタム実装を提供するか、これらのコールバックをゼロから作成し、ハンドラー・ビルダーでメイン・プロシージャの実行フローに注入することができます。
デフォルトで呼び出されるコールバック・メソッドのカスタム・ロジックを実装するには、コード内で以下のコメントを探します。
TODO: IMPLEMENT ME create resource validate
TODO: IMPLEMENT ME pre create resource callback
TODO: IMPLEMENT ME post create resource callback
インジェスチョン¶
データのインジェスチョンを実行するには、リソース構成に基づいてソースシステムとの接続を処理し、データを取得するクラスを実装する必要があります。SchedulerモジュールとTask Reactorモジュールは、インジェスションタスクのトリガーとキューを担当します。
インジェストロジックは TemplateIngestion
クラスから呼び出されます。コード内の TODO: IMPLEMENT ME ingestion
のコメントを探し、ランダムなデータ生成をソースシステムからのデータ取得に置き換えます。リ ソ ース定義にカ ス タ ムプ ロ パテ ィ を追加 し た場合は、 TemplateWorkItem
で利用可能な ResourceIngestionDefinitionRepository
とプロパティを用いて、 内部コネクタテーブルか らこれらを取得できます。
resourceIngestionDefinitionId
ingestionConfigurationId
ウェブサービスからデータを取得する例は以下のようになる かもしれません。
public final class SourceSystemHttpHelper {
private static final String DATA_URL = "https://source_system.com/data/%s";
private static final SourceSystemHttpClient sourceSystemClient = new SourceSystemHttpClient();
private static final ObjectMapper objectMapper = new ObjectMapper();
private static List<Variant> fetchData(String resourceId) {
var response = sourceSystemClient.get(String.format(url, resourceId));
var body = response.body();
try {
return Arrays.stream(objectMapper.readValue(body, Map[].class))
.map(Variant::new)
.collect(Collectors.toList());
} catch (JsonProcessingException e) {
throw new RuntimeException("Cannot parse json", e);
}
}
}
public class SourceSystemHttpClient {
private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(15);
private final HttpClient client;
private final String secret;
public SourceSystemHttpClient() {
this.client = HttpClient.newHttpClient();
this.secret =
SnowflakeSecrets.newInstance()
.getGenericSecretString(ConnectionConfiguration.TOKEN_NAME);
}
public HttpResponse<String> get(String url) {
var request =
HttpRequest.newBuilder()
.uri(URI.create(url))
.GET()
.header("Authorization", format("Bearer %s", secret))
.header("Content-Type", "application/json")
.timeout(REQUEST_TIMEOUT)
.build();
try {
return client.send(request, HttpResponse.BodyHandlers.ofString());
} catch (IOException | InterruptedException ex) {
throw new RuntimeException(format("HttpRequest failed: %s", ex.getMessage()), ex);
}
}
}
リソースのライフサイクル管理¶
リソースの作成と取り込みのロジックが実装されたら、以下のプロシージャを呼び出すことで、リソースのライフサイクルを管理することができます:
PUBLIC.ENABLE_RESOURCE()
特定のリソースを有効にします。つまり、そのリソースの取り込みがスケジュールされます。PUBLIC.DISABLE_RESOURCE()
特定のリソースを無効にします。つまり、そのリソースの取り込みスケジュールを停止します。PUBLIC.UPDATE_RESOURCE()
を使用すると、特定のリソースの取り込み構成を更新できます。Streamlit UI には既定では実装されていません。コネクタユーザーがインジェスト構成をカスタマイズできるようにすることは、開発者にとって望ましくない場合があるからです(このプロシージャの使用を完全に禁止するには、アプリケーションロールADMIN
へのプロシージャの許可を取り消します)。
これらのプロシージャはすべてJavaハンドラーを持ち、実行をカスタマイズできるコールバックで拡張されています。これらのハンドラー用のビルダーを使って、コールバックのカスタム実装を注入することができます。デフォルトでは、テンプレートはコールバックハンドラーのモックされたJava実装を提供します。これらのモックされた実装は、成功レスポンスを返す以外には何もしません。テンプレートによって準備されたコールバッククラスにカスタム実装を提供するか、これらのコールバックをゼロから作成し、ハンドラービルダーのメインプロシージャ実行フローに注入することができます。
TemplateEnableResourceHandler.java¶
このクラスは、 PUBLIC.ENABLE_RESOURCE()
プロシージャのハンドラーであり、専用のコールバックで拡張できます。
有効化する前にリソースを検証します。カスタム実装を提供するために、
TODO: IMPLEMENT ME enable resource validate
のコメントをコードで探してください。リソースが有効になる前にカスタムオペレーションを実行します。カスタム実装を提供するために、
TODO: IMPLEMENT ME pre enable resource
のコメントをコードで探してください。リソースが有効になった後にカスタムオペレーションを実行します。カスタム実装を提供するために、
TODO: IMPLEMENT ME post enable resource
のコメントをコードで探してください。
詳しくは、 PUBLIC.ENABLE_RESOURCE()
プロシージャの詳細ドキュメントをご覧ください。
TemplateDisableResourceHandler.java¶
このクラスは、 PUBLIC.DISABLE_RESOURCE()
プロシージャのハンドラーであり、専用のコールバックで拡張できます。
無効にする前にリソースを検証します。カスタム実装を提供するために、
TODO: IMPLEMENT ME disable resource validate
のコメントをコードで探してください。リソースが無効になる前にカスタムオペレーションを実行します。カスタム実装をプロバイダーするために、
TODO: IMPLEMENT ME pre disable resource
のコメントを探してください。
詳しくは、 PUBLIC.DISABLE_RESOURCE()
プロシージャの詳細ドキュメントをご覧ください。
TemplateUpdateResourceHandler.java¶
このクラスは、 PUBLIC.UPDATE_RESOURCE()
プロシージャのハンドラーであり、専用のコールバックで拡張できます。
リソースを更新する前に検証します。カスタム実装を提供するために、
TODO: IMPLEMENT ME update resource validate
のコメントをコードで探してください。リソースが更新される前にカスタムオペレーションを実行します。カスタム実装を提供するために、
TODO: IMPLEMENT ME pre update resource
のコメントをコードで探してください。リソースの更新後にカスタムオペレーションを実行します。カスタム実装を提供するために、
TODO: IMPLEMENT ME post update resource
のコメントをコードで探してください。
詳しくは、 PUBLIC.UPDATE_RESOURCE()
プロシージャの詳細ドキュメントをご覧ください。
設定¶
テンプレートには、以前に実行されたすべての構成を表示できる設定タブが含まれています。しかし、構成プロパティがカスタマイズされている場合、この表示にもカスタマイズが必要です。設定タブのコードは streamlit/daily_use/settings_page.py
ファイルにあります。
カスタマイズするには、それぞれの構成で追加されたキーの構成から値を抽出します。例えば、以前の additional_connection_property
が接続構成ステップで追加された場合、設定表示では次のように追加できます。
def connection_config_page():
current_config = get_connection_configuration()
# TODO: implement the display for all the custom properties defined in the connection configuration step
custom_property = current_config.get("custom_connection_property", "")
additional_connection_property = current_config.get("additional_connection_property", "")
st.header("Connector configuration")
st.caption("Here you can see the connector connection configuration saved during the connection configuration step "
"of the Wizard. If some new property was introduced it has to be added here to display.")
st.divider()
st.text_input(
"Custom connection property:",
value=custom_property,
disabled=True
)
st.text_input(
"Additional connection property:",
value=additional_connection_property,
disabled=True
)
st.divider()