Tutorial: Snowflake Native SDK for Connectors Java-Konnektor-Vorlage

Einführung

Willkommen bei unserem Tutorial zur Verwendung einer Konnektorvorlage mit Snowflake Native SDK for Connectors. Diese Anleitung hilft Ihnen bei der Einrichtung einer einfachen nativen Konnektoranwendung.

In diesem Tutorial lernen Sie Folgendes:

  • Bereitstellen einer nativen Konnektoranwendung

  • Konfigurieren einer Konnektorvorlage für die Datenaufnahme (Ingestion)

  • Anpassen einer Konnektorvorlage an Ihre Anforderungen

Die Vorlage enthält verschiedene hilfreiche Kommentare im Code, die es Ihnen erleichtern, bestimmte Dateien zu finden, die geändert werden müssen. Achten Sie auf Kommentare mit den folgenden Stichwörtern, diese werden Sie leiten und Ihnen helfen, Ihren eigenen Konnektor zu implementieren:

  • TODO

  • TODO: HINT

  • TODO: IMPLEMENT ME

Bevor Sie mit diesem Tutorial beginnen, sollten Sie sich mit den folgenden empfohlenen Inhalten vertraut machen:

Voraussetzungen

Bevor Sie beginnen, müssen Sie sicherstellen, dass Sie folgende Voraussetzungen erfüllen:

Bereiten Sie Ihre lokale Umgebung vor

Bevor Sie fortfahren, müssen Sie sicherstellen, dass alle erforderlichen Programme auf Ihrem Computer installiert sind, und die Konnektorvorlage klonen.

Java-Installation

Snowflake Native SDK for Connectors erfordert Java LTS (Long-Term Support) Version 11 oder höher. Wenn die minimal erforderliche Version von Java nicht auf Ihrem Rechner installiert ist, müssen Sie entweder Oracle Java oder OpenJDK installieren.

Oracle Java

Das neueste LTS-Release der JDK kann kostenlos unter der Oracle NFTC heruntergeladen und verwendet werden. Anleitungen zum Download und zur Installation finden Sie auf der Oracle-Seite.

OpenJDK

OpenJDK ist eine Open-Source-Implementierung von Java. Anleitungen zum Download und zur Installation finden Sie unter openjdk.org und jdk.java.net.

Sie können auch eine OpenJDK-Version eines Drittanbieters verwenden, wie Eclipse Temurin oder Amazon Corretto.

Snowflake-CLI-Konfiguration

Das Tool Snowflake CLI ist erforderlich, um den Konnektor zu erstellen, bereitzustellen und zu installieren. Wenn Sie Snowflake CLI nicht auf Ihrem Computer haben, installieren Sie es gemäß den Anweisungen, die Sie hier https://docs.snowflake.com/en/developer-guide/snowflake-cli/installation/installation finden.

Nachdem das Tool installiert ist, müssen Sie in Ihrer Konfigurationsdatei eine Verbindung zu Snowflake konfigurieren.

Wenn Sie noch keine Verbindung konfiguriert haben, erstellen Sie eine neue mit dem Namen native_sdk_connection. Eine Beispielverbindung finden Sie in der Datei deployment/snowflake.toml.

Wenn Sie bereits eine Verbindung konfiguriert haben und diese mit dem Konnektor verwenden möchten, verwenden Sie den Namen der Verbindung anstelle von native_sdk_connection, wenn diese in diesem Tutorial verwendet wird.

Klonen von Vorlagen

Um die Konnektorvorlage zu klonen, verwenden Sie den folgenden Befehl:

snow init <project_dir> \
  --template-source https://github.com/snowflakedb/connectors-native-sdk \
  --template templates/connectors-native-sdk-template

Geben Sie anstelle von <project_dir> den Namen des Verzeichnisses ein (es darf nicht bereits existieren), in dem das Java-Projekt Ihres Konnektors erstellt werden soll.

Nach dem Ausführen des Befehls werden Sie aufgefordert, zusätzliche Informationen für die Konfiguration der Anwendungsinstanz und des Stagingbereichs anzugeben. Sie können beliebige Namen angeben, solange es sich um gültige, nicht in Anführungszeichen gesetzte Snowflake-Bezeichner handelt, oder auf „Eingabe“ klicken, um die Standardwerte zu verwenden, die in eckigen Klammern angezeigt werden.

Ein Beispiel für die Ausführung eines Befehls, der benutzerdefinierte Anwendungs- und Stagingbereiche bereitstellt:

$ snow init my_connector \
    --template-source https://github.com/snowflakedb/connectors-native-sdk \
    --template templates/connectors-native-sdk-template

Name of the application instance which will be created in Snowflake [connectors-native-sdk-template]: MY_CONNECTOR
Name of the schema in which the connector files stage will be created [TEST_SCHEMA]:
Name of the stage used to store connector files in the application package [TEST_STAGE]: CUSTOM_STAGE_NAME
Initialized the new project in my_connector

Erstellen, Bereitstellen und Bereinigen des Konnektors

Die Vorlage kann sofort eingesetzt werden, noch vor jeglicher Änderung. In den folgenden Abschnitten erfahren Sie, wie Sie den Konnektor erstellen, bereitstellen und installieren.

Erstellen des Konnektors

Das Erstellen eines Konnektors, der mit Snowflake Native SDK for Connectors erstellt wurde, unterscheidet sich ein wenig vom Erstellen einer typischen Java-Anwendung. Neben der Erstellung der .jar-Archive aus den Quellen gibt es noch einige andere Dinge, die erledigt werden müssen. Das Erstellen der Anwendung besteht aus den folgenden Schritten:

  1. Kopieren benutzerdefinierter interner Komponenten in das Build-Verzeichnis

  2. Kopieren der SDK-Komponenten in das Build-Verzeichnis

Kopieren interner Komponenten

In diesem Schritt wird die .jar-Datei für den Konnektor erstellt und dann (zusammen mit den UI-, Manifest- und Setup-Dateien) in das Verzeichnis sf_build kopiert.

Um diesen Schritt auszuführen, führen Sie den Befehl: ./gradlew copyInternalComponents aus.

Kopieren der SDK-Komponenten

Dieser Schritt kopiert die SDK. jar-Datei (die als Abhängigkeit zum Gradle-Modul des Konnektors hinzugefügt wurde) in das sf_build-Verzeichnis und extrahiert gebündelte .sql-Dateien aus dem .jar-Archiv.

Diese .sql-Dateien ermöglichen die Anpassung der bereitgestellten Objekte, die bei der Installation der Anwendung erstellt werden. Für Erstbenutzer wird eine Anpassung nicht empfohlen, da das Weglassen von Objekten bei falscher Ausführung zu Fehlern in bestimmten Funktionen führen kann. Die Anwendung „Konnektorvorlage“ verwendet die Datei all.sql, die alle empfohlenen SDK-Objekte erstellt.

Um diesen Schritt auszuführen, führen Sie den Befehl: ./gradlew copySdkComponents aus.

Konnektor bereitstellen

Zur Bereitstellung einer nativen App muss ein Anwendungspaket in Snowflake erstellt werden. Danach müssen alle Dateien aus dem sf_build-Verzeichnis auf Snowflake hochgeladen werden.

Beachten Sie, dass für Entwicklungszwecke die Erstellung von Versionen optional ist. Eine Anwendungsinstanz kann direkt aus Stagingdateien erstellt werden. Auf diese Weise können Sie Änderungen in den meisten Konnektorateien sehen, ohne die Version und die Anwendungsinstanz neu erstellen zu müssen.

Die folgenden Operationen werden durchgeführt:

  1. Erstellen eines neuen Anwendungspakets, wenn es noch nicht existiert

  2. Erstellen eines Stagingbereichs für Schema und Datei innerhalb des Pakets

  3. Hochladen von Dateien aus dem sf_build-Verzeichnis in den Stagingbereich (dieser Schritt kann einige Zeit in Anspruch nehmen)

Um den Konnektor bereitzustellen, führen Sie den Befehl: snow app deploy --connection=native_sdk_connection aus.

Weitere Informationen zum Befehl snow app deploy finden Sie unter snow app deploy.

Das erstellte Anwendungspaket wird nun auf der Registerkarte App packages in der Kategorie Data products in der Snowflake-UI Ihres Kontos angezeigt.

Konnektor installieren

Die Installation der Anwendung ist der letzte Schritt des Prozesses. Er erstellt eine Anwendung aus dem zuvor erstellten Anwendungspaket.

Um den Konnektor zu installieren, führen Sie den Befehl: snow app run --connection=native_sdk_connection aus.

Weitere Informationen zum Befehl snow app run finden Sie unter snow app run.

Die installierte Anwendung wird nun auf der Registerkarte Installed apps in der Kategorie Data products in der Snowflake-UI Ihres Kontos angezeigt.

Aktualisieren der Konnektordateien

Wenn Sie zu irgendeinem Zeitpunkt eine der Konnektordateien ändern möchten, können Sie die geänderten Dateien einfach in den Stagingbereich des Anwendungspakets hochladen. Der Befehl zum Hochladen hängt davon ab, welche Dateien aktualisiert wurden.

Bevor einer der Update-Befehle ausgeführt wird, müssen Sie die neuen Dateien Ihres Konnektors in das Verzeichnis sf_build kopieren, und zwar mit folgende Befehl: ./gradlew copyInternalComponents

UI. py-Dateien oder connector .java-Dateien

Verwenden Sie den Befehl snow app deploy --connection=native_sdk_connection. Die aktuelle Anwendungsinstanz wird die neuen Dateien ohne Neuinstallation verwenden.

setup.sql- oder manifest.yml-Dateien

Verwenden Sie den Befehl snow app run --connection=native_sdk_connection; die aktuelle Anwendungsinstanz wird neu installiert, nachdem die neuen Dateien in den Stagingbereich hochgeladen wurden.

Bereinigen

Nach Abschluss des Tutorials oder wenn Sie aus irgendeinem Grund die Anwendung und ihr Paket entfernen möchten, können Sie sie mit folgendem Befehl vollständig aus Ihrem Konto entfernen:

snow app teardown --connection=native_sdk_connection --cascade --force

Die Option --cascade wird benötigt, um die Zieldatenbank zu entfernen, ohne die Eigentümerschaft an den Administrator des Kontos zu übertragen. Bei echten Konnektoren sollte die Datenbank nicht entfernt werden, um die aufgenommenen Daten zu erhalten. Sie sollte entweder dem Kontoadministrator gehören oder die Eigentümerschaft sollte vor der Deinstallation übertragen werden.

Bitte beachten Sie, dass der Konnektor so lange Credits verbraucht, bis er pausiert oder entfernt wird, auch wenn keine Aufnahme konfiguriert wurde!

Schritt „Voraussetzungen“

Direkt nach der Installation befindet sich der Konnektor in der Assistentenphase. Diese Phase besteht aus einigen Schritten, die den Endbenutzer durch alle notwendigen Konfigurationen führen.

Der erste Schritt ist der Schritt „Voraussetzungen“. Dieser ist optional und möglicherweise nicht für jeden Konnektor erforderlich. Bei den Voraussetzungen handelt es sich in der Regel um Aktionen, die der Benutzer außerhalb der Anwendung durchführen muss, z. B. Ausführen von Abfragen im SQL-Arbeitsblatt, Durchführen von Konfigurationen auf der Seite des Quellsystems usw.

Weitere Informationen zu den Voraussetzungen finden Sie unter: Voraussetzungen

Der Inhalt jeder Voraussetzung wird direkt aus der Tabelle STATE.PREREQUISITES abgerufen, die sich innerhalb des Konnektors befindet. Sie können über das Skript setup.sql angepasst werden. Beachten Sie jedoch, dass das Skript setup.sql bei jeder Installation, bei jedem Upgrade und bei jedem Downgrade der Anwendung ausgeführt wird. Die Einfügungen müssen idempotent sein, daher empfiehlt sich die Verwendung einer Merge-Abfrage wie im folgenden Beispiel:

MERGE INTO STATE.PREREQUISITES AS dest
USING (SELECT * FROM VALUES
           ('1',
            'Sample prerequisite',
            'Prerequisites can be used to notice the end user of the connector about external configurations. Read more in the SDK documentation below. This content can be modified inside `setup.sql` script',
            'https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/prerequisites',
            NULL,
            NULL,
            1
           )
) AS src (id, title, description, documentation_url, learnmore_url, guide_url, position)
ON dest.id = src.id
WHEN NOT MATCHED THEN
    INSERT (id, title, description, documentation_url, learnmore_url, guide_url, position)
    VALUES (src.id, src.title, src.description, src.documentation_url, src.learnmore_url, src.guide_url, src.position);
Copy

Schritt „Konnektorkonfiguration“

Der nächste Schritt der Assistentenphase ist der Konnektorkonfigurationsschritt. In diesem Schritt können Sie die für den Konnektor erforderlichen Datenbankobjekte und Berechtigungen konfigurieren. In diesem Schritt können Sie die folgenden Konfigurationseigenschaften festlegen:

  • warehouse

  • operational_warehouse

  • cortex_warehouse

  • destination_database

  • destination_schema

  • global_schedule

  • data_owner_role

  • cortex_user_role

  • agent_username

  • agent_role

Wenn Sie weitere kundenspezifische Eigenschaften benötigen, können Sie diese in einem der nächsten Schritte der Assistentenphase konfigurieren. Weitere Informationen zu den einzelnen Eigenschaften finden Sie unter: Konnektorkonfiguration

Darüber hinaus zeigt die in der Vorlage enthaltene Streamlit-Komponente (streamlit/wizard/connector_config.py), wie Sie Native Apps Permission SDK triggern und Berechtigungen beim Endbenutzer anfordern können. Solange die verfügbaren Eigenschaften den Anforderungen des Konnektors genügen, müssen Sie keine der Backend-Klassen überschreiben, obwohl dies in den weiteren Schritten der Konfiguration auf die gleiche Weise wie bei den Komponenten möglich ist.

Weitere Informationen zu interne Prozeduren und Java-Objekten finden Sie unter: Referenz zur Konnektorkonfiguration

Das bereitgestellte Streamlit-Beispiel ermöglicht das Anfordern von Berechtigungen auf Kontoebene, die in der manifest.yml-Datei – CREATE DATABASE und EXECUTE TASKS konfiguriert sind. Außerdem wird dem Benutzer die Möglichkeit gegeben, über das Berechtigungs-SDK-Popup eine Warehouse-Referenz anzugeben.

In der Vorlage wird der Benutzer lediglich aufgefordert, Werte für destination_database und destination_schema anzugeben. Ein TODO-Kommentar in streamlit/wizard/connector_configuration.py enthält jedoch kommentierten Code, der wiederverwendet werden kann, um weitere Eingabefelder im Streamlit-UI anzuzeigen.

# TODO: Here you can add additional fields in connector configuration.
# For example:
st.subheader("Operational warehouse")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="operational_warehouse", label_visibility="collapsed")
st.caption("Name of the operational warehouse to be used")
Copy

Schritt „Verbindungskonfiguration“

Der nächste Schritt der Assistentenphase ist der Verbindungskonfigurationsschritt. In diesem Schritt kann der Benutzer die Parameter für die externe Konnektivität des Konnektors konfigurieren. Diese Konfiguration kann Bezeichner von Objekten wie Geheimnissen, Integrationen usw. enthalten.

Da diese Informationen je nach Quellsystem für die vom Konnektor aufgenommenen Daten unterschiedlich ist, ist dies die erste Stelle, an der größere Anpassungen im Quellcode vorgenommen werden müssen.

Weitere Informationen zur Verbindungskonfiguration finden Sie unter:

Beginnend mit der Streamlit-UI-Seite (streamlit/wizard/connection_config.py) müssen Sie Texteingaben für alle benötigten Parameter hinzufügen. Eine Beispiel-Texteingabe ist für Sie implementiert, und wenn Sie den Code in dieser Datei durchsuchen, finden Sie eine TODO mit kommentiertem Code für ein neues Feld.

# TODO: Additional configuration properties can be added to the UI like this:
st.subheader("Additional connection parameter")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="additional_connection_property", label_visibility="collapsed")
st.caption("Some description of the additional property")
Copy

Nachdem die Eigenschaften zum Formular hinzugefügt wurden, müssen sie an die Backend-Schicht des Konnektors übergeben werden. Dazu müssen zwei zusätzliche Stellen in den Streamlit-Dateien geändert werden. Die erste ist die Funktion finish_config in der Datei streamlit/wizard/connection_config.py. Der Status der neu hinzugefügten Texteingaben muss hier gelesen werden. Außerdem kann er bei Bedarf validiert und dann an die Funktion set_connection_configuration übergeben werden.

Wenn zum Beispiel additional_connection_property hinzugefügt wurde, würde die Stelle nach den Änderungen wie folgt aussehen:

def finish_config():
try:
    # TODO: If some additional properties were specified they need to be passed to the set_connection_configuration function.
    # The properties can also be validated, for example, check whether they are not blank strings etc.
    response = set_connection_configuration(
        custom_connection_property=st.session_state["custom_connection_property"],
        additional_connection_property=st.session_state["additional_connection_property"],
    )

# rest of the method without changes
Copy

Dann muss die Funktion set_connection_configuration bearbeitet werden, die Sie in der Datei streamlit/native_sdk_api/connection_config.py finden. Diese Funktion ist ein Proxy zwischen Streamlit-UI und der zugrunde liegenden SQL-Prozedur, die ein Einstiegspunkt in das Backend des Konnektors ist.

def set_connection_configuration(custom_connection_property: str, additional_connection_property: str):
    # TODO: this part of the code sends the config to the backend so all custom properties need to be added here
    config = {
        "custom_connection_property": escape_identifier(custom_connection_property),
        "additional_connection_property": escape_identifier(additional_connection_property),
    }

    return call_procedure(
        "PUBLIC.SET_CONNECTION_CONFIGURATION",
        [variant_argument(config)]
    )
Copy

Danach wird die neue Eigenschaft in der internen Konnektortabelle gespeichert, die die Konfiguration enthält. Dies ist jedoch nicht das Ende der möglichen Anpassungen. Einige Backend-Komponenten können ebenfalls angepasst werden. Achten Sie auf die folgenden Kommentare im Code, um sie zu finden:

  • TODO: IMPLEMENT ME connection configuration validate

  • TODO: IMPLEMENT ME connection callback

  • TODO: IMPLEMENT ME test connection

Der Validierungsteil ermöglicht eine zusätzliche Validierung der Daten, die über die UI erhaltenen wurden. Er kann die Daten auch umwandeln, z. B. die Groß- und Kleinschreibung ändern, die bereitgestellten Daten kürzen, oder prüfen, ob Objekte mit den angegebenen Namen tatsächlich in Snowflake existieren.

Der Verbindungs-Callback ist ein Teil, mit dem Sie zusätzliche Operationen auf Basis der Konfiguration ausführen können, z. B. Änderungsprozeduren, bei denen Integrationen für den externen Zugriff verwendet werden müssen, unter Verwendung einer Lösung, die unter Referenz für die Einrichtung der externen Integration beschrieben wird.

Die Testverbindung ist eine letzte Komponente der Verbindungskonfiguration. Sie prüft, ob die Verbindung zwischen dem Konnektor und dem Quellsystem hergestellt werden kann.

Weitere Informationen zu diesen internen Komponenten finden Sie unter:

Beispielimplementierungen könnten wie folgt aussehen:

public class TemplateConfigurationInputValidator implements ConnectionConfigurationInputValidator {

    private static final String ERROR_CODE = "INVALID_CONNECTION_CONFIGURATION";

    @Override
    public ConnectorResponse validate(Variant config) {
      // TODO: IMPLEMENT ME connection configuration validate: If the connection configuration input
      // requires some additional validation this is the place to implement this logic.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      var integrationCheck = checkParameter(config, INTEGRATION_PARAM, false);
      if (!integrationCheck.isOk()) {
        return integrationCheck;
      }

      var secretCheck = checkParameter(config, SECRET_PARAM, true);
      if (!secretCheck.isOk()) {
        return ConnectorResponse.error(ERROR_CODE);
      }

      return ConnectorResponse.success();
    }
}
Copy
public class TemplateConnectionConfigurationCallback implements ConnectionConfigurationCallback {

    private static final String[] EXTERNAL_SOURCE_PROCEDURE_SIGNATURES = {
        asVarchar(format("%s.%s()", PUBLIC_SCHEMA, TEST_CONNECTION_PROCEDURE)),
        asVarchar(format("%s.%s(VARIANT)", PUBLIC_SCHEMA, FINALIZE_CONNECTOR_CONFIGURATION_PROCEDURE)),
        asVarchar(format("%s.%s(NUMBER, STRING)", PUBLIC_SCHEMA, WORKER_PROCEDURE))
      };

    private final Session session;

    public TemplateConnectionConfigurationCallback(Session session) {
      this.session = session;
    }

    @Override
    public ConnectorResponse execute(Variant config) {
      // TODO: If you need to alter some procedures with external access you can use
      // configureProcedure method or implement a similar method on your own.
      // TODO: IMPLEMENT ME connection callback: Implement the custom logic of changes in application
      // to be done after connection configuration, like altering procedures with external access.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      var response = configureProceduresWithReferences();
      if (response.isNotOk()) {
         return response;
      }
      return ConnectorResponse.success();
    }

    private ConnectorResponse configureProceduresWithReferences() {
      return callProcedure(
        session,
        PUBLIC_SCHEMA,
        SETUP_EXTERNAL_INTEGRATION_WITH_NAMES_PROCEDURE,
        EXTERNAL_SOURCE_PROCEDURE_SIGNATURES);
    }
}
Copy
public class TemplateConnectionValidator {

    private static final String ERROR_CODE = "TEST_CONNECTION_FAILED";

    public static Variant testConnection(Session session) {
      // TODO: IMPLEMENT ME test connection: Implement the custom logic of testing the connection to
      // the source system here. This usually requires connection to some webservice or other external
      // system. It is suggested to perform only the basic connectivity validation here.
      // If that's the case then this procedure must be altered in TemplateConnectionConfigurationCallback first.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      return test().toVariant();
    }

    private static ConnectorResponse test() {
      try {
        var response = SourceSystemHttpHelper.testEndpoint();

        if (isSuccessful(response.statusCode())) {
          return ConnectorResponse.success();
        } else {
          return ConnectorResponse.error(ERROR_CODE, "Connection to source system failed");
        }
      } catch (Exception exception) {
        return ConnectorResponse.error(ERROR_CODE, "Test connection failed");
      }
    }
}
Copy

Schritt „Konfiguration abschließen“

Der letzte Schritt der Assistentenphase ist die Fertigstellung der Verbindungskonfiguration. Dieser Schritt umfasst mehrere Teilschritte:

  1. Ermöglicht dem Benutzer die Angabe zusätzlicher Konfigurationen, die der Konnektor benötigt

  2. Erstellt die Senkendatenbank, ein Schema und bei Bedarf zusätzliche Tabellen und Ansichten für die aufgenommenen Daten

  3. Initialisiert interne Komponenten wie den Scheduler und den Task Reactor

Weitere Informationen zum Abschluss der Konfiguration finden Sie unter:

Weitere Informationen über den Task Reactor und die Zeitplanung finden Sie unter:

Ähnlich wie bei der Verbindungskonfiguration kann die Anpassung mit dem Streamlit-UI gestartet werden. Die Datei streamlit/wizard/finalize_config.py enthält ein Formular mit einer Beispieleigenschaft. Weitere Eigenschaften können je nach Bedarf des Konnektor hinzugefügt werden. Um eine weitere Eigenschaft hinzuzufügen, suchen Sie nach einem TODO-Kommentar, der Beispielcode für das Hinzufügen einer neuen Eigenschaft in der genannten Datei enthält.

# TODO: Here you can add additional fields in finalize connector configuration.
# For example:
st.subheader("Some additional property")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="some_additional_property", label_visibility="collapsed")
st.caption("Description of some new additional property")
Copy

Nachdem Sie die Texteingabe für eine neue Eigenschaft hinzugefügt haben, muss diese an die Backend-Seite übergeben werden. Ändern Sie dazu die Funktion finalize_configuration in derselben Datei:

def finalize_configuration():
    try:
        st.session_state["show_main_error"] = False
        # TODO: If some additional properties were introduced, they need to be passed to the finalize_connector_configuration function.
        response = finalize_connector_configuration(
            st.session_state.get("custom_property"),
            st.session_state.get("some_additional_property")
        )
Copy

Öffnen Sie danach die Datei streamlit/native_sdk_api/finalize_config.py und fügen Sie die neue Eigenschaft zur folgenden Funktion hinzu:

def finalize_connector_configuration(custom_property: str, some_additional_property: str):
    # TODO: If some custom properties were configured, then they need to be specified here and passed to the FINALIZE_CONNECTOR_CONFIGURATION procedure.
    config = {
        "custom_property": custom_property,
        "some_additional_property": some_additional_property,
    }
    return call_procedure(
        "PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION",
        [variant_argument(config)]
    )
Copy

Ähnlich wie bei der Verbindungskonfiguration ermöglicht auch dieser Schritt die Anpassung verschiedener Backend-Komponenten, die Sie über die folgenden Kommentare im Quellcode finden:

  • TODO: IMPLEMENT ME validate source

  • TODO: IMPLEMENT ME finalize internal

Im Teilschritt zur Validierung der Quelle geht es um die Durchführung anspruchsvollerer Validierungen der Quellsysteme. Wenn bei der vorherigen Testverbindung nur geprüft wurde, ob eine Verbindung hergestellt werden kann, könnte die Quellenvalidierung den Zugriff auf bestimmte Daten im System prüfen, z. B. durch Extraktion eines einzelnen Datensatzes.

Die interne Finalisierung ist eine interne Prozedur, die für die Initialisierung von Task Reactor und Scheduler und das Erstellen einer Senkendatenbank und der notwendigen verschachtelten Objekte verantwortlich ist. Sie kann auch verwendet werden, um die während des Finalisierungsschritts bereitgestellte Konfiguration zu speichern (diese Konfiguration wird standardmäßig nicht gespeichert).

Weitere Informationen zu den internen Komponenten finden Sie unter:

Zusätzlich können Eingaben über die Schnittstelle FinalizeConnectorInputValidator validiert und an den Finalisierungs-Handler übergeben werden (siehe TemplateFinalizeConnectorConfigurationCustomHandler). Weitere Informationen zur Verwendung von Buildern finden Sie unter: Anpassung von gespeicherten Prozeduren und Handlern.

Die Implementierung der Quellenvalidierung könnte beispielsweise wie folgt aussehen:

public class SourceSystemAccessValidator implements SourceValidator {

    @Override
    public ConnectorResponse validate(Variant variant) {
      // TODO: IMPLEMENT ME validate source: Implement the custom logic of validating the source
      // system. In some cases this can be the same validation that happened in
      // TemplateConnectionValidator.
      // However, it is suggested to perform more complex validations, like specific access rights to
      // some specific resources here.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/finalize_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/finalize_configuration
      var finalizeProperties = Configuration.fromCustomConfig(variant);

      var httpResponse = SourceSystemHttpHelper.validateSource(finalizeProperties.get("custom_property"));
      return prepareConnectorResponse(httpResponse.statusCode());
    }

    private ConnectorResponse prepareConnectorResponse(int statusCode) {
      switch (statusCode) {
        case 200:
          return ConnectorResponse.success();
        case 401:
          return ConnectorResponse.error("Unauthorized error");
        case 404:
          return ConnectorResponse.error("Not found error");
        default:
          return ConnectorResponse.error("Unknown error");
      }
    }
}
Copy

Ressourcen erstellen

Nachdem die Assistentenphase abgeschlossen ist, ist der Konnektor bereit, mit der Aufnahme von Daten zu beginnen. Aber zuerst müssen die Ressourcen implementiert und konfiguriert werden. Eine Ressource ist eine Abstraktion, die eine bestimmte Menge von Daten im Quellsystem beschreibt, z. B. eine Tabelle, einen Endpunkt oder eine Datei.

Verschiedene Quellsysteme benötigen möglicherweise unterschiedliche Informationen zu einer Ressource. Aus diesem Grund muss eine Ressourcendefinition entsprechend den spezifischen Anforderungen angepasst werden. Um dies zu tun, rufen Sie die Datei streamlit/daily_use/data_sync_page.py auf. Dort finden Sie einen TODO-Kommentar zum Hinzufügen von Texteingaben für Ressourcenparameter. Die Ressourcenparameter sollten die Identifizierung und das Abrufen von Daten aus dem Quellsystem ermöglichen. Diese Parameter können dann während der Datenaufnahme extrahiert werden:

# TODO: specify all the properties needed to define a resource in the source system. A subset of those properties should allow for a identification of a single resource, be it a table, endpoint, repository or some other data storage abstraction
st.text_input(
    "Resource name",
    key="resource_name",
)
st.text_input(
    "Some resource parameter",
    key="some_resource_parameter"
)
Copy

Sobald alle erforderlichen Eigenschaften zum Formular hinzugefügt wurden, können sie an die Backend-Seite übergeben werden. Zunächst muss der Zustand der Textfelder extrahiert und auf API-Ebene an die Methode queue_resource in der streamlit/daily_use/data_sync_page.py-Datei übergeben werden:

def queue_resource():
    # TODO: add additional properties here and pass them to create_resource function
    resource_name = st.session_state.get("resource_name")
    some_resource_parameter = st.session_state.get("some_resource_parameter")

    if not resource_name:
        st.error("Resource name cannot be empty")
        return

    result = create_resource(resource_name, some_resource_parameter)
    if result.is_ok():
        st.success("Resource created")
    else:
        st.error(result.get_message())
Copy

Dann muss die Funktion create_resource aus der Datei streamlit/native_sdk_api/resource_management.py aktualisiert werden:

def create_resource(resource_name, some_resource_parameter):
    ingestion_config = [{
        "id": "ingestionConfig",
        "ingestionStrategy": "INCREMENTAL",
        # TODO: HINT: scheduleType and scheduleDefinition are currently not supported out of the box, due to globalSchedule being used. However, a custom implementation of the scheduler can use those fields. They need to be provided becuase they are mandatory in the resourceDefinition.
        "scheduleType": "INTERVAL",
        "scheduleDefinition": "60m"
    }]
    # TODO: HINT: resource_id should allow identification of a table, endpoint etc. in the source system. It should be unique.
    resource_id = {
        "resource_name": resource_name,
    }
    id = f"{resource_name}_{random_suffix()}"

    # TODO: if you specified some additional resource parameters then you need to put them inside resource metadata:
    resource_metadata = {
        "some_resource_parameter": some_resource_parameter
    }

    return call_procedure("PUBLIC.CREATE_RESOURCE",
                          [
                              varchar_argument(id),
                              variant_argument(resource_id),
                              variant_list_argument(ingestion_config),
                              varchar_argument(id),
                              "true",
                              variant_argument(resource_metadata)
                          ])
Copy

Anpassen der CREATE_RESOURCE()-Prozedurlogik

Die PUBLIC.CREATE_RESOURCE()-Prozedur ermöglicht es dem Entwickler, ihre Ausführung durch die Implementierung einer benutzerdefinierten Logik anzupassen, die an mehreren Stellen des Hauptausführungsablaufs eingesteckt wird. Die SDK ermöglicht es dem Entwickler:

  1. Validieren Sie die Ressource, bevor sie erstellt wird. Die Logik sollte in der PUBLIC.CREATE_RESOURCE_VALIDATE()-Prozedur implementiert werden.

  2. Führen Sie benutzerdefinierte Vorgänge durch, bevor die Ressource erstellt wird. Die Logik sollte in der PUBLIC.PRE_CREATE_RESOURCE()-Prozedur implementiert werden.

  3. Führen Sie benutzerdefinierte Vorgänge aus, nachdem die Ressource erstellt wurde. Die Logik sollte in der PUBLIC.POST_CREATE_RESOURCE()-Prozedur implementiert werden.

Weitere Informationen über die Anpassung der Prozedur unter PUBLIC.CREATE_RESOURCE() finden Sie hier:

TemplateCreateResourceHandler.java

Diese Klasse ist ein Handler für die PUBLIC.CREATE_RESOURCE()-Prozedur. Hier können Sie die Java-Implementierungen der Handler für die bereits erwähnten Callback-Prozeduren injizieren. Standardmäßig bietet die Vorlage gespiegelte Java-Implementierungen von Callback-Handlern, um den Aufruf von SQL-Prozeduren zu vermeiden, der die Ausführungszeit der Prozedur verlängern würde – Java-Implementierungen machen die Ausführung schneller. Diese nachgebildeten Implementierungen tun nichts anderes, als eine erfolgreiche Antwort zurückzugeben. Sie können entweder die benutzerdefinierte Implementierung für die von der Vorlage vorbereiteten Callback-Klassen bereitstellen oder diese Callbacks von Grund auf neu erstellen und sie in den Hauptablauf der Ausführung der Prozedur im Handler-Builder einfügen.

Um die benutzerdefinierte Logik in Callback-Methoden zu implementieren, die standardmäßig aufgerufen werden, suchen Sie nach den folgenden Kommentare im Code:

  • TODO: IMPLEMENT ME create resource validate

  • TODO: IMPLEMENT ME pre create resource callback

  • TODO: IMPLEMENT ME post create resource callback

Datenaufnahme (Ingestion)

Um die Aufnahme von Daten durchzuführen, müssen Sie eine Klasse implementieren, die die Verbindung mit dem Quellsystem herstellt und die Daten auf der Grundlage der Ressourcenkonfiguration abruft. Die Module Scheduler und Task Reactor sorgen dafür, dass Datenaufnahmeaufgaben ausgelöst und zur Warteschlange hinzugefügt werden.

Die Datenaufnahmelogik wird von der Klasse TemplateIngestion aufgerufen. Suchen Sie den Kommentar TODO: IMPLEMENT ME ingestion im Code und ersetzen Sie die zufällige Datengenerierung durch den Datenabruf aus dem Quellsystem. Wenn Sie der Ressourcendefinition kundenspezifische Eigenschaften hinzugefügt haben, können diese aus den internen Konnektortabellen mithilfe von ResourceIngestionDefinitionRepository und den in TemplateWorkItem verfügbaren Eigenschaften abgerufen werden:

  • resourceIngestionDefinitionId

  • ingestionConfigurationId

Ein Beispiel für den Abruf von Daten von einem Webservice könnte folgendermaßen aussehen:

public final class SourceSystemHttpHelper {

  private static final String DATA_URL = "https://source_system.com/data/%s";
  private static final SourceSystemHttpClient sourceSystemClient = new SourceSystemHttpClient();
  private static final ObjectMapper objectMapper = new ObjectMapper();

  private static List<Variant> fetchData(String resourceId) {
    var response = sourceSystemClient.get(String.format(url, resourceId));
    var body = response.body();

    try {
        return Arrays.stream(objectMapper.readValue(body, Map[].class))
              .map(Variant::new)
              .collect(Collectors.toList());
    } catch (JsonProcessingException e) {
      throw new RuntimeException("Cannot parse json", e);
    }
  }
}
Copy
public class SourceSystemHttpClient {

  private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(15);

  private final HttpClient client;
  private final String secret;

  public SourceSystemHttpClient() {
    this.client = HttpClient.newHttpClient();
    this.secret =
        SnowflakeSecrets.newInstance()
            .getGenericSecretString(ConnectionConfiguration.TOKEN_NAME);
  }

  public HttpResponse<String> get(String url) {
    var request =
        HttpRequest.newBuilder()
            .uri(URI.create(url))
            .GET()
            .header("Authorization", format("Bearer %s", secret))
            .header("Content-Type", "application/json")
            .timeout(REQUEST_TIMEOUT)
            .build();

    try {
      return client.send(request, HttpResponse.BodyHandlers.ofString());
    } catch (IOException | InterruptedException ex) {
      throw new RuntimeException(format("HttpRequest failed: %s", ex.getMessage()), ex);
    }
  }
}
Copy

Lebenszyklus von Ressourcen verwalten

Sobald die Logik zur Erstellung von Ressourcen und deren Aufnahme implementiert ist, können Sie deren Lebenszyklus verwalten, indem Sie die folgenden Prozeduren aufrufen:

  1. PUBLIC.ENABLE_RESOURCE() aktiviert eine bestimmte Ressource, was bedeutet, dass sie für die Datenaufnahme eingeplant wird

  2. PUBLIC.DISABLE_RESOURCE() deaktiviert eine bestimmte Ressource, was bedeutet, dass ihre Aufnahmeplanung gestoppt wird

  3. PUBLIC.UPDATE_RESOURCE() ermöglicht es Ihnen, die Datenaufnahmekonfigurationen einer bestimmten Ressource zu aktualisieren. Diese Funktion ist nicht standardmäßig in der Streamlit-UI implementiert, da es manchmal unerwünscht sein kann, dass der Benutzer des Konnektors die Konfiguration der Datenaufnahme (Ingestion) anpassen kann (widerrufen Sie die Berechtigungen für diese Prozedur für die ADMIN-Anwendungsrolle, um ihre Verwendung vollständig zu unterbinden).

Alle diese Prozeduren verfügen über Java-Handler und werden durch Rückrufe erweitert, mit denen Sie ihre Ausführung anpassen können. Sie können benutzerdefinierte Implementierungen von Rückrufen einfügen, indem Sie die Builder für diese Handler verwenden. Standardmäßig stellt die Vorlage simulierte Java-Implementierungen von Callback-Handlern bereit. Diese nachgebildeten Implementierungen tun nichts anderes, als eine erfolgreiche Antwort zurückzugeben. Sie können entweder die benutzerdefinierte Implementierung für die von der Vorlage vorbereiteten Callback-Klassen bereitstellen oder diese Callbacks von Grund auf neu erstellen und sie in den Hauptablauf der Ausführung der Prozedur in den Handler-Builders einfügen.

TemplateEnableResourceHandler.java

Bei dieser Klasse handelt es sich um einen Handler für die PUBLIC.ENABLE_RESOURCE()-Prozedur, der um die entsprechenden Callbacks erweitert werden kann:

  1. Validieren Sie die Ressource, bevor sie aktiviert wird. Achten Sie auf den Kommentar TODO: IMPLEMENT ME enable resource validate im Code, um die benutzerdefinierte Implementierung bereitzustellen.

  2. Führen Sie benutzerdefinierte Vorgänge durch, bevor die Ressource aktiviert wird. Achten Sie auf den Kommentar TODO: IMPLEMENT ME pre enable resource im Code, um die benutzerdefinierte Implementierung bereitzustellen.

  3. Führen Sie benutzerdefinierte Vorgänge aus, nachdem die Ressource aktiviert wurde. Achten Sie auf den Kommentar TODO: IMPLEMENT ME post enable resource im Code, um die benutzerdefinierte Implementierung bereitzustellen.

Erfahren Sie mehr in den ausführlichen Dokumentationen der PUBLIC.ENABLE_RESOURCE()-Prozedur:

TemplateDisableResourceHandler.java

Bei dieser Klasse handelt es sich um einen Handler für die PUBLIC.DISABLE_RESOURCE()-Prozedur, der um die entsprechenden Callbacks erweitert werden kann:

  1. Validieren Sie die Ressource, bevor sie deaktiviert wird. Achten Sie auf den Kommentar TODO: IMPLEMENT ME disable resource validate im Code, um die benutzerdefinierte Implementierung bereitzustellen.

  2. Führen Sie benutzerdefinierte Vorgänge durch, bevor die Ressource deaktiviert wird. Achten Sie auf den Kommentar TODO: IMPLEMENT ME pre disable resource im Code, um die benutzerdefinierte Implementierung bereitzustellen.

Erfahren Sie mehr in den ausführlichen Dokumentationen der PUBLIC.DISABLE_RESOURCE()-Prozedur:

TemplateUpdateResourceHandler.java

Bei dieser Klasse handelt es sich um einen Handler für die PUBLIC.UPDATE_RESOURCE()-Prozedur, der um die entsprechenden Callbacks erweitert werden kann:

  1. Validieren Sie die Ressource, bevor sie aktualisiert wird. Achten Sie auf den Kommentar TODO: IMPLEMENT ME update resource validate im Code, um die benutzerdefinierte Implementierung bereitzustellen.

  2. Führen Sie benutzerdefinierte Vorgänge aus, bevor die Ressource aktualisiert wird. Achten Sie auf den Kommentar TODO: IMPLEMENT ME pre update resource im Code, um die benutzerdefinierte Implementierung bereitzustellen.

  3. Führen Sie benutzerdefinierte Vorgänge aus, nachdem die Ressource aktualisiert wurde. Achten Sie auf den Kommentar TODO: IMPLEMENT ME post update resource im Code, um die benutzerdefinierte Implementierung bereitzustellen.

Erfahren Sie mehr in den ausführlichen Dokumentationen der PUBLIC.UPDATE_RESOURCE()-Prozedur:

Einstellungen

Die Vorlage enthält eine Registerkarte für Einstellungen, auf der Sie alle zuvor vorgenommenen Konfigurationen einsehen können. Wenn jedoch die Konfigurationseigenschaften angepasst wurden, dann muss auch diese Ansicht angepasst werden. Den Code für die Registerkarte mit den Einstellungen finden Sie in der Datei streamlit/daily_use/settings_page.py.

Zum Anpassen extrahieren Sie einfach die Werte aus der Konfiguration für die Schlüssel, die in den jeweiligen Konfigurationen hinzugefügt wurden. Wenn beispielsweise zuvor im Schritt Verbindungskonfiguration additional_connection_property hinzugefügt wurde, könnte es in der Einstellungsansicht wie folgt hinzugefügt werden:

def connection_config_page():
    current_config = get_connection_configuration()

    # TODO: implement the display for all the custom properties defined in the connection configuration step
    custom_property = current_config.get("custom_connection_property", "")
    additional_connection_property = current_config.get("additional_connection_property", "")


    st.header("Connector configuration")
    st.caption("Here you can see the connector connection configuration saved during the connection configuration step "
               "of the Wizard. If some new property was introduced it has to be added here to display.")
    st.divider()

    st.text_input(
        "Custom connection property:",
        value=custom_property,
        disabled=True
    )
    st.text_input(
        "Additional connection property:",
        value=additional_connection_property,
        disabled=True
    )
    st.divider()
Copy