チュートリアル: Snowflake Native SDK for Connectors のJavaコネクタテンプレート

概要

Snowflake Native SDK for Connectors を使用したコネクタテンプレートのチュートリアルへようこそ。このガイドでは、シンプルなConnector Native Applicationの設定について説明します。

このチュートリアルでは、次の方法を学習します。

  • Connector Native Applicationのデプロイ

  • データを取り込むためのテンプレートコネクタを構成する

  • テンプレートコネクタを自分のニーズに合わせてカスタマイズする

テンプレートにはコード内にさまざまな役立つコメントが含まれており、変更が必要な特定のファイルを簡単に見つけることができます。次のキーワードを含むコメントを探します。そのキーワードを使用すると、独自のコネクタを実装するのに役立ちます。

  • TODO

  • TODO: HINT

  • TODO: IMPLEMENT ME

このチュートリアルを始める前に、次の推奨コンテンツを確認して準備してください。

前提条件

開始する前に、次の要件を満たしていることを確認してください。

ローカル環境の準備

クローニングを進める前に、必要なソフトウェアがすべてマシンにインストールされていることを確認し、コネクタテンプレートをクローンする必要があります。

Javaのインストール

Snowflake Native SDK for Connectors Java LTS (Long-Term Support) バージョン11以上が必要です。お使いのマシンにJavaの最低必要バージョンがインストールされていない場合は、Oracle Javaまたは OpenJDK のいずれかをインストールする必要があります。

Oracle Java

JDK の最新リリース LTS は、Oracle NFTC のもと、無料でダウンロードし、コストなしで使用することができます。ダウンロードとインストール方法については、 Oracle のページ をご覧ください。

OpenJDK

OpenJDK はJavaのオープンソース実装です。ダウンロードおよびインストール方法については、 openjdk.org および jdk.java.net をご参照ください。

Eclipse TemurinAmazon Corretto など、サードパーティのOpenJDK バージョンを使用することもできます。

Snowflake CLI の構成

コネクタのビルド、デプロイ、インストールには、 Snowflake CLI ツールが必要です。お使いのマシンに Snowflake CLI がインストールされていない場合は、 こちら の指示に従ってインストールしてください。

ツールのインストール後、 構成ファイル で Snowflake への接続を構成する必要があります。

接続が構成されていない場合は、 native_sdk_connection という名前の接続を新規に作成してください。 deployment/snowflake.toml ファイルに接続例があります。

すでに接続が構成されていて、それをコネクタで使用したい場合は、このチュートリアルでこの接続を使用するときは、 native_sdk_connection の代わりにその名前を使用してください。

テンプレートクローニング

コネクタ・テンプレートをクローンするには、次のコマンドを使用します。

snow init <project_dir> \
  --template-source https://github.com/snowflakedb/connectors-native-sdk \
  --template templates/connectors-native-sdk-template

<project_dir> の代わりに、コネクタの Java プロジェクトを作成するディレクトリ名(存在してはなりません)を入力します。

コマンド実行後、アプリケーション・インスタンスとステージ名構成のための追加情報の入力を求められます。有効な引用符で囲まれていないSnowflake識別子であれば、どのような名前を指定してもかまいません。また、Enterをクリックすると、角括弧で囲まれたデフォルト値が使用されます。

カスタムアプリケーション名とステージ名を提供するコマンド実行例:

$ snow init my_connector \
    --template-source https://github.com/snowflakedb/connectors-native-sdk \
    --template templates/connectors-native-sdk-template

Name of the application instance which will be created in Snowflake [connectors-native-sdk-template]: MY_CONNECTOR
Name of the schema in which the connector files stage will be created [TEST_SCHEMA]:
Name of the stage used to store connector files in the application package [TEST_STAGE]: CUSTOM_STAGE_NAME
Initialized the new project in my_connector

コネクタのビルド、展開、クリーンアップ

このテンプレートは、修正前であっても、すぐに導入することができます。以下のセクションでは、コネクタのビルド、デプロイ、インストール方法を説明します。

コネクタの作成

Snowflake Native SDK for Connectors を使用して作成されたコネクタの構築は、一般的な Java アプリケーションの構築とは少し異なります。ソースから.jarアーカイブをビルドするだけでなく、やらなければならないことがいくつかあります。アプリケーションの構築は以下の手順で行います。

  1. カスタム内部コンポーネントのビルドディレクトリへのコピー

  2. コンポーネントのビルドディレクトリへ SDK をコピーする

内部コンポーネントのコピー

このステップでは、コネクタ .jar ファイルをビルドし、それを(UI、マニフェストおよびセットアップ ファイルとともに) sf_build ディレクトリにコピーします。

このステップを実行するには、コマンドを実行します: ./gradlew copyInternalComponents.

SDK コンポーネントのコピー

このステップでは、 SDK .jarファイル(コネクタGradleモジュールの依存関係として追加)を sf_build ディレクトリにコピーし、.jarアーカイブからバンドルされた.sqlファイルを抽出します。

これらの.sqlファイルによって、アプリケーションのインストール中にどのプロバイダーオブジェクトが作成されるかをカスタマイズすることができます。オブジェクトの省略は、間違って実行するといくつかの機能を失敗させる可能性があるため、初めてのユーザーにはカスタマイズをお勧めしません。テンプレート・コネクタ・アプリケーションは all.sql ファイルを使用し、推奨される SDK オブジェクトをすべて作成します。

このステップを実行するには、コマンドを実行します: ./gradlew copySdkComponents.

コネクタの展開

Native Appsをデプロイするには、Snowflake内でアプリケーションパッケージを作成する必要があります。その後、 sf_build ディレクトリのすべてのファイルを Snowflake にアップロードする必要があります。

アプリケーションインスタンスはステージングされたファイルから直接作成できます。このアプローチでは、バージョンとアプリケーションのインスタンスを再作成することなく、ほとんどのコネクタファイルの変更を確認できます。

以下の演算子を行います。

  1. アプリケーションパッケージがまだ存在しない場合は、新しいアプリケーションパッケージを作成します。

  2. パッケージ内にスキーマとステージングされたファイルを作成します。

  3. sf_build ディレクトリからステージにファイルをアップロードします(このステップには時間がかかる場合があります)。

コネクタをデプロイするには、コマンドを実行します: snow app deploy --connection=native_sdk_connection.

snow app deploy コマンドの詳細については、 snow app deploy をご参照ください。

作成されたアプリケーションパッケージは、アカウントの Snowflake UI の App packages タブの Data products カテゴリに表示されます。

コネクタをインストールする

アプリケーションのインストールはプロセスの最後のステップです。先に作成したアプリケーションパッケージからアプリケーションを作成します。

コネクタをインストールするには、コマンドを実行します: snow app run --connection=native_sdk_connection.

snow app run コマンドの詳細については、 snow app run をご参照ください。

インストールされたアプリケーションは、アカウントの Snowflake UI の Data products カテゴリにある Installed apps タブに表示されます。

コネクタファイルの更新

コネクタファイルを変更したい場合は、変更したファイルをアプリケーションパッケージのステージに簡単にアップロードできます。アップロードコマンドは、どのファイルが更新されたかに依存します。

アップデートコマンドを実行する前に、 sf_build ディレクトリにコネクタの新しいファイルをコピーする必要があります。 ./gradlew copyInternalComponents

UI .pyファイルまたはコネクタ.javaファイル

snow app deploy --connection=native_sdk_connection コマンドを使用すると、現在のアプリケーション・インスタンスは再インストールせずに新しいファイルを使用します。

setup.sql または manifest.yml ファイル

snow app run --connection=native_sdk_connection コマンドを使用すると、新しいファイルがステージにアップロードされた後、現在のアプリケーションインスタンスが再インストールされます。

クリーンアップ

チュートリアルが終了した後、あるいは何らかの理由でアプリケーションとそのパッケージを削除したい場合は、コマンドを使ってアカウントから完全に削除することができます。

snow app teardown --connection=native_sdk_connection --cascade --force

--cascade オプションは、アカウント管理者に所有権を移さずに移行先データベースを削除するために必要です。実際のコネクタでは、取り込まれたデータを保持するためにデータベースを削除すべきではありません。データベースは、アカウント管理者が所有するか、アンインストール前に所有権を譲渡する必要があります。

インジェストが構成されていなくても、一時停止または削除されるまで、コネクタはクレジットを消費します!

前提条件のステップ

インストール直後、コネクタはウィザード段階にあります。このフェーズは、エンドユーザーに必要なすべての構成をガイドするいくつかのステップで構成されます。

最初のステップは前提条件のステップです。これはオプションであり、すべてのコネクタに必要なわけではありません。前提条件は通常、アプリケーション外部のユーザーから要求されるアクションで、例えば SQL ワークシートでクエリを実行したり、ソースシステム側で構成を行うなどです。

前提条件の詳細については、次をご参照ください。 前提条件

各前提条件の内容は、コネクタ内にある STATE.PREREQUISITES テーブルから直接取得されます。 setup.sql スクリプトでカスタマイズできます。ただし、 setup.sql スクリプトはアプリケーションのインストール、アップグレード、ダウングレードのたびに実行されることに注意してください。このため、以下の例のようにマージクエリを使用することをお勧めします。

MERGE INTO STATE.PREREQUISITES AS dest
USING (SELECT * FROM VALUES
           ('1',
            'Sample prerequisite',
            'Prerequisites can be used to notice the end user of the connector about external configurations. Read more in the SDK documentation below. This content can be modified inside `setup.sql` script',
            'https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/prerequisites',
            NULL,
            NULL,
            1
           )
) AS src (id, title, description, documentation_url, learnmore_url, guide_url, position)
ON dest.id = src.id
WHEN NOT MATCHED THEN
    INSERT (id, title, description, documentation_url, learnmore_url, guide_url, position)
    VALUES (src.id, src.title, src.description, src.documentation_url, src.learnmore_url, src.guide_url, src.position);
Copy

コネクタ構成のステップ

ウィザードフェーズの次のステップは、コネクタ構成のステップです。このステップでは、コネクタに必要なデータベースオブジェクトと権限を構成できます。このステップでは、次の構成プロパティを指定できます。

  • warehouse

  • operational_warehouse

  • cortex_warehouse

  • destination_database

  • destination_schema

  • global_schedule

  • data_owner_role

  • cortex_user_role

  • agent_username

  • agent_role

その他のカスタムプロパティが必要な場合は、ウィザードフェーズの次のステップのいずれかで構成できます。各プロパティの詳細については、次をご参照ください。 コネクタ構成

さらに、テンプレートで提供される Streamlit コンポーネント(streamlit/wizard/connector_config.py)は、 Native Apps Permission SDK をトリガーし、エンドユーザーに権限付与をリクエストする方法を示しています。使用可能なプロパティがコネクタのニーズを満たしている限り、バックエンドクラスを上書きする必要はありませんが、構成のステップを追加してコンポーネントと同じ方法で上書きすることもできます。

内部プロシージャとJavaオブジェクトの詳細については、次をご参照ください。 コネクタ構成リファレンス

Streamlitの例では、 manifest.yml ファイル(CREATE DATABASE および EXECUTE TASKS)で設定されたアカウントレベルの権限をリクエストすることができます。また、ユーザーはPermission SDK ポップアップからウェアハウスリファレンスを指定することができます。

テンプレートでは、ユーザーは destination_database および destination_schema のみを入力するように求められます。しかし、 streamlit/wizard/connector_configuration.pyTODO のコメントには、Streamlit UI でより多くの入力フィールドを表示するために再利用できるコメント付きコードが含まれています。

# TODO: Here you can add additional fields in connector configuration.
# For example:
st.subheader("Operational warehouse")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="operational_warehouse", label_visibility="collapsed")
st.caption("Name of the operational warehouse to be used")
Copy

接続構成のステップ

ウィザードフェーズの次のステップは、接続構成のステップです。このステップでは、エンドユーザーはコネクタの外部接続パラメーターを構成できます。この構成には、シークレットや統合などのオブジェクトの識別子が含まれる場合があります。

この情報はコネクタが取り込むデータのソースシステムによって異なるため、ソースコードでより大きなカスタマイズを行わなければならない最初の場所です。

接続構成の詳細については、次をご参照ください。

Streamlit UI 側(streamlit/wizard/connection_config.py)から、必要なパラメーターのテキスト入力を追加する必要があります。テキスト入力の例が実装されています。このファイルのコードを検索すると、新しいフィールドのためのコメント付きコード TODO が見つかります。

# TODO: Additional configuration properties can be added to the UI like this:
st.subheader("Additional connection parameter")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="additional_connection_property", label_visibility="collapsed")
st.caption("Some description of the additional property")
Copy

プロパティをフォームに追加したら、コネクタのバックエンドレイヤーに渡す必要があります。そのためには、Streamlitファイルの2か所を追加で変更する必要があります。1つ目は、 streamlit/wizard/connection_config.py ファイルの finish_config 関数です。新しく追加されたテキスト入力の状態は、ここで読む必要があります。さらに、必要に応じて検証し、 set_connection_configuration 関数に渡すこともできます。

たとえば、 additional_connection_property が追加された場合、編集後は次のように表示されます。

def finish_config():
try:
    # TODO: If some additional properties were specified they need to be passed to the set_connection_configuration function.
    # The properties can also be validated, for example, check whether they are not blank strings etc.
    response = set_connection_configuration(
        custom_connection_property=st.session_state["custom_connection_property"],
        additional_connection_property=st.session_state["additional_connection_property"],
    )

# rest of the method without changes
Copy

2つ目は、 streamlit/native_sdk_api/connection_config.py ファイルにある set_connection_configuration 関数を編集する必要があります。この関数は、Streamlit UI と、コネクタのバックエンドへのエントリポイントである基になる SQL プロシージャ間のプロキシです。

def set_connection_configuration(custom_connection_property: str, additional_connection_property: str):
    # TODO: this part of the code sends the config to the backend so all custom properties need to be added here
    config = {
        "custom_connection_property": escape_identifier(custom_connection_property),
        "additional_connection_property": escape_identifier(additional_connection_property),
    }

    return call_procedure(
        "PUBLIC.SET_CONNECTION_CONFIGURATION",
        [variant_argument(config)]
    )
Copy

これを行うと、新しいプロパティが、構成を含む内部コネクタテーブルに保存されます。しかし、可能なカスタマイズはこれで終わりではありません。一部のバックエンドコンポーネントはカスタマイズできます。コード内で次のコメントを探して見つけてください。

  • TODO: IMPLEMENT ME connection configuration validate

  • TODO: IMPLEMENT ME connection callback

  • TODO: IMPLEMENT ME test connection

検証部分では、 UI から受信したデータについて、追加の検証を実行できます。また、大文字と小文字を変更したり、提供されたデータをトリミングしたり、提供された名前のオブジェクトが実際にSnowflake内に存在するかどうかをチェックするなど、データを変換することもできます。

接続コールバックは、 外部統合セットアップリファレンス で説明されているソリューションを使用して、外部アクセス統合を使用する必要があるプロシージャを変更するなど、設定に基づいて追加の操作を実行できるようにする部分です。

接続テストは接続構成の最終コンポーネントで、コネクタとソース・システム間で接続が確立できるかどうかをチェックします。

内部コンポーネントの詳細については、次をご参照ください。

実装例は次のようになります。

public class TemplateConfigurationInputValidator implements ConnectionConfigurationInputValidator {

    private static final String ERROR_CODE = "INVALID_CONNECTION_CONFIGURATION";

    @Override
    public ConnectorResponse validate(Variant config) {
      // TODO: IMPLEMENT ME connection configuration validate: If the connection configuration input
      // requires some additional validation this is the place to implement this logic.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      var integrationCheck = checkParameter(config, INTEGRATION_PARAM, false);
      if (!integrationCheck.isOk()) {
        return integrationCheck;
      }

      var secretCheck = checkParameter(config, SECRET_PARAM, true);
      if (!secretCheck.isOk()) {
        return ConnectorResponse.error(ERROR_CODE);
      }

      return ConnectorResponse.success();
    }
}
Copy
public class TemplateConnectionConfigurationCallback implements ConnectionConfigurationCallback {

    private static final String[] EXTERNAL_SOURCE_PROCEDURE_SIGNATURES = {
        asVarchar(format("%s.%s()", PUBLIC_SCHEMA, TEST_CONNECTION_PROCEDURE)),
        asVarchar(format("%s.%s(VARIANT)", PUBLIC_SCHEMA, FINALIZE_CONNECTOR_CONFIGURATION_PROCEDURE)),
        asVarchar(format("%s.%s(NUMBER, STRING)", PUBLIC_SCHEMA, WORKER_PROCEDURE))
      };

    private final Session session;

    public TemplateConnectionConfigurationCallback(Session session) {
      this.session = session;
    }

    @Override
    public ConnectorResponse execute(Variant config) {
      // TODO: If you need to alter some procedures with external access you can use
      // configureProcedure method or implement a similar method on your own.
      // TODO: IMPLEMENT ME connection callback: Implement the custom logic of changes in application
      // to be done after connection configuration, like altering procedures with external access.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      var response = configureProceduresWithReferences();
      if (response.isNotOk()) {
         return response;
      }
      return ConnectorResponse.success();
    }

    private ConnectorResponse configureProceduresWithReferences() {
      return callProcedure(
        session,
        PUBLIC_SCHEMA,
        SETUP_EXTERNAL_INTEGRATION_WITH_NAMES_PROCEDURE,
        EXTERNAL_SOURCE_PROCEDURE_SIGNATURES);
    }
}
Copy
public class TemplateConnectionValidator {

    private static final String ERROR_CODE = "TEST_CONNECTION_FAILED";

    public static Variant testConnection(Session session) {
      // TODO: IMPLEMENT ME test connection: Implement the custom logic of testing the connection to
      // the source system here. This usually requires connection to some webservice or other external
      // system. It is suggested to perform only the basic connectivity validation here.
      // If that's the case then this procedure must be altered in TemplateConnectionConfigurationCallback first.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      return test().toVariant();
    }

    private static ConnectorResponse test() {
      try {
        var response = SourceSystemHttpHelper.testEndpoint();

        if (isSuccessful(response.statusCode())) {
          return ConnectorResponse.success();
        } else {
          return ConnectorResponse.error(ERROR_CODE, "Connection to source system failed");
        }
      } catch (Exception exception) {
        return ConnectorResponse.error(ERROR_CODE, "Test connection failed");
      }
    }
}
Copy

構成ステップをファイナライズする

コネクタ構成を確定するステップは、ウィザード・フェーズの最終ステップです。このステップには複数の責任があります。

  1. ユーザーがコネクタに必要な追加構成を指定できるようにします。

  2. 必要に応じて、シンクデータベース、スキーマ、取り込まれたデータ用の追加テーブルとビューを作成します。

  3. スケジューラやタスクリアクタなどの内部コンポーネントの初期化

構成確定に関する詳細情報はこちらをご覧ください。

タスク・リアクターとスケジュールに関する詳しい情報はこちらをご覧ください。

接続構成ステップと同様に、カスタマイズは Streamlit UI で開始できます。 streamlit/wizard/finalize_config.py ファイルには、 プロパティ例を持つフォームが含まれてい ます。コネクタのニーズに応じて、さらにプロパティを追加できます。別のプロパティを追加するには、上記のファイルに新しいプロパティを追加するサンプルコードを含む TODO コメントを探します。

# TODO: Here you can add additional fields in finalize connector configuration.
# For example:
st.subheader("Some additional property")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="some_additional_property", label_visibility="collapsed")
st.caption("Description of some new additional property")
Copy

新しいプロパティのテキスト入力を追加した後、それをバックエンド側に渡す必要があります。実行するには、同じファイル内の finalize_configuration 関数を変更します。

def finalize_configuration():
    try:
        st.session_state["show_main_error"] = False
        # TODO: If some additional properties were introduced, they need to be passed to the finalize_connector_configuration function.
        response = finalize_connector_configuration(
            st.session_state.get("custom_property"),
            st.session_state.get("some_additional_property")
        )
Copy

次に、 streamlit/native_sdk_api/finalize_config.py ファイルを開き、以下の関数に新しいプロパティを追加します:

def finalize_connector_configuration(custom_property: str, some_additional_property: str):
    # TODO: If some custom properties were configured, then they need to be specified here and passed to the FINALIZE_CONNECTOR_CONFIGURATION procedure.
    config = {
        "custom_property": custom_property,
        "some_additional_property": some_additional_property,
    }
    return call_procedure(
        "PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION",
        [variant_argument(config)]
    )
Copy

接続構成ステップと同様に、このステップでもさまざまなバックエンドコンポーネントをカスタマイズすることができます。

  • TODO: IMPLEMENT ME validate source

  • TODO: IMPLEMENT ME finalize internal

ソースの検証では、ソースシステムでより高度な検証を実行します。前のテスト接続が接続が確立できることをチェックするだけであった場合、検証ソースはシステム内の特定のデータへのアクセスをチェックすることができます。

Finalize internal は、タスクリアクタとスケジューラを初期化し、シンクデータベースと必要なネストされたオブジェクトを作成する内部プロシージャです。また、ファイナライズステップ中に提供された構成を保存するためにも使用できます(この構成はデフォルトでは保存されません)。

内部コンポーネントの詳細については、次をご参照ください。

さらに、 FinalizeConnectorInputValidator インターフェイスを使用して入力を検証し、それを finalize ハンドラーに提供することができます - TemplateFinalizeConnectorConfigurationCustomHandler ファイルを確認してください。ビルダーの使用に関する詳細については、次をご参照ください。 ストアドプロシージャとハンドラーのカスタマイズ

検証ソースの実装例は次のようになります。

public class SourceSystemAccessValidator implements SourceValidator {

    @Override
    public ConnectorResponse validate(Variant variant) {
      // TODO: IMPLEMENT ME validate source: Implement the custom logic of validating the source
      // system. In some cases this can be the same validation that happened in
      // TemplateConnectionValidator.
      // However, it is suggested to perform more complex validations, like specific access rights to
      // some specific resources here.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/finalize_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/finalize_configuration
      var finalizeProperties = Configuration.fromCustomConfig(variant);

      var httpResponse = SourceSystemHttpHelper.validateSource(finalizeProperties.get("custom_property"));
      return prepareConnectorResponse(httpResponse.statusCode());
    }

    private ConnectorResponse prepareConnectorResponse(int statusCode) {
      switch (statusCode) {
        case 200:
          return ConnectorResponse.success();
        case 401:
          return ConnectorResponse.error("Unauthorized error");
        case 404:
          return ConnectorResponse.error("Not found error");
        default:
          return ConnectorResponse.error("Unknown error");
      }
    }
}
Copy

リソースを作成する

ウィザードフェーズが完了すると、コネクタはデータの取り込みを開始する準備が整います。しかしその前に、リソースの実装と構成が必要です。リソースとは、ソース・システム内の特定のデータ・セット(テーブル、エンドポイント、ファイルなど)を記述する抽象化です。

異なるリソースシステムはリソースに関する異なる情報を必要とする場合があります。そのため、リソース定義は特定のニーズに応じてカスタマイズする必要があります。実行するには、 streamlit/daily_use/data_sync_page.py ファイルに移動します。リソースパラメーターのテキスト入力の追加については、 TODO コメントを確認できます。リソースパラメーターにより、ソースシステムからのデータの識別と取得が可能になる必要があります。これらのパラメーターは、インジェスチョン中に抽出できます。

# TODO: specify all the properties needed to define a resource in the source system. A subset of those properties should allow for a identification of a single resource, be it a table, endpoint, repository or some other data storage abstraction
st.text_input(
    "Resource name",
    key="resource_name",
)
st.text_input(
    "Some resource parameter",
    key="some_resource_parameter"
)
Copy

すべての必要なプロパティがフォームに追加されると、バックエンド側に渡すことができます。まず、テキストフィールドの状態を抽出し、 streamlit/daily_use/data_sync_page.py ファイル内の API レベル queue_resource メソッドに渡す必要があります。

def queue_resource():
    # TODO: add additional properties here and pass them to create_resource function
    resource_name = st.session_state.get("resource_name")
    some_resource_parameter = st.session_state.get("some_resource_parameter")

    if not resource_name:
        st.error("Resource name cannot be empty")
        return

    result = create_resource(resource_name, some_resource_parameter)
    if result.is_ok():
        st.success("Resource created")
    else:
        st.error(result.get_message())
Copy

それから、 streamlit/native_sdk_api/resource_management.py ファイルの create_resource 関数を更新する必要があります:

def create_resource(resource_name, some_resource_parameter):
    ingestion_config = [{
        "id": "ingestionConfig",
        "ingestionStrategy": "INCREMENTAL",
        # TODO: HINT: scheduleType and scheduleDefinition are currently not supported out of the box, due to globalSchedule being used. However, a custom implementation of the scheduler can use those fields. They need to be provided becuase they are mandatory in the resourceDefinition.
        "scheduleType": "INTERVAL",
        "scheduleDefinition": "60m"
    }]
    # TODO: HINT: resource_id should allow identification of a table, endpoint etc. in the source system. It should be unique.
    resource_id = {
        "resource_name": resource_name,
    }
    id = f"{resource_name}_{random_suffix()}"

    # TODO: if you specified some additional resource parameters then you need to put them inside resource metadata:
    resource_metadata = {
        "some_resource_parameter": some_resource_parameter
    }

    return call_procedure("PUBLIC.CREATE_RESOURCE",
                          [
                              varchar_argument(id),
                              variant_argument(resource_id),
                              variant_list_argument(ingestion_config),
                              varchar_argument(id),
                              "true",
                              variant_argument(resource_metadata)
                          ])
Copy

CREATE_RESOURCE() プロシージャ・ロジックのカスタマイズ

PUBLIC.CREATE_RESOURCE() プロシージャは、メイン実行フローのいくつかの場所にプラグインされるカスタムロジックを実装することによって、開発者がその実行をカスタマイズすることができます。SDK によって、開発者は次のことができます。

  1. リソースを作成する前に検証します。このロジックは、 PUBLIC.CREATE_RESOURCE_VALIDATE() プロシージャに実装する必要があります。

  2. リソースが作成される前にカスタムオペレーションを実行します。このロジックは、 PUBLIC.PRE_CREATE_RESOURCE() プロシージャに実装する必要があります。

  3. リソース作成後にカスタムオペレーションを実行します。このロジックは、 PUBLIC.POST_CREATE_RESOURCE() プロシージャに実装する必要があります。

PUBLIC.CREATE_RESOURCE() プロシージャのカスタマイズについての詳細は、こちらをご覧ください。

TemplateCreateResourceHandler.java

このクラスは PUBLIC.CREATE_RESOURCE() プロシージャのハンドラーです。ここでは、前述のコールバック・プロシージャのハンドラのJava実装を注入することができます。デフォルトでは、プロシージャの実行時間を長くしてしまう SQL プロシージャの呼び出しから解放されるように、テンプレートはコールバックハンドラーのモックされた Java 実装を提供します。これらのモックされた実装は、成功レスポンスを返す以外には何もしません。テンプレートによって準備されたコールバック・クラスにカスタム実装を提供するか、これらのコールバックをゼロから作成し、ハンドラー・ビルダーでメイン・プロシージャの実行フローに注入することができます。

デフォルトで呼び出されるコールバック・メソッドのカスタム・ロジックを実装するには、コード内で以下のコメントを探します。

  • TODO: IMPLEMENT ME create resource validate

  • TODO: IMPLEMENT ME pre create resource callback

  • TODO: IMPLEMENT ME post create resource callback

インジェスチョン

データのインジェスチョンを実行するには、リソース構成に基づいてソースシステムとの接続を処理し、データを取得するクラスを実装する必要があります。SchedulerモジュールとTask Reactorモジュールは、インジェスションタスクのトリガーとキューを担当します。

インジェストロジックは TemplateIngestion クラスから呼び出されます。コード内の TODO: IMPLEMENT ME ingestion のコメントを探し、ランダムなデータ生成をソースシステムからのデータ取得に置き換えます。リ ソ ース定義にカ ス タ ムプ ロ パテ ィ を追加 し た場合は、 TemplateWorkItem で利用可能な ResourceIngestionDefinitionRepository とプロパティを用いて、 内部コネクタテーブルか らこれらを取得できます。

  • resourceIngestionDefinitionId

  • ingestionConfigurationId

ウェブサービスからデータを取得する例は以下のようになる かもしれません

public final class SourceSystemHttpHelper {

  private static final String DATA_URL = "https://source_system.com/data/%s";
  private static final SourceSystemHttpClient sourceSystemClient = new SourceSystemHttpClient();
  private static final ObjectMapper objectMapper = new ObjectMapper();

  private static List<Variant> fetchData(String resourceId) {
    var response = sourceSystemClient.get(String.format(url, resourceId));
    var body = response.body();

    try {
        return Arrays.stream(objectMapper.readValue(body, Map[].class))
              .map(Variant::new)
              .collect(Collectors.toList());
    } catch (JsonProcessingException e) {
      throw new RuntimeException("Cannot parse json", e);
    }
  }
}
Copy
public class SourceSystemHttpClient {

  private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(15);

  private final HttpClient client;
  private final String secret;

  public SourceSystemHttpClient() {
    this.client = HttpClient.newHttpClient();
    this.secret =
        SnowflakeSecrets.newInstance()
            .getGenericSecretString(ConnectionConfiguration.TOKEN_NAME);
  }

  public HttpResponse<String> get(String url) {
    var request =
        HttpRequest.newBuilder()
            .uri(URI.create(url))
            .GET()
            .header("Authorization", format("Bearer %s", secret))
            .header("Content-Type", "application/json")
            .timeout(REQUEST_TIMEOUT)
            .build();

    try {
      return client.send(request, HttpResponse.BodyHandlers.ofString());
    } catch (IOException | InterruptedException ex) {
      throw new RuntimeException(format("HttpRequest failed: %s", ex.getMessage()), ex);
    }
  }
}
Copy

リソースのライフサイクル管理

リソースの作成と取り込みのロジックが実装されたら、以下のプロシージャを呼び出すことで、リソースのライフサイクルを管理することができます:

  1. PUBLIC.ENABLE_RESOURCE() 特定のリソースを有効にします。つまり、そのリソースの取り込みがスケジュールされます。

  2. PUBLIC.DISABLE_RESOURCE() 特定のリソースを無効にします。つまり、そのリソースの取り込みスケジュールを停止します。

  3. PUBLIC.UPDATE_RESOURCE() を使用すると、特定のリソースの取り込み構成を更新できます。Streamlit UI には既定では実装されていません。コネクタユーザーがインジェスト構成をカスタマイズできるようにすることは、開発者にとって望ましくない場合があるからです(このプロシージャの使用を完全に禁止するには、アプリケーションロール ADMIN へのプロシージャの許可を取り消します)。

これらのプロシージャはすべてJavaハンドラーを持ち、実行をカスタマイズできるコールバックで拡張されています。これらのハンドラー用のビルダーを使って、コールバックのカスタム実装を注入することができます。デフォルトでは、テンプレートはコールバックハンドラーのモックされたJava実装を提供します。これらのモックされた実装は、成功レスポンスを返す以外には何もしません。テンプレートによって準備されたコールバッククラスにカスタム実装を提供するか、これらのコールバックをゼロから作成し、ハンドラービルダーのメインプロシージャ実行フローに注入することができます。

TemplateEnableResourceHandler.java

このクラスは、 PUBLIC.ENABLE_RESOURCE() プロシージャのハンドラーであり、専用のコールバックで拡張できます。

  1. 有効化する前にリソースを検証します。カスタム実装を提供するために、 TODO: IMPLEMENT ME enable resource validate のコメントをコードで探してください。

  2. リソースが有効になる前にカスタムオペレーションを実行します。カスタム実装を提供するために、 TODO: IMPLEMENT ME pre enable resource のコメントをコードで探してください。

  3. リソースが有効になった後にカスタムオペレーションを実行します。カスタム実装を提供するために、 TODO: IMPLEMENT ME post enable resource のコメントをコードで探してください。

詳しくは、 PUBLIC.ENABLE_RESOURCE() プロシージャの詳細ドキュメントをご覧ください。

TemplateDisableResourceHandler.java

このクラスは、 PUBLIC.DISABLE_RESOURCE() プロシージャのハンドラーであり、専用のコールバックで拡張できます。

  1. 無効にする前にリソースを検証します。カスタム実装を提供するために、 TODO: IMPLEMENT ME disable resource validate のコメントをコードで探してください。

  2. リソースが無効になる前にカスタムオペレーションを実行します。カスタム実装をプロバイダーするために、 TODO: IMPLEMENT ME pre disable resource のコメントを探してください。

詳しくは、 PUBLIC.DISABLE_RESOURCE() プロシージャの詳細ドキュメントをご覧ください。

TemplateUpdateResourceHandler.java

このクラスは、 PUBLIC.UPDATE_RESOURCE() プロシージャのハンドラーであり、専用のコールバックで拡張できます。

  1. リソースを更新する前に検証します。カスタム実装を提供するために、 TODO: IMPLEMENT ME update resource validate のコメントをコードで探してください。

  2. リソースが更新される前にカスタムオペレーションを実行します。カスタム実装を提供するために、 TODO: IMPLEMENT ME pre update resource のコメントをコードで探してください。

  3. リソースの更新後にカスタムオペレーションを実行します。カスタム実装を提供するために、 TODO: IMPLEMENT ME post update resource のコメントをコードで探してください。

詳しくは、 PUBLIC.UPDATE_RESOURCE() プロシージャの詳細ドキュメントをご覧ください。

設定

テンプレートには、以前に実行されたすべての構成を表示できる設定タブが含まれています。しかし、構成プロパティがカスタマイズされている場合、この表示にもカスタマイズが必要です。設定タブのコードは streamlit/daily_use/settings_page.py ファイルにあります。

カスタマイズするには、それぞれの構成で追加されたキーの構成から値を抽出します。例えば、以前の additional_connection_property が接続構成ステップで追加された場合、設定表示では次のように追加できます。

def connection_config_page():
    current_config = get_connection_configuration()

    # TODO: implement the display for all the custom properties defined in the connection configuration step
    custom_property = current_config.get("custom_connection_property", "")
    additional_connection_property = current_config.get("additional_connection_property", "")


    st.header("Connector configuration")
    st.caption("Here you can see the connector connection configuration saved during the connection configuration step "
               "of the Wizard. If some new property was introduced it has to be added here to display.")
    st.divider()

    st.text_input(
        "Custom connection property:",
        value=custom_property,
        disabled=True
    )
    st.text_input(
        "Additional connection property:",
        value=additional_connection_property,
        disabled=True
    )
    st.divider()
Copy