チュートリアル: Native SDK for ConnectorsのJavaテンプレート

概要

Snowflake Native SDK for Connectors を使用したコネクタテンプレートのチュートリアルへようこそ。このガイドでは、シンプルなConnector Native Applicationの設定について説明します。

このチュートリアルでは、次の方法を学習します。

  • Connector Native Applicationのデプロイ

  • データを取り込むためのテンプレートコネクタを構成する

  • テンプレートコネクタを自分のニーズに合わせてカスタマイズする

テンプレートにはコード内にさまざまな役立つコメントが含まれており、変更が必要な特定のファイルを簡単に見つけることができます。次のキーワードを含むコメントを探します。そのキーワードを使用すると、独自のコネクタを実装するのに役立ちます。

  • TODO

  • TODO: HINT

  • TODO: IMPLEMENT ME

このチュートリアルを始める前に、次の推奨コンテンツを確認して準備してください。

前提条件

開始する前に、次の要件を満たしていることを確認してください。

  • Java 11のインストール

  • ACCOUNTADMIN ロールを使用したSnowflakeアカウントへのアクセス

  • ローカルマシンで variable_substitution および exit_on_error が構成された SnowSQL (CLI クライアント) ツール

  • コネクタ用Snowflake Native SDK のドキュメントページを確認し、オンラインで開いたままにしておくか、ブラウザーから印刷してください。 Connector Native Java SDK のクイックスタートを確認してください(オプションですが推奨)。クイックスタートでは、テンプレートに基づくサンプルコネクタが使用され、これを参照してさまざまなコンポーネントのサンプル実装を確認できます。

初期化とデプロイ

プロジェクトを初期化するには、 GitHub <https://github.com/snowflakedb/connectors-native-sdk>`_ から Native SDK for Connectorsリポジトリをクローンし、 `/templates/native-sdk-connectors-java-template`` ディレクトリを目的のプロジェクトの場所にコピーします。このテンプレートには、動作するConnector Native Applicationのデプロイに必要なすべてのコードが含まれています。これが完了すると、テンプレートをデプロイする準備が整います。

デプロイ

テンプレートはすぐにデプロイできる状態になっており、プロセス全体を処理する便利なスクリプトが提供されます。Connectorをデプロイする前に、 snowsql 接続を指定する必要があります。実行するには、 Makefile を開き、 CONNECTION 環境変数に接続名を入力します。

アプリケーションをすばやくデプロイするには、テンプレートのメインディレクトリに移動し、次のコマンドを実行します。

make reinstall_application_from_version_dir
Copy

このコマンドは次を実行します。

  • 以前に存在していた APPLICATION および APPLICATION PACKAGE をSnowflakeアカウントから削除します。

  • jarから抽出された SDK jarおよびsqlファイルをターゲットの sf_build ディレクトリにコピーします。

  • アプリケーションのカスタムstreamlitおよびjavaコンポーネントを sf_build ディレクトリにコピーします。

  • Snowflakeアカウント内の sf_build ディレクトリにあるファイルから新しい APPLICATION PACKAGE を作成します。

  • Snowflakeアカウント内に新しい APPLICATION インスタンスを作成します。

このプロセスが完了するまでに2~3分ほどかかります。完了後、Snowflake内の Data Products -> Apps タブに移動すると、Connectorが表示されます。アプリケーションの数が多くて見つけるのが困難な場合は、検索バーに NATIVE_SDK_CONNECTOR_TEMPLATE と入力するか、カスタム APPLICATION 名の場合には、代わりにカスタム名を使用してください。このConnectorは構成する準備ができています。次のステップでは、プロセスについて紹介し、その過程で各ステップをカスタマイズする方法を説明します。

このチュートリアルのステップの途中でコネクタを再デプロイする必要がある場合(変更をテストする場合など)は、上記のコマンドを再実行してください。

前提条件のステップ

デプロイ直後のConnectorはウィザードフェーズになります。このフェーズは、エンドユーザーに必要なすべての構成をガイドするいくつかのステップで構成されます。最初のステップは前提条件のステップです。これはオプションであり、すべてのコネクタに必要なわけではありません。前提条件は通常、ワークシートを介してクエリを実行したり、ソースシステム側で何らかの構成を実行したりするなど、アプリケーション外部のユーザーから要求されるアクションです。

前提条件の詳細については、次をご参照ください。

各前提条件の内容は、コネクタ内の内部のテーブル(STATE.PREREQUISITES)から直接取得されます。 setup.sql スクリプトでカスタマイズできます。ただし、 setup.sql スクリプトはアプリケーションのインストール、アップグレード、ダウングレードのたびに実行されることに注意してください。挿入はべき等性を持つ必要があるため、次の例のようにマージクエリを使用することをお勧めします。

MERGE INTO STATE.PREREQUISITES AS dest
USING (SELECT * FROM VALUES
           ('1',
            'Sample prerequisite',
            'Prerequisites can be used to notice the end user of the connector about external configurations. Read more in the SDK documentation below. This content can be modified inside `setup.sql` script',
            'https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/prerequisites',
            NULL,
            NULL,
            1
           )
) AS src (id, title, description, documentation_url, learnmore_url, guide_url, position)
ON dest.id = src.id
WHEN NOT MATCHED THEN
    INSERT (id, title, description, documentation_url, learnmore_url, guide_url, position)
    VALUES (src.id, src.title, src.description, src.documentation_url, src.learnmore_url, src.guide_url, src.position);
Copy

コネクタ構成のステップ

ウィザードフェーズの次のステップは、コネクタ構成のステップです。このステップでは、コネクタに必要なデータベースオブジェクトと権限を構成できます。このステップでは、次の構成プロパティを指定できます。

  • warehouse

  • destination_database

  • destination_schema

  • operational_warehouse

  • global_schedule

  • data_owner_role

  • agent_username

  • agent_role

その他のカスタムプロパティが必要な場合は、ウィザードフェーズの次のステップのいずれかで構成できます。各プロパティの詳細については、次をご参照ください。

さらに、テンプレートで提供されるStreamlitコンポーネント(streamlit/wizard/connector_config.py)は、 permissions-sdk をトリガーする方法を示し、エンドユーザーからいくつかの許可を要求します。使用可能なプロパティがコネクタのニーズを満たしている限り、バックエンドクラスを上書きする必要はありませんが、構成のステップを追加してコンポーネントと同じ方法で上書きすることもできます。

内部プロシージャとJavaオブジェクトの詳細については、次をご参照ください。

提供されたStreamlitの例では、 create databaseexecute tasks などのアカウントレベルの許可をリクエストできます。また、ユーザーは permissions-sdk ポップアップからウェアハウス参照を指定することもできます。

テンプレートでは、ユーザーは destination_database および destination_schema のみを入力するように求められます。ただし、 streamlit/wizard/connector_configuration.py 内の TODO のコメントには、Streamlit UI により多くの入力ボックスを表示するために再利用できるコメント付きコードが含まれています。

# TODO: Here you can add additional fields in connector configuration. Supported values are the following: warehouse, operational_warehouse, data_owner_role, agent_role, agent_username
# For example:
st.subheader("Operational warehouse")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="operational_warehouse", label_visibility="collapsed")
st.caption("Name of the operational warehouse to be used")
Copy

接続構成のステップ

ウィザードフェーズの次のステップは、接続構成のステップです。このステップでは、エンドユーザーはコネクタの外部接続パラメーターを構成できます。この構成には、シークレットや統合などのオブジェクトの識別子が含まれる場合があります。これは、コネクタによって取り込まれるデータのソースシステムによって異なるため、ソースコードで大規模なカスタマイズを行う必要がある最初の場所です。

接続構成の詳細については、次をご参照ください。

Streamlit UI 側(streamlit/wizard/connection_config.py ファイル)から始めて、必要なすべてのパラメーターのテキストボックスを追加する必要があります。サンプルのテキストボックスが実装されており、このファイル内のコードを検索すると、新しいフィールドのコメント付きコードを含む TODO が見つかります。

# TODO: Additional configuration properties can be added to the UI like this:
st.subheader("Additional connection parameter")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="additional_connection_property", label_visibility="collapsed")
st.caption("Some description of the additional property")
Copy

プロパティをフォームに追加したら、コネクタのバックエンドレイヤーに渡す必要があります。そのためには、Streamlitファイルの2か所を追加で変更する必要があります。1つ目は、 streamlit/wizard/connection_config.py ファイルの finish_config 関数です。新しく追加されたテキストボックスの状態をここで読み取る必要があります。さらに、必要に応じて検証し、 set_connection_configuration 関数に渡すこともできます。たとえば、 additional_connection_property が追加された場合、編集後は次のように表示されます。

def finish_config():
try:
    # TODO: If some additional properties were specified they need to be passed to the set_connection_configuration function.
    # The properties can also be validated, for example, check whether they are not blank strings etc.
    response = set_connection_configuration(
        custom_connection_property=st.session_state["custom_connection_property"],
        additional_connection_property=st.session_state["additional_connection_property"],
    )

# rest of the method without changes
Copy

2つ目は、 streamlit/native_sdk_api/connection_config.py ファイルにある set_connection_configuration 関数を編集する必要があります。この関数は、Streamlit UI と、コネクタのバックエンドへのエントリポイントである基礎となる SQL プロシージャ間のプロキシです。

def set_connection_configuration(custom_connection_property: str, additional_connection_property: str):
    # TODO: this part of the code sends the config to the backend so all custom properties need to be added here
    config = {
        "custom_connection_property": escape_identifier(custom_connection_property),
        "additional_connection_property": escape_identifier(additional_connection_property),
    }

    return call_procedure(
        "PUBLIC.SET_CONNECTION_CONFIGURATION",
        [variant_argument(config)]
    )
Copy

これを実行すると、新しいプロパティが構成を含む内部コネクタテーブルに保存されます。しかし、可能なカスタマイズはこれで終わりではありません。一部のバックエンドコンポーネントはカスタマイズできます。コード内で次のコメントを探して見つけてください。

  • TODO: IMPLEMENT ME connection configuration validate

  • TODO: IMPLEMENT ME connection callback

  • TODO: IMPLEMENT ME test connection

検証部分では、 UI から受信したデータについて、追加の検証を実行できます。また、データを小文字にしたり、トリミングしたり、指定された名前のオブジェクトが実際にSnowflake内に存在するかどうかを確認したりするなど、データを変換することもできます。

接続コールバックは、外部アクセス統合を使用する必要があるプロシージャの変更など、構成に基づいて追加の操作を実行できる部分です。

テスト接続は、接続構成の最後のコンポーネントであり、コネクタとソースシステム間で接続を確立できるかどうかを確認します。

内部コンポーネントの詳細については、次をご参照ください。

実装例は次のようになります。

public class TemplateConfigurationInputValidator implements ConnectionConfigurationInputValidator {

    private static final String ERROR_CODE = "INVALID_CONNECTION_CONFIGURATION";

    @Override
    public ConnectorResponse validate(Variant config) {
      // TODO: IMPLEMENT ME connection configuration validate: If the connection configuration input
      // requires some additional validation this is the place to implement this logic.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      var integrationCheck = checkParameter(config, INTEGRATION_PARAM, false);
      if (!integrationCheck.isOk()) {
        return integrationCheck;
      }

      var secretCheck = checkParameter(config, SECRET_PARAM, true);
      if (!secretCheck.isOk()) {
        return ConnectorResponse.error(ERROR_CODE);
      }

      return ConnectorResponse.success();
    }
}
Copy
public class TemplateConnectionConfigurationCallback implements ConnectionConfigurationCallback {

    private final Session session;

    public TemplateConnectionConfigurationCallback(Session session) {
      this.session = session;
    }

    @Override
    public ConnectorResponse execute(Variant config) {
      // TODO: If you need to alter some procedures with external access you can use
      // configureProcedure method or implement a similar method on your own.
      // TODO: IMPLEMENT ME connection callback: Implement the custom logic of changes in application
      // to be done after connection configuration, like altering procedures with external access.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      configureProcedure(format("PUBLIC.TEST_CONNECTION()"), config);

      return ConnectorResponse.success();
    }
}
Copy
public class TemplateConnectionValidator {

    private static final String ERROR_CODE = "TEST_CONNECTION_FAILED";

    public static Variant testConnection(Session session) {
      // TODO: IMPLEMENT ME test connection: Implement the custom logic of testing the connection to
      // the source system here. This usually requires connection to some webservice or other external
      // system. It is suggested to perform only the basic connectivity validation here.
      // If that's the case then this procedure must be altered in TemplateConnectionConfigurationCallback first.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      return test().toVariant();
    }

    private static ConnectorResponse test() {
      try {
        var response = SourceSystemHttpHelper.testEndpoint();

        if (isSuccessful(response.statusCode())) {
          return ConnectorResponse.success();
        } else {
          return ConnectorResponse.error(ERROR_CODE, "Connection to source system failed");
        }
      } catch (Exception exception) {
        return ConnectorResponse.error(ERROR_CODE, "Test connection failed");
      }
    }
}
Copy

構成ステップをファイナライズする

コネクタ構成をファイナライズするステップは、ウィザードフェーズの最終ステップです。このステップには複数の責任があります。まず、ユーザーはコネクタに必要な追加の構成を指定できます。次に、シンクデータベース、スキーマ、および必要に応じて取り込まれたデータ用のテーブルとビューを作成します。最後に、スケジューラーやタスクリアクターなどの内部コンポーネントを初期化します。

構成のファイナライズの詳細については、次をご参照ください。

タスクリアクターとスケジューリングの詳細については、次をご参照ください。

接続構成ステップと同様に、カスタマイズはStreamlit UI を使用して開始できます。 streamlit/wizard/finalize_config.py には、サンプルプロパティを含むフォームが含まれています。コネクタのニーズに応じて、さらにプロパティを追加できます。別のプロパティを追加するには、上記のファイルに新しいプロパティを追加するサンプルコードを含む TODO コメントを探します。

# TODO: Here you can add additional fields in finalize connector configuration.
# For example:
st.subheader("Some additional property")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="some_additional_property", label_visibility="collapsed")
st.caption("Description of some new additional property")
Copy

新しいプロパティのテキストボックスを追加した後、それをバックエンドに渡す必要があります。実行するには、同じファイル内の finalize_configuration 関数を変更します。

def finalize_configuration():
    try:
        st.session_state["show_main_error"] = False
        # TODO: If some additional properties were introduced, they need to be passed to the finalize_connector_configuration function.
        response = finalize_connector_configuration(
            st.session_state.get("custom_property"),
            st.session_state.get("some_additional_property")
        )
Copy

次に、 streamlit/native_sdk_api/finalize_config.py を開いて次の関数に追加します。

def finalize_connector_configuration(custom_property: str, some_additional_property: str):
    # TODO: If some custom properties were configured, then they need to be specified here and passed to the FINALIZE_CONNECTOR_CONFIGURATION procedure.
    config = {
        "custom_property": custom_property,
        "some_additional_property": some_additional_property,
    }
    return call_procedure(
        "PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION",
        [variant_argument(config)]
    )
Copy

繰り返しになりますが、接続構成ステップと同様に、このステップでもさまざまなバックエンドコンポーネントのカスタマイズが可能であり、コード内で次のフレーズを使用して見つけることができます。

  • TODO: IMPLEMENT ME validate source

  • TODO: IMPLEMENT ME finalize internal

ソースの検証では、ソースシステムでより高度な検証を実行します。以前のテスト接続で接続を確立できることのみがチェックされた場合、ソースの検証では、たとえば単一のデータ記録を抽出するなど、システム内の特定のデータへのアクセスをチェックできます。

内部のファイナライズは、タスクリアクターとスケジューラーを初期化し、シンクデータベースと必要なネストされたオブジェクトを作成する内部プロシージャです。また、ファイナライズステップ中に提供された構成を保存するためにも使用できます(この構成はデフォルトでは保存されません)。

内部コンポーネントの詳細については、次をご参照ください。

さらに、入力は FinalizeConnectorInputValidator インターフェイスを使用して検証し、それをファイナライズハンドラーに提供することもできます(TemplateFinalizeConnectorConfigurationCustomHandler を確認してください)。ビルダーの使用に関する詳細については、次をご参照ください。

検証ソースの実装例は次のようになります。

public class SourceSystemAccessValidator implements SourceValidator {

    @Override
    public ConnectorResponse validate(Variant variant) {
      // TODO: IMPLEMENT ME validate source: Implement the custom logic of validating the source
      // system. In some cases this can be the same validation that happened in
      // TemplateConnectionValidator.
      // However, it is suggested to perform more complex validations, like specific access rights to
      // some specific resources here.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/finalize_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/finalize_configuration
      var finalizeProperties = Configuration.fromCustomConfig(variant);

      var httpResponse = SourceSystemHttpHelper.validateSource(finalizeProperties.get("custom_property"));
      return prepareConnectorResponse(httpResponse.statusCode());
    }

    private ConnectorResponse prepareConnectorResponse(int statusCode) {
      switch (statusCode) {
        case 200:
          return ConnectorResponse.success();
        case 401:
          return ConnectorResponse.error("Unauthorized error");
        case 404:
          return ConnectorResponse.error("Not found error");
        default:
          return ConnectorResponse.error("Unknown error");
      }
    }
}
Copy

リソースを作成する

ウィザードフェーズが完了すると、コネクタはデータの取り込みを開始する準備が整います。ただし、最初にリソースを実装して構成する必要があります。リソースは、テーブル、エンドポイント、ファイルなど、ソースシステム内の特定のデータセットを記述する抽象化です。

さまざまなソースシステムでは、リソースに関するさまざまな情報が必要になる場合があります。そのため、リソース定義は特定のニーズに応じてカスタマイズする必要があります。実行するには、 streamlit/daily_use/data_sync_page.py ファイルに移動します。リソースパラメーター用のテキストボックスを追加する方法に関する TODO を確認できます。リソースパラメーターにより、ソースシステムからのデータの識別と取得が可能になる必要があります。これらのパラメーターは、取り込み中に抽出できます。

# TODO: specify all the properties needed to define a resource in the source system. A subset of those properties should allow for a identification of a single resource, be it a table, endpoint, repository or some other data storage abstraction
st.text_input(
    "Resource name",
    key="resource_name",
)
st.text_input(
    "Some resource parameter",
    key="some_resource_parameter"
)
Copy

必要なプロパティがすべてフォームに追加されると、それらをバックエンドに渡すことができます。まず、テキストフィールドの状態を抽出し、 streamlit/daily_use/data_sync_page.py の API レベル queue_resource メソッドに渡す必要があります。

def queue_resource():
    # TODO: add additional properties here and pass them to create_resource function
    resource_name = st.session_state.get("resource_name")
    some_resource_parameter = st.session_state.get("some_resource_parameter)

    if not resource_name:
        st.error("Resource name cannot be empty")
        return

    result = create_resource(resource_name, some_resource_parameter)
    if result.is_ok():
        st.success("Resource created")
    else:
        st.error(result.get_message())
Copy

次に、 streamlit/native_sdk_api/resource_management.pycreate_resource 関数を更新する必要があります。

def create_resource(resource_name, some_resource_parameter):
    ingestion_config = [{
        "id": "ingestionConfig",
        "ingestionStrategy": "INCREMENTAL",
        # TODO: HINT: scheduleType and scheduleDefinition are currently not supported out of the box, due to globalSchedule being used. However, a custom implementation of the scheduler can use those fields. They need to be provided becuase they are mandatory in the resourceDefinition.
        "scheduleType": "INTERVAL",
        "scheduleDefinition": "60m"
    }]
    # TODO: HINT: resource_id should allow identification of a table, endpoint etc. in the source system. It should be unique.
    resource_id = {
        "resource_name": resource_name,
    }
    id = f"{resource_name}_{random_suffix()}"

    # TODO: if you specified some additional resource parameters then you need to put them inside resource metadata:
    resource_metadata = {
        "some_resource_parameter": some_resource_parameter
    }

    return call_procedure("PUBLIC.CREATE_RESOURCE",
                          [
                              varchar_argument(id),
                              variant_argument(resource_id),
                              variant_list_argument(ingestion_config),
                              varchar_argument(id),
                              "true",
                              variant_argument(resource_metadata)
                          ])
Copy

CREATE_RESOURCE() プロシージャ・ロジックのカスタマイズ

PUBLIC.CREATE_RESOURCE() プロシージャは、メインの実行フローのいくつかの場所にプラグインされる独自のロジックを実装することによって、開発者がその実行をカスタマイズすることができます。SDK によって、開発者は次のことができます。

  1. リソースを作成する前に検証します。このロジックは、 PUBLIC.CREATE_RESOURCE_VALIDATE() プロシージャーで実装されるべきです。

  2. リソースが作成される前に、いくつかのカスタム操作を行います。このロジックは、 PUBLIC.PRE_CREATE_RESOURCE() プロシージャーで実装されるべきです。

  3. リソースが作成された後、いくつかのカスタム操作を行います。このロジックは、 PUBLIC.POST_CREATE_RESOURCE() プロシージャーで実装されるべきです。

PUBLIC.CREATE_RESOURCE() プロシージャのカスタマイズについての詳細は、こちらをご覧ください。

TemplateCreateResourceHandler.java

このクラスは PUBLIC.CREATE_RESOURCE() プロシージャのハンドラーです。ここでは、前述のコールバック・プロシージャのハンドラのJava実装を注入することができます。デフォルトでは、TemplateはコールバックハンドラのモックされたJava実装を提供し、プロシージャ全体の実行時間を延長する SQL プロシージャの呼び出しから解放されます。Javaの実装は実行を高速化します。これらのモックされた実装は、成功レスポンスを返す以外には何もしません。テンプレートによって準備されたコールバック・クラスにカスタム実装を提供するか、これらのコールバックをゼロから作成し、ハンドラー・ビルダーでメイン・プロシージャの実行フローに注入することができます。

デフォルトで呼び出されるコールバック・メソッドにカスタム・ロジックを実装するには、コード内で以下のフレーズを探します。

  • TODO: IMPLEMENT ME create resource validate

  • TODO: IMPLEMENT ME pre create resource callback

  • TODO: IMPLEMENT ME post create resource callback

インジェスチョン

データのインジェスチョンを実行するには、リソース構成に基づいてソースシステムとの接続を処理し、データを取得するクラスを実装する必要があります。スケジューラーモジュールとタスクリアクターモジュールは、インジェスチョンタスクのトリガーとキューイングを処理します。

インジェスチョンロジックは TemplateIngestion クラスから呼び出されます。コード内で TODO: IMPLEMENT ME ingestion を探し、ランダムなデータ生成をソースシステムからのデータ取得に置き換えます。リソース定義にカスタムプロパティを追加した場合は、 ResourceIngestionDefinitionRepository および TemplateWorkItem で使用可能なプロパティを使用して、内部コネクタテーブルから取得できます。

  • resourceIngestionDefinitionId

  • ingestionConfigurationId

たとえば、あるウェブサービス MIGHT からデータを取得する場合は次のようになります。

public final class SourceSystemHttpHelper {

  private static final String DATA_URL = "https://source_system.com/data/%s";
  private static final SourceSystemHttpClient sourceSystemClient = new SourceSystemHttpClient();
  private static final ObjectMapper objectMapper = new ObjectMapper();

  private static List<Variant> fetchData(String resourceId) {
    var response = sourceSystemClient.get(String.format(url, resourceId));
    var body = response.body();

    try {
        return Arrays.stream(objectMapper.readValue(body, Map[].class))
              .map(Variant::new)
              .collect(Collectors.toList());
    } catch (JsonProcessingException e) {
      throw new RuntimeException("Cannot parse json", e);
    }
  }
}
Copy
public class SourceSystemHttpClient {

  private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(15);

  private final HttpClient client;
  private final String secret;

  public SourceSystemHttpClient() {
    this.client = HttpClient.newHttpClient();
    this.secret =
        SnowflakeSecrets.newInstance()
            .getGenericSecretString(ConnectionConfiguration.TOKEN_NAME);
  }

  public HttpResponse<String> get(String url) {
    var request =
        HttpRequest.newBuilder()
            .uri(URI.create(url))
            .GET()
            .header("Authorization", format("Bearer %s", secret))
            .header("Content-Type", "application/json")
            .timeout(REQUEST_TIMEOUT)
            .build();

    try {
      return client.send(request, HttpResponse.BodyHandlers.ofString());
    } catch (IOException | InterruptedException ex) {
      throw new RuntimeException(format("HttpRequest failed: %s", ex.getMessage()), ex);
    }
  }
}
Copy

リソースのライフサイクル管理

リソースの作成とインジェスチョンのロジックが実装されたら、以下の手順でリソースのライフサイクルを管理することができます。

  1. PUBLIC.ENABLE_RESOURCE() - このプロシージャは、特定のリソースを有効にします。つまり、そのリソースのインジェスチョンがスケジュールされることを意味します。

  2. PUBLIC.DISABLE_RESOURCE() - このプロシージャは特定のリソースを無効にします。つまり、そのリソースのインジェスチョンスケジューリングが停止されます。

  3. PUBLIC.UPDATE_RESOURCE() - この手順により、特定のリソースのインジェスト設定を更新することができます。これはデフォルトでは Streamlit UI には実装されていません。なぜなら、コネクタのユーザーがインジェスト設定をカスタマイズできるようにすることが、開発者にとって望ましくない場合があるからです(このプロシージャの使用を完全に禁止するには、アプリケーションロール ACCOUNTADMIN への付与を取り消します)。

これらのプロシージャはすべてJavaハンドラーを持ち、その実行をカスタマイズできるコールバックで拡張されています。これらのハンドラーのビルダーを使って、コールバックのカスタム実装を注入することができます。デフォルトでは、TemplateはコールバックハンドラのモックJava実装を提供し、 SQL プロシージャを呼び出すことで、プロシージャの実行時間が長くなるのを防ぎます。これらのモックされた実装は、成功レスポンスを返す以外には何もしません。テンプレートによって準備されたコールバッククラスにカスタム実装を提供するか、これらのコールバックをゼロから作成し、ハンドラービルダーのメインプロシージャ実行フローに注入することができます。

TemplateEnableResourceHandler.java

このクラスは、 PUBLIC.ENABLE_RESOURCE() プロシージャのハンドラーであり、専用のコールバックで拡張できます。

  1. 有効化する前にリソースを検証します。カスタム実装を提供するために、コード内の TODO: IMPLEMENT ME enable resource validate フレーズを探します。

  2. リソースが有効になる前に、いくつかのカスタム操作を行います。カスタム実装を提供するために、コード内の TODO: IMPLEMENT ME pre enable resource フレーズを探します。

  3. リソースが有効になった後、いくつかのカスタム操作を行います。カスタム実装を提供するために、コード内の TODO: IMPLEMENT ME post enable resource フレーズを探します。

詳しくは、 PUBLIC.ENABLE_RESOURCE() プロシージャの詳細ドキュメントをご覧ください。

TemplateDisableResourceHandler.java

このクラスは、 PUBLIC.DISABLE_RESOURCE() プロシージャのハンドラーであり、専用のコールバックで拡張できます。

  1. 無効にする前にリソースを検証します。カスタム実装を提供するために、コード内の TODO: IMPLEMENT ME disable resource validate フレーズを探します。

  2. リソースが無効になる前に、いくつかのカスタム操作を行います。カスタム実装を提供するために、コード内の TODO: IMPLEMENT ME pre disable resource フレーズを探します。

詳しくは、 PUBLIC.DISABLE_RESOURCE() プロシージャの詳細ドキュメントをご覧ください。

TemplateUpdateResourceHandler.java

このクラスは、 PUBLIC.UPDATE_RESOURCE() プロシージャのハンドラーであり、専用のコールバックで拡張できます。

  1. リソースを更新する前に検証します。カスタム実装を提供するために、コード内の TODO: IMPLEMENT ME update resource validate フレーズを探します。

  2. リソースが更新される前に、いくつかのカスタム操作を行います。カスタム実装を提供するために、コード内の TODO: IMPLEMENT ME pre update resource フレーズを探します。

  3. リソースが更新された後、いくつかのカスタム操作を行います。カスタム実装を提供するために、コード内の TODO: IMPLEMENT ME post update resource フレーズを探します。

詳しくは、 PUBLIC.UPDATE_RESOURCE() プロシージャの詳細ドキュメントをご覧ください。

設定

テンプレートには、以前に実行されたすべての構成を表示できる設定タブが含まれています。ただし、構成プロパティがカスタマイズされている場合は、このビューもカスタマイズする必要があります。設定タブのコードは streamlit/daily_use/settings_page.py ファイルにあります。カスタマイズするには、それぞれの構成で追加されたキーの構成から値を抽出します。

たとえば、前に接続構成ステップで additional_connection_property を追加した場合は、次のように追加できます。

def connection_config_page():
    current_config = get_connection_configuration()

    # TODO: implement the display for all the custom properties defined in the connection configuration step
    custom_property = current_config.get("custom_connection_property", "")
    additional_connection_property = current_config.get("additional_connection_property", "")


    st.header("Connector configuration")
    st.caption("Here you can see the connector connection configuration saved during the connection configuration step "
               "of the Wizard. If some new property was introduced it has to be added here to display.")
    st.divider()

    st.text_input(
        "Custom connection property:",
        value=custom_property,
        disabled=True
    )
    st.text_input(
        "Additional connection property:",
        value=additional_connection_property,
        disabled=True
    )
    st.divider()
Copy