Tutorial: Vorlage für Native SDK for Connectors Java

Einführung

Willkommen bei unserem Tutorial zur Verwendung einer Konnektorvorlage mit Snowflake Native SDK for Connectors. Diese Anleitung hilft Ihnen bei der Einrichtung einer einfachen nativen Konnektoranwendung.

In diesem Tutorial lernen Sie Folgendes:

  • Bereitstellen einer nativen Konnektoranwendung

  • Konfigurieren einer Konnektorvorlage für die Datenaufnahme (Ingestion)

  • Anpassen einer Konnektorvorlage an Ihre Anforderungen

Die Vorlage enthält verschiedene hilfreiche Kommentare im Code, die es Ihnen erleichtern, bestimmte Dateien zu finden, die geändert werden müssen. Achten Sie auf Kommentare mit den folgenden Stichwörtern, diese werden Sie leiten und Ihnen helfen, Ihren eigenen Konnektor zu implementieren:

  • TODO

  • TODO: HINT

  • TODO: IMPLEMENT ME

Bevor Sie mit diesem Tutorial beginnen, sollten Sie sich mit den folgenden empfohlenen Inhalten vertraut machen:

Voraussetzungen

Bevor Sie beginnen, müssen Sie sicherstellen, dass Sie folgende Voraussetzungen erfüllen:

  • Java 11 installiert

  • Zugriff auf das Snowflake-Konto mit der Rolle ACCOUNTADMIN

  • Tool SnowSQL (CLI-Client) mit variable_substitution und exit_on_error auf Ihrem lokalen Computer konfiguriert

  • Lesen Sie die Dokumentationsseite: Snowflake Native SDK für Konnektoren und halten Sie sie online geöffnet, oder drucken Sie sie aus Ihrem Browser aus. Lesen Sie diesen Quickstart: Connector Native Java SDK (optional, aber empfohlen). Der Schnellstart verwendet einen Beispielkonnektor, der auf einer Vorlage basiert und auf den Sie verweisen können, um sich Beispielimplementierungen verschiedener Komponenten anzusehen.

Initialisierung und Bereitstellung

Um ein Projekt zu initialisieren, klonen Sie das Native SDK for Connectors-Repository von GitHub und kopieren das Verzeichnis /templates/native-sdk-connectors-java-template an den gewünschten Projektspeicherort. Diese Vorlage enthält den gesamten Code, der für die Bereitstellung einer funktionierenden Connector Native-Anwendung erforderlich ist. Sobald dies erfolgt ist, ist die Vorlage bereit für die Bereitstellung.

Bereitstellung

Die Vorlage ist bereit für die Bereitstellung und bietet ein komfortables Skript, das den gesamten Prozess für Sie übernimmt. Bevor Sie den Konnektor einsetzen, muss eine snowsql-Verbindung angegeben werden. Öffnen Sie dazu Makefile, und tragen Sie den Namen der Verbindung in die Umgebungsvariable CONNECTION ein.

Um die Anwendung schnell bereitzustellen, wechseln Sie in das Hauptverzeichnis der Vorlage und führen Sie den folgenden Befehl aus:

make reinstall_application_from_version_dir
Copy

Dieser Befehl führt Folgendes aus:

  • Entfernt bestehende APPLICATION- und APPLICATION PACKAGE-Instanzen aus dem Snowflake-Konto.

  • Kopiert die JAR- und SQL-Dateien des SDK, die aus der JAR-Datei extrahiert wurden, in das Zielverzeichnis sf_build.

  • Kopiert die kundenspezifischen Streamlit- und Java-Komponenten der Anwendung in das Verzeichnis sf_build.

  • Erstellt eine neue APPLICATION PACKAGE-Instanz aus den Dateien im Verzeichnis sf_build innerhalb eines Snowflake-Kontos.

  • Erstellt eine neue APPLICATION-Instanz innerhalb eines Snowflake-Kontos.

Dieser Prozess dauert etwa 2-3 Minuten. Navigieren Sie anschließend in Snowflake zur Registerkarte Data Products -> Apps, wo Ihr Konnektor angezeigt werden sollte. Wenn Sie viele Anwendungen haben und Schwierigkeiten haben, den Konnektor zu finden, versuchen Sie, NATIVE_SDK_CONNECTOR_TEMPLATE in die Suchleiste einzugeben, oder im Falle eines kundenspezifische APPLICATION-Namens verwenden Sie stattdessen den kundenspezifischen Namen. Dieser Konnektor ist bereit für die Konfiguration. Die folgenden Schritte führen Sie durch den Prozess und erklären, wie Sie die einzelnen Schritte anpassen können.

Wenn Sie Ihren Konnektor in einem der Schritte dieses Tutorials erneut bereitstellen müssen, um z. B. Ihre Änderungen zu testen, führen Sie einfach den obigen Befehl erneut aus.

Schritt „Voraussetzungen“

Direkt nach der Bereitstellung befindet sich der Konnektor in der Assistentenphase. Diese Phase besteht aus einigen Schritten, die den Endbenutzer durch alle notwendigen Konfigurationen führen. Der erste Schritt ist der Schritt „Voraussetzungen“. Dieser ist optional und möglicherweise nicht für jeden Konnektor erforderlich. Bei den Voraussetzungen handelt es sich in der Regel um Aktionen, die der Benutzer außerhalb der Anwendung durchführen muss, z. B. Ausführen von Abfragen über das Arbeitsblatt, Durchführen einiger Konfigurationen auf Seiten des Quellsystems usw.

Weitere Informationen zu den Voraussetzungen finden Sie unter:

Die Inhalte der einzelnen Voraussetzungen werden direkt aus der internen Tabelle (STATE.PREREQUISITES) innerhalb des Konnektors abgerufen. Sie können über das Skript setup.sql angepasst werden. Beachten Sie jedoch, dass das Skript setup.sql bei jeder Installation, bei jedem Upgrade und bei jedem Downgrade der Anwendung ausgeführt wird. Die Einfügungen müssen idempotent sein, daher empfiehlt sich die Verwendung einer Merge-Abfrage wie im folgenden Beispiel:

MERGE INTO STATE.PREREQUISITES AS dest
USING (SELECT * FROM VALUES
           ('1',
            'Sample prerequisite',
            'Prerequisites can be used to notice the end user of the connector about external configurations. Read more in the SDK documentation below. This content can be modified inside `setup.sql` script',
            'https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/prerequisites',
            NULL,
            NULL,
            1
           )
) AS src (id, title, description, documentation_url, learnmore_url, guide_url, position)
ON dest.id = src.id
WHEN NOT MATCHED THEN
    INSERT (id, title, description, documentation_url, learnmore_url, guide_url, position)
    VALUES (src.id, src.title, src.description, src.documentation_url, src.learnmore_url, src.guide_url, src.position);
Copy

Schritt „Konnektorkonfiguration“

Der nächste Schritt der Assistentenphase ist der Konnektorkonfigurationsschritt. In diesem Schritt können Sie die für den Konnektor erforderlichen Datenbankobjekte und Berechtigungen konfigurieren. In diesem Schritt können Sie die folgenden Konfigurationseigenschaften festlegen:

  • warehouse

  • destination_database

  • destination_schema

  • operational_warehouse

  • global_schedule

  • data_owner_role

  • agent_username

  • agent_role

Wenn Sie weitere kundenspezifische Eigenschaften benötigen, können Sie diese in einem der nächsten Schritte der Assistentenphase konfigurieren. Weitere Informationen zu den einzelnen Eigenschaften finden Sie unter:

Darüber hinaus zeigt die in der Vorlage enthaltene Streamlit-Komponente (streamlit/wizard/connector_config.py), wie Sie permissions-sdk triggern und beim Endbenutzer Berechtigungen anfordern können. Solange die verfügbaren Eigenschaften den Anforderungen des Konnektors genügen, müssen Sie keine der Backend-Klassen überschreiben, obwohl dies in den weiteren Schritten der Konfiguration auf die gleiche Weise wie bei den Komponenten möglich ist.

Weitere Informationen zu interne Prozeduren und Java-Objekten finden Sie unter:

Das bereitgestellte Streamlit-Beispiel ermöglicht das Anfordern von Berechtigungen auf Kontoebene wie create database und execute tasks. Außerdem wird dem Benutzer die Möglichkeit gegeben, über das permissions-sdk-Popup eine Warehouse-Referenz anzugeben.

In der Vorlage wird der Benutzer lediglich aufgefordert, Werte für destination_database und destination_schema anzugeben. Ein TODO-Kommentar in streamlit/wizard/connector_configuration.py enthält jedoch kommentierten Code, der wiederverwendet werden kann, um weitere Eingabefelder im Streamlit-UI anzuzeigen.

# TODO: Here you can add additional fields in connector configuration. Supported values are the following: warehouse, operational_warehouse, data_owner_role, agent_role, agent_username
# For example:
st.subheader("Operational warehouse")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="operational_warehouse", label_visibility="collapsed")
st.caption("Name of the operational warehouse to be used")
Copy

Schritt „Verbindungskonfiguration“

Der nächste Schritt der Assistentenphase ist der Verbindungskonfigurationsschritt. In diesem Schritt kann der Benutzer die Parameter für die externe Konnektivität des Konnektors konfigurieren. Diese Konfiguration kann Bezeichner von Objekten wie Geheimnissen, Integrationen usw. enthalten. Da dies je nach Quellsystem für die vom Konnektor aufgenommenen Daten unterschiedlich ist, ist dies die erste Stelle, an der größere Anpassungen im Quellcode vorgenommen werden müssen.

Weitere Informationen zur Verbindungskonfiguration finden Sie unter:

Beginnend mit der Streamlit-UI-Seite (Datei streamlit/wizard/connection_config.py) müssen Sie Textfelder für alle benötigten Parameter hinzufügen. Ein Beispiel-Textfeld ist für Sie implementiert, und wenn Sie den Code in dieser Datei durchsuchen, finden Sie ein TODO mit kommentiertem Code für ein neues Feld.

# TODO: Additional configuration properties can be added to the UI like this:
st.subheader("Additional connection parameter")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="additional_connection_property", label_visibility="collapsed")
st.caption("Some description of the additional property")
Copy

Nachdem die Eigenschaften zum Formular hinzugefügt wurden, müssen sie an die Backend-Schicht des Konnektors übergeben werden. Dazu müssen zwei zusätzliche Stellen in den Streamlit-Dateien geändert werden. Die erste ist die Funktion finish_config in der Datei streamlit/wizard/connection_config.py. Der Zustand der neu hinzugefügten Textfelder muss hier gelesen werden. Außerdem kann er bei Bedarf validiert und dann an die Funktion set_connection_configuration übergeben werden. Wenn zum Beispiel additional_connection_property hinzugefügt wurde, würde die Stelle nach den Änderungen wie folgt aussehen:

def finish_config():
try:
    # TODO: If some additional properties were specified they need to be passed to the set_connection_configuration function.
    # The properties can also be validated, for example, check whether they are not blank strings etc.
    response = set_connection_configuration(
        custom_connection_property=st.session_state["custom_connection_property"],
        additional_connection_property=st.session_state["additional_connection_property"],
    )

# rest of the method without changes
Copy

Dann muss die Funktion set_connection_configuration bearbeitet werden, die Sie in der Datei streamlit/native_sdk_api/connection_config.py finden. Diese Funktion ist ein Proxy zwischen Streamlit-UI und der zugrunde liegenden SQL-Prozedur, die ein Einstiegspunkt in das Backend des Konnektors ist.

def set_connection_configuration(custom_connection_property: str, additional_connection_property: str):
    # TODO: this part of the code sends the config to the backend so all custom properties need to be added here
    config = {
        "custom_connection_property": escape_identifier(custom_connection_property),
        "additional_connection_property": escape_identifier(additional_connection_property),
    }

    return call_procedure(
        "PUBLIC.SET_CONNECTION_CONFIGURATION",
        [variant_argument(config)]
    )
Copy

Danach wird die neue Eigenschaft in der internen Konnektortabelle gespeichert, die die Konfiguration enthält. Dies ist jedoch nicht das Ende der möglichen Anpassungen. Einige Backend-Komponenten können ebenfalls angepasst werden. Achten Sie auf die folgenden Kommentare im Code, um sie zu finden:

  • TODO: IMPLEMENT ME connection configuration validate

  • TODO: IMPLEMENT ME connection callback

  • TODO: IMPLEMENT ME test connection

Der Validierungsteil ermöglicht eine zusätzliche Validierung der Daten, die über die UI erhaltenen wurden. Zudem können die Daten umgewandelt werden, z. B. in Kleinbuchstaben, gekürzt werden oder es kann überprüft werden, ob Objekte mit den angegebenen Namen tatsächlich in Snowflake existieren.

Der Verbindungs-Callback ist ein Teil, mit dem Sie zusätzliche Operationen auf Basis der Konfiguration ausführen können, z. B. Änderungsprozeduren, bei denen Integrationen für den externen Zugriff verwendet werden müssen.

Die Testverbindung ist eine letzte Komponente der Verbindungskonfiguration. Sie prüft, ob die Verbindung zwischen dem Konnektor und dem Quellsystem hergestellt werden kann.

Weitere Informationen zu diesen internen Komponenten finden Sie unter:

Beispielimplementierungen könnten wie folgt aussehen:

public class TemplateConfigurationInputValidator implements ConnectionConfigurationInputValidator {

    private static final String ERROR_CODE = "INVALID_CONNECTION_CONFIGURATION";

    @Override
    public ConnectorResponse validate(Variant config) {
      // TODO: IMPLEMENT ME connection configuration validate: If the connection configuration input
      // requires some additional validation this is the place to implement this logic.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      var integrationCheck = checkParameter(config, INTEGRATION_PARAM, false);
      if (!integrationCheck.isOk()) {
        return integrationCheck;
      }

      var secretCheck = checkParameter(config, SECRET_PARAM, true);
      if (!secretCheck.isOk()) {
        return ConnectorResponse.error(ERROR_CODE);
      }

      return ConnectorResponse.success();
    }
}
Copy
public class TemplateConnectionConfigurationCallback implements ConnectionConfigurationCallback {

    private final Session session;

    public TemplateConnectionConfigurationCallback(Session session) {
      this.session = session;
    }

    @Override
    public ConnectorResponse execute(Variant config) {
      // TODO: If you need to alter some procedures with external access you can use
      // configureProcedure method or implement a similar method on your own.
      // TODO: IMPLEMENT ME connection callback: Implement the custom logic of changes in application
      // to be done after connection configuration, like altering procedures with external access.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      configureProcedure(format("PUBLIC.TEST_CONNECTION()"), config);

      return ConnectorResponse.success();
    }
}
Copy
public class TemplateConnectionValidator {

    private static final String ERROR_CODE = "TEST_CONNECTION_FAILED";

    public static Variant testConnection(Session session) {
      // TODO: IMPLEMENT ME test connection: Implement the custom logic of testing the connection to
      // the source system here. This usually requires connection to some webservice or other external
      // system. It is suggested to perform only the basic connectivity validation here.
      // If that's the case then this procedure must be altered in TemplateConnectionConfigurationCallback first.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      return test().toVariant();
    }

    private static ConnectorResponse test() {
      try {
        var response = SourceSystemHttpHelper.testEndpoint();

        if (isSuccessful(response.statusCode())) {
          return ConnectorResponse.success();
        } else {
          return ConnectorResponse.error(ERROR_CODE, "Connection to source system failed");
        }
      } catch (Exception exception) {
        return ConnectorResponse.error(ERROR_CODE, "Test connection failed");
      }
    }
}
Copy

Schritt „Konfiguration abschließen“

Der Schritt zum Abschließen der Konnektorkonfiguration ist der letzte Schritt der Assistentenphase. Dieser Schritt umfasst mehrere Teilschritte. Erstens hat der Benutzer die Möglichkeit, alle zusätzlichen Konfigurationen anzugeben, die der Konnektor benötigt. Zweitens werden eine Senkendatenbank, ein Schema und bei Bedarf einige Tabellen und Ansichten für die aufgenommenen Daten erstellt. Und schließlich werden die internen Komponenten wie Scheduler und Task Reactor initialisiert.

Weitere Informationen zum Abschließen der Konfiguration finden Sie unter:

Weitere Informationen zu Task Reactor und Scheduler finden Sie unter:

Ähnlich wie bei der Verbindungskonfiguration kann die Anpassung mit der Streamlit-UI gestartet werden. streamlit/wizard/finalize_config.py enthält ein Formular mit einer Beispieleigenschaft. Weitere Eigenschaften können je nach Bedarf des Konnektor hinzugefügt werden. Um eine weitere Eigenschaft hinzuzufügen, suchen Sie nach einem TODO-Kommentar, der Beispielcode für das Hinzufügen einer neuen Eigenschaft in der genannten Datei enthält.

# TODO: Here you can add additional fields in finalize connector configuration.
# For example:
st.subheader("Some additional property")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="some_additional_property", label_visibility="collapsed")
st.caption("Description of some new additional property")
Copy

Nachdem Sie das Textfeld für eine neue Eigenschaft hinzugefügt haben, muss es an das Backend übergeben werden. Ändern Sie dazu die Funktion finalize_configuration in derselben Datei:

def finalize_configuration():
    try:
        st.session_state["show_main_error"] = False
        # TODO: If some additional properties were introduced, they need to be passed to the finalize_connector_configuration function.
        response = finalize_connector_configuration(
            st.session_state.get("custom_property"),
            st.session_state.get("some_additional_property")
        )
Copy

Öffnen Sie danach streamlit/native_sdk_api/finalize_config.py, und fügen Sie es der folgenden Funktion hinzu:

def finalize_connector_configuration(custom_property: str, some_additional_property: str):
    # TODO: If some custom properties were configured, then they need to be specified here and passed to the FINALIZE_CONNECTOR_CONFIGURATION procedure.
    config = {
        "custom_property": custom_property,
        "some_additional_property": some_additional_property,
    }
    return call_procedure(
        "PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION",
        [variant_argument(config)]
    )
Copy

Ähnlich wie bei der Verbindungskonfiguration ermöglicht auch dieser Schritt die Anpassung verschiedener Backend-Komponenten, die Sie mit den folgenden Phrasen im Code finden:

  • TODO: IMPLEMENT ME validate source

  • TODO: IMPLEMENT ME finalize internal

Im Teilschritt zur Validierung der Quelle geht es um die Durchführung anspruchsvollerer Validierungen der Quellsysteme. Wenn bei der vorherigen Testverbindung nur geprüft wurde, ob eine Verbindung hergestellt werden kann, könnte die Quellenvalidierung den Zugriff auf bestimmte Daten im System prüfen, z. B. durch Extraktion eines einzelnen Datensatzes.

Die interne Finalisierung ist eine interne Prozedur, die für die Initialisierung von Task Reactor und Scheduler und das Erstellen einer Senkendatenbank und der notwendigen verschachtelten Objekte verantwortlich ist. Sie kann auch verwendet werden, um die während des Finalisierungsschritts bereitgestellte Konfiguration zu speichern (diese Konfiguration wird standardmäßig nicht gespeichert).

Weitere Informationen zu den internen Komponenten finden Sie unter:

Zusätzlich können Eingaben über die Schnittstelle FinalizeConnectorInputValidator validiert und an den Finalisierungs-Handler übergeben werden (siehe TemplateFinalizeConnectorConfigurationCustomHandler). Weitere Informationen zur Verwendung von Buildern finden Sie unter:

Die Implementierung der Quellenvalidierung könnte beispielsweise wie folgt aussehen:

public class SourceSystemAccessValidator implements SourceValidator {

    @Override
    public ConnectorResponse validate(Variant variant) {
      // TODO: IMPLEMENT ME validate source: Implement the custom logic of validating the source
      // system. In some cases this can be the same validation that happened in
      // TemplateConnectionValidator.
      // However, it is suggested to perform more complex validations, like specific access rights to
      // some specific resources here.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/finalize_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/finalize_configuration
      var finalizeProperties = Configuration.fromCustomConfig(variant);

      var httpResponse = SourceSystemHttpHelper.validateSource(finalizeProperties.get("custom_property"));
      return prepareConnectorResponse(httpResponse.statusCode());
    }

    private ConnectorResponse prepareConnectorResponse(int statusCode) {
      switch (statusCode) {
        case 200:
          return ConnectorResponse.success();
        case 401:
          return ConnectorResponse.error("Unauthorized error");
        case 404:
          return ConnectorResponse.error("Not found error");
        default:
          return ConnectorResponse.error("Unknown error");
      }
    }
}
Copy

Ressourcen erstellen

Nachdem die Assistentenphase abgeschlossen ist, ist der Konnektor bereit, mit der Aufnahme von Daten zu beginnen. Aber zuerst müssen die Ressourcen implementiert und konfiguriert werden. Eine Ressource ist eine Abstraktion, die eine bestimmte Menge von Daten im Quellsystem beschreibt, z. B. eine Tabelle, einen Endpunkt oder eine Datei.

Verschiedene Quellsysteme benötigen möglicherweise unterschiedliche Informationen zu einer Ressource. Aus diesem Grund muss eine Ressourcendefinition entsprechend den spezifischen Anforderungen angepasst werden. Um dies zu tun, rufen Sie die Datei streamlit/daily_use/data_sync_page.py auf. Dort finden Sie ein TODO über das Hinzufügen von Textfeldern für Ressourcenparameter. Die Ressourcenparameter sollten die Identifizierung und das Abrufen von Daten aus dem Quellsystem ermöglichen. Diese Parameter können dann während der Datenaufnahme extrahiert werden:

# TODO: specify all the properties needed to define a resource in the source system. A subset of those properties should allow for a identification of a single resource, be it a table, endpoint, repository or some other data storage abstraction
st.text_input(
    "Resource name",
    key="resource_name",
)
st.text_input(
    "Some resource parameter",
    key="some_resource_parameter"
)
Copy

Sobald alle erforderlichen Eigenschaften zum Formular hinzugefügt wurden, können sie an das Backend übergeben werden. Zunächst muss der Zustand der Textfelder extrahiert und auf API-Ebene an die Methode queue_resource in streamlit/daily_use/data_sync_page.py übergeben werden:

def queue_resource():
    # TODO: add additional properties here and pass them to create_resource function
    resource_name = st.session_state.get("resource_name")
    some_resource_parameter = st.session_state.get("some_resource_parameter)

    if not resource_name:
        st.error("Resource name cannot be empty")
        return

    result = create_resource(resource_name, some_resource_parameter)
    if result.is_ok():
        st.success("Resource created")
    else:
        st.error(result.get_message())
Copy

Dann muss die Funktion create_resource der Datei streamlit/native_sdk_api/resource_management.py aktualisiert werden:

def create_resource(resource_name, some_resource_parameter):
    ingestion_config = [{
        "id": "ingestionConfig",
        "ingestionStrategy": "INCREMENTAL",
        # TODO: HINT: scheduleType and scheduleDefinition are currently not supported out of the box, due to globalSchedule being used. However, a custom implementation of the scheduler can use those fields. They need to be provided becuase they are mandatory in the resourceDefinition.
        "scheduleType": "INTERVAL",
        "scheduleDefinition": "60m"
    }]
    # TODO: HINT: resource_id should allow identification of a table, endpoint etc. in the source system. It should be unique.
    resource_id = {
        "resource_name": resource_name,
    }
    id = f"{resource_name}_{random_suffix()}"

    # TODO: if you specified some additional resource parameters then you need to put them inside resource metadata:
    resource_metadata = {
        "some_resource_parameter": some_resource_parameter
    }

    return call_procedure("PUBLIC.CREATE_RESOURCE",
                          [
                              varchar_argument(id),
                              variant_argument(resource_id),
                              variant_list_argument(ingestion_config),
                              varchar_argument(id),
                              "true",
                              variant_argument(resource_metadata)
                          ])
Copy

Anpassen der CREATE_RESOURCE()-Prozedurlogik

Die PUBLIC.CREATE_RESOURCE()-Prozedur ermöglicht es dem Entwickler, ihre Ausführung durch die Implementierung einer eigenen Logik anzupassen, die an mehreren Stellen des Hauptausführungsablaufs eingesteckt wird. Die SDK ermöglicht es dem Entwickler:

  1. Validieren Sie die Ressource, bevor sie erstellt wird. Die Logik sollte in der PUBLIC.CREATE_RESOURCE_VALIDATE()-Prozedur implementiert werden.

  2. Führen Sie einige benutzerdefinierte Operationen durch, bevor die Ressource erstellt wird. Die Logik sollte in der PUBLIC.PRE_CREATE_RESOURCE()-Prozedur implementiert werden.

  3. Führen Sie einige benutzerdefinierte Operationen aus, nachdem die Ressource erstellt wurde. Die Logik sollte in der PUBLIC.POST_CREATE_RESOURCE()-Prozedur implementiert werden.

Weitere Informationen über die Anpassung der Prozedur unter PUBLIC.CREATE_RESOURCE() finden Sie hier:

TemplateCreateResourceHandler.java

Diese Klasse ist ein Handler für die PUBLIC.CREATE_RESOURCE()-Prozedur. Hier können Sie die Java-Implementierungen der Handler für die bereits erwähnten Callback-Prozeduren injizieren. Standardmäßig bietet die Vorlage gespottete Java-Implementierungen von Callback-Handlern, um den Aufruf von SQL-Prozeduren zu vermeiden, die die gesamte Ausführungszeit der Prozedur verlängern. Java-Implementierungen machen die Ausführung schneller. Diese nachgebildeten Implementierungen tun nichts anderes, als eine erfolgreiche Antwort zurückzugeben. Sie können entweder die benutzerdefinierte Implementierung für die von der Vorlage vorbereiteten Callback-Klassen bereitstellen oder diese Callbacks von Grund auf neu erstellen und sie in den Hauptablauf der Ausführung der Prozedur im Handler-Builder einfügen.

Um die benutzerdefinierte Logik in Callback-Methoden zu implementieren, die standardmäßig aufgerufen werden, suchen Sie nach den folgenden Phrasen im Code:

  • TODO: IMPLEMENT ME create resource validate

  • TODO: IMPLEMENT ME pre create resource callback

  • TODO: IMPLEMENT ME post create resource callback

Datenaufnahme (Ingestion)

Um die Aufnahme von Daten durchzuführen, müssen Sie eine Klasse implementieren, die die Verbindung mit dem Quellsystem herstellt und die Daten auf der Grundlage der Ressourcenkonfiguration abruft. Die Module Scheduler und Task Reactor sorgen dafür, dass Datenaufnahmeaufgaben ausgelöst und zur Warteschlange hinzugefügt werden.

Die Datenaufnahmelogik wird von der Klasse TemplateIngestion aufgerufen. Suchen Sie im Code nach TODO: IMPLEMENT ME ingestion, und ersetzen Sie die zufällige Datengenerierung durch den Datenabruf aus dem Quellsystem. Wenn Sie der Ressourcendefinition einige kundenspezifische Eigenschaften hinzugefügt haben, können diese aus den internen Konnektortabellen mithilfe von ResourceIngestionDefinitionRepository und den in TemplateWorkItem verfügbaren Eigenschaften abgerufen werden:

  • resourceIngestionDefinitionId

  • ingestionConfigurationId

Das Abrufen von Daten von einem Webservice könnte zum Beispiel wie folgt aussehen:

public final class SourceSystemHttpHelper {

  private static final String DATA_URL = "https://source_system.com/data/%s";
  private static final SourceSystemHttpClient sourceSystemClient = new SourceSystemHttpClient();
  private static final ObjectMapper objectMapper = new ObjectMapper();

  private static List<Variant> fetchData(String resourceId) {
    var response = sourceSystemClient.get(String.format(url, resourceId));
    var body = response.body();

    try {
        return Arrays.stream(objectMapper.readValue(body, Map[].class))
              .map(Variant::new)
              .collect(Collectors.toList());
    } catch (JsonProcessingException e) {
      throw new RuntimeException("Cannot parse json", e);
    }
  }
}
Copy
public class SourceSystemHttpClient {

  private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(15);

  private final HttpClient client;
  private final String secret;

  public SourceSystemHttpClient() {
    this.client = HttpClient.newHttpClient();
    this.secret =
        SnowflakeSecrets.newInstance()
            .getGenericSecretString(ConnectionConfiguration.TOKEN_NAME);
  }

  public HttpResponse<String> get(String url) {
    var request =
        HttpRequest.newBuilder()
            .uri(URI.create(url))
            .GET()
            .header("Authorization", format("Bearer %s", secret))
            .header("Content-Type", "application/json")
            .timeout(REQUEST_TIMEOUT)
            .build();

    try {
      return client.send(request, HttpResponse.BodyHandlers.ofString());
    } catch (IOException | InterruptedException ex) {
      throw new RuntimeException(format("HttpRequest failed: %s", ex.getMessage()), ex);
    }
  }
}
Copy

Lebenszyklus von Ressourcen verwalten

Sobald die Logik zur Erstellung von Ressourcen und deren Ingestion implementiert ist, können Sie deren Lebenszyklus mit den folgenden Prozeduren verwalten:

  1. PUBLIC.ENABLE_RESOURCE() - diese Prozedur aktiviert eine bestimmte Ressource, d. h. sie wird für die Ingestion geplant

  2. PUBLIC.DISABLE_RESOURCE() - diese Prozedur deaktiviert eine bestimmte Ressource, d. h. die Zeitpläne für die Aufnahme werden gestoppt

  3. PUBLIC.UPDATE_RESOURCE() - mit dieser Prozedur können Sie die Ingestion-Konfigurationen einer bestimmten Ressource aktualisieren. Diese Funktion ist nicht standardmäßig in Streamlit UI implementiert, da es manchmal unerwünscht sein kann, dass der Benutzer des Konnektors die Konfiguration des Ingestion anpassen kann (widerrufen Sie die Berechtigungen für diese Prozedur für die ACCOUNTADMIN-Anwendungsrolle, um ihre Verwendung vollständig zu unterbinden).

Alle diese Prozeduren verfügen über Java-Handler und werden durch Rückrufe erweitert, mit denen Sie ihre Ausführung anpassen können. Sie können benutzerdefinierte Implementierungen von Callbacks mit Hilfe des Builders dieser Handler einfügen. Standardmäßig bietet die Vorlage gespottete Java-Implementierungen von Callback-Handlern, um den Aufruf von SQL-Prozeduren zu vermeiden, die die gesamte Ausführungszeit der genannten Prozeduren verlängern. Diese nachgebildeten Implementierungen tun nichts anderes, als eine erfolgreiche Antwort zurückzugeben. Sie können entweder die benutzerdefinierte Implementierung für die von der Vorlage vorbereiteten Callback-Klassen bereitstellen oder diese Callbacks von Grund auf neu erstellen und sie in den Hauptablauf der Ausführung der Prozedur in den Handler-Builders einfügen.

TemplateEnableResourceHandler.java

Bei dieser Klasse handelt es sich um einen Handler für die PUBLIC.ENABLE_RESOURCE()-Prozedur, der um die entsprechenden Callbacks erweitert werden kann:

  1. Validieren Sie die Ressource, bevor sie aktiviert wird. Suchen Sie im Code nach TODO: IMPLEMENT ME enable resource validate, um die benutzerdefinierte Implementierung bereitzustellen.

  2. Führen Sie einige benutzerdefinierte Operationen durch, bevor die Ressource aktiviert wird. Suchen Sie im Code nach TODO: IMPLEMENT ME pre enable resource, um die benutzerdefinierte Implementierung bereitzustellen.

  3. Führen Sie einige benutzerdefinierte Operationen durch, nachdem die Ressource aktiviert wurde. Suchen Sie im Code nach TODO: IMPLEMENT ME post enable resource, um die benutzerdefinierte Implementierung bereitzustellen.

Erfahren Sie mehr in den ausführlichen Dokumentationen der PUBLIC.ENABLE_RESOURCE()-Prozedur:

TemplateDisableResourceHandler.java

Bei dieser Klasse handelt es sich um einen Handler für die PUBLIC.DISABLE_RESOURCE()-Prozedur, der um die entsprechenden Callbacks erweitert werden kann:

  1. Validieren Sie die Ressource, bevor sie deaktiviert wird. Suchen Sie im Code nach TODO: IMPLEMENT ME disable resource validate, um die benutzerdefinierte Implementierung bereitzustellen.

  2. Führen Sie einige benutzerdefinierte Operationen durch, bevor die Ressource deaktiviert wird. Suchen Sie im Code nach TODO: IMPLEMENT ME pre disable resource, um die benutzerdefinierte Implementierung bereitzustellen.

Erfahren Sie mehr in den ausführlichen Dokumentationen der PUBLIC.DISABLE_RESOURCE()-Prozedur:

TemplateUpdateResourceHandler.java

Bei dieser Klasse handelt es sich um einen Handler für die PUBLIC.UPDATE_RESOURCE()-Prozedur, der um die entsprechenden Callbacks erweitert werden kann:

  1. Validieren Sie die Ressource, bevor sie aktualisiert wird. Suchen Sie im Code nach TODO: IMPLEMENT ME update resource validate, um die benutzerdefinierte Implementierung bereitzustellen.

  2. Führen Sie einige benutzerdefinierte Operationen durch, bevor die Ressource aktualisiert wird. Suchen Sie im Code nach TODO: IMPLEMENT ME pre update resource, um die benutzerdefinierte Implementierung bereitzustellen.

  3. Führen Sie einige benutzerdefinierte Operationen aus, nachdem die Ressource aktualisiert wurde. Suchen Sie im Code nach TODO: IMPLEMENT ME post update resource, um die benutzerdefinierte Implementierung bereitzustellen.

Erfahren Sie mehr in den ausführlichen Dokumentationen der PUBLIC.UPDATE_RESOURCE()-Prozedur:

Einstellungen

Die Vorlage enthält eine Registerkarte für Einstellungen, auf der Sie alle zuvor vorgenommenen Konfigurationen einsehen können. Wenn jedoch Konfigurationseigenschaften angepasst wurden, dann muss auch diese Ansicht angepasst werden. Den Code für die Registerkarte mit den Einstellungen finden Sie in der Datei streamlit/daily_use/settings_page.py. Zum Anpassen extrahieren Sie einfach die Werte aus der Konfiguration für die Schlüssel, die in den jeweiligen Konfigurationen hinzugefügt wurden.

Wenn zuvor im Schritt „Verbindungskonfiguration“ additional_connection_property hinzugefügt wurde, dann könnte die Anpassung wie folgt aussehen:

def connection_config_page():
    current_config = get_connection_configuration()

    # TODO: implement the display for all the custom properties defined in the connection configuration step
    custom_property = current_config.get("custom_connection_property", "")
    additional_connection_property = current_config.get("additional_connection_property", "")


    st.header("Connector configuration")
    st.caption("Here you can see the connector connection configuration saved during the connection configuration step "
               "of the Wizard. If some new property was introduced it has to be added here to display.")
    st.divider()

    st.text_input(
        "Custom connection property:",
        value=custom_property,
        disabled=True
    )
    st.text_input(
        "Additional connection property:",
        value=additional_connection_property,
        disabled=True
    )
    st.divider()
Copy