Tutoriel : Modèle de connecteur Java Snowflake Native SDK for Connectors

Introduction

Bienvenue dans notre tutoriel sur l’utilisation d’un modèle de connecteur à l’aide de Snowflake Native SDK for Connectors. Ce guide vous aidera à configurer une application native Connector simple.

Dans ce tutoriel, vous apprendrez à :

  • Déployer une application native Connector

  • Configurer un modèle de connecteur pour ingérer des données

  • Personnaliser un modèle de connecteur en fonction de vos propres besoins

Le modèle contient divers commentaires utiles dans le code pour vous permettre de trouver plus facilement les fichiers spécifiques qui doivent être modifiés. Cherchez les commentaires avec les mots-clés suivants, ils vous guideront et aideront à mettre en œuvre votre propre connecteur :

  • TODO

  • TODO: HINT

  • TODO: IMPLEMENT ME

Avant de commencer ce tutoriel, vous devez vous préparer en consultant le contenu recommandé ci-dessous :

Conditions préalables

Avant de commencer, assurez-vous que vous remplissez les conditions suivantes :

Préparer votre environnement local

Avant de poursuivre, vous devez vous assurer que tous les logiciels nécessaires sont installés sur votre machine et cloner le modèle de connecteur.

Installation de Java

Snowflake Native SDK for Connectors exige la version 11 ou supérieure de Java LTS (Long-Term Support). Si la version minimale requise de Java n’est pas installée sur votre machine, vous devez installer soit Oracle Java, soit OpenJDK.

Oracle Java

La dernière version de LTS de JDK peut être téléchargée et utilisée gratuitement sous Oracle NFTC. Pour les instructions de téléchargement et d’installation, consultez la page Oracle.

OpenJDK

OpenJDK est une implémentation open-source de Java. Pour le téléchargement et les instructions d’installation, consultez openjdk.org et jdk.java.net.

Vous pouvez également utiliser une version tierce OpenJDK, telle que Eclipse Temurin ou Amazon Corretto.

Configuration de la CLI Snowflake

L’outil de la CLI Snowflake est une exigence pour concevoir, déployer et installer le connecteur. Si vous n’avez pas la CLI Snowflake sur votre machine - installez-la selon les instructions disponibles ici.

Une fois l’outil installé, vous devez configurer une connexion à Snowflake dans votre fichier de configuration.

Si aucune connexion n’est configurée, créez-en une nouvelle nommée native_sdk_connection. Vous trouverez un exemple de connexion dans le fichier deployment/snowflake.toml.

Si vous avez déjà configuré une connexion et que vous souhaitez l’utiliser avec le connecteur - utilisez son nom au lieu de native_sdk_connection chaque fois que cette connexion est utilisée dans ce tutoriel.

Clonage de modèles

Pour cloner le modèle de connecteur, utilisez la commande suivante :

snow init <project_dir> \
  --template-source https://github.com/snowflakedb/connectors-native-sdk \
  --template templates/connectors-native-sdk-template

À la place de <project_dir> entrez le nom du répertoire (il ne doit pas exister) dans lequel le projet Java de votre connecteur sera créé.

Après l’exécution de la commande, il vous sera demandé de fournir des informations supplémentaires pour la configuration de l’instance de l’application et du nom de la zone de préparation. Vous pouvez fournir n’importe quel nom, à condition qu’il s’agisse d’identificateurs Snowflake sans guillemets valides, ou cliquer sur Entrée pour utiliser les valeurs par défaut, qui sont indiquées entre crochets.

Un exemple d’exécution de commande, fournissant des noms d’application et de zone de préparation personnalisés :

$ snow init my_connector \
    --template-source https://github.com/snowflakedb/connectors-native-sdk \
    --template templates/connectors-native-sdk-template

Name of the application instance which will be created in Snowflake [connectors-native-sdk-template]: MY_CONNECTOR
Name of the schema in which the connector files stage will be created [TEST_SCHEMA]:
Name of the stage used to store connector files in the application package [TEST_STAGE]: CUSTOM_STAGE_NAME
Initialized the new project in my_connector

Conception, déploiement et nettoyage du connecteur

Le modèle peut être déployé dès sa sortie de l’emballage, avant même toute modification. Les sections suivantes vous montreront comment concevoir, déployer et installer le connecteur.

Créer le connecteur

La conception d’un connecteur créé à l’aide de Snowflake Native SDK for Connectors est un peu différente de celle d’une application Java classique. Il y a d’autres choses à faire que de concevoir les archives .jar à partir des sources. La conception de l’application comprend les étapes suivantes :

  1. Copier des composants internes personnalisés dans le répertoire de conception

  2. Copier des composants SDK dans le répertoire de conception

Copier des composants internes

Cette étape permet de créer le fichier .jar du connecteur et de le copier (avec les fichiers UI, manifest et setup) dans le répertoire sf_build.

Pour exécuter cette étape, exécutez la commande : ./gradlew copyInternalComponents.

Copier les composants SDK

Cette étape copie le fichier .jar SDK (ajouté en tant que dépendance au module Gradle du connecteur) dans le répertoire sf_build et extrait les fichiers .sql regroupés de l’archive .jar.

Ces fichiers .sql permettent de personnaliser les objets fournis qui seront créés lors de l’installation de l’application. Pour les premiers utilisateurs, la personnalisation n’est pas recommandée, car l’omission d’objets peut entraîner l’échec de certaines fonctionnalités si elle n’est pas effectuée correctement. L’application du connecteur de modèles utilise le fichier all.sql, qui crée tous les objets SDK recommandés.

Pour exécuter cette étape, exécutez la commande : ./gradlew copySdkComponents.

Déployer le connecteur

Pour déployer une Native App, un paquet d’application doit être créé dans Snowflake. Ensuite, tous les fichiers du répertoire sf_build doivent être téléchargés vers Snowflake.

Veuillez noter qu’à des fins de développement, la création de versions est facultative. Une instance d’application peut être créée directement à partir de fichiers en zone de préparation. Cette approche vous permet de voir les modifications apportées à la plupart des fichiers du connecteur sans avoir à recréer la version et l’instance de l’application.

Les opérations suivantes seront effectuées :

  1. Créer un nouveau paquet d’application, s’il n’existe pas déjà

  2. Créer un schéma et une zone de préparation des fichiers à l’intérieur du paquet

  3. Télécharger les fichiers du répertoire sf_build vers la zone de préparation (cette étape peut prendre un certain temps)

Pour déployer le connecteur, exécutez la commande : snow app deploy --connection=native_sdk_connection.

Pour plus d’informations sur la commande snow app deploy, consultez snow app deploy.

Le paquet d’application créé sera désormais visible dans l’onglet App packages, dans la catégorie Data products, dans le compte Snowflake UI de votre compte.

Installer le connecteur

L’installation de l’application est la dernière étape du processus. Il crée une application à partir du paquet d’application créé précédemment.

Pour installer le connecteur, exécutez la commande : snow app run --connection=native_sdk_connection.

Pour plus d’informations sur la commande snow app run, consultez snow app run.

L’application installée sera désormais visible dans l’onglet Installed apps, dans la catégorie Data products, dans le compte Snowflake UI de votre compte.

Mettre à jour les fichiers des connecteurs

Si, à un moment donné, vous souhaitez modifier l’un des fichiers du connecteur, vous pouvez facilement télécharger les fichiers modifiés dans la zone de préparation du paquet d’application. La commande de téléchargement dépend des fichiers qui ont été mis à jour.

Avant d’exécuter les commandes de mise à jour, vous devez copier les nouveaux fichiers de votre connecteur dans le répertoire sf_build en exécutant la commande : ./gradlew copyInternalComponents

Fichiers UI .py ou fichiers .java du connecteur

Utilisez la commande snow app deploy --connection=native_sdk_connection, l’instance actuelle de l’application utilisera les nouveaux fichiers sans réinstallation.

Fichiers setup.sql ou manifest.yml

Utilisez la commande snow app run --connection=native_sdk_connection, l’instance de l’application en cours sera réinstallée après le téléchargement des nouveaux fichiers dans la zone de préparation.

Nettoyage

Une fois le tutoriel terminé, ou si pour une raison quelconque vous souhaitez supprimer l’application et son paquet, vous pouvez les supprimer complètement de votre compte à l’aide de la commande :

snow app teardown --connection=native_sdk_connection --cascade --force

L’option --cascade est nécessaire pour supprimer la base de données de destination sans en transférer la propriété au compte admin. Dans les connecteurs réels, la base de données ne doit pas être supprimée pour préserver les données ingérées. Elle doit être la propriété de l’administrateur du compte ou la propriété doit être transférée avant la désinstallation.

Veuillez noter que le connecteur consommera des crédits jusqu’à ce qu’il soit mis en pause ou supprimé, même si aucune ingestion n’a été configurée !

Étape préalable

Juste après l’installation, le connecteur est en phase d’assistant. Cette phase consiste en quelques étapes qui guident l’utilisateur final à travers toutes les configurations nécessaires.

La première étape est celle des conditions préalables. Elle est facultative et peut ne pas être nécessaire pour tous les connecteurs. Les conditions préalables sont généralement des actions requises par l’utilisateur en dehors de l’application, par exemple l’exécution de requêtes dans la feuille de calcul SQL, la configuration du côté du système source, etc.

En savoir plus sur les conditions préalables : Conditions préalables

Le contenu de chaque prérequis est récupéré directement dans la table STATE.PREREQUISITES, située à l’intérieur du connecteur. Elles peuvent être personnalisées à l’aide du script setup.sql. Cependant, n’oubliez pas que le script setup.sql est exécuté à chaque installation, mise à niveau et rétrogradation de l’application. Les insertions doivent être idempotentes, c’est pourquoi il est recommandé d’utiliser une requête de fusion comme dans l’exemple ci-dessous :

MERGE INTO STATE.PREREQUISITES AS dest
USING (SELECT * FROM VALUES
           ('1',
            'Sample prerequisite',
            'Prerequisites can be used to notice the end user of the connector about external configurations. Read more in the SDK documentation below. This content can be modified inside `setup.sql` script',
            'https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/prerequisites',
            NULL,
            NULL,
            1
           )
) AS src (id, title, description, documentation_url, learnmore_url, guide_url, position)
ON dest.id = src.id
WHEN NOT MATCHED THEN
    INSERT (id, title, description, documentation_url, learnmore_url, guide_url, position)
    VALUES (src.id, src.title, src.description, src.documentation_url, src.learnmore_url, src.guide_url, src.position);
Copy

Étape de configuration du connecteur

L’étape suivante de la phase de l’assistant est celle de la configuration du connecteur. Au cours de cette étape, vous pouvez configurer les objets de la base de données et les autorisations requises par le connecteur. Cette étape permet de spécifier les propriétés de configuration suivantes :

  • warehouse

  • operational_warehouse

  • cortex_warehouse

  • destination_database

  • destination_schema

  • global_schedule

  • data_owner_role

  • cortex_user_role

  • agent_username

  • agent_role

Si vous avez besoin d’autres propriétés personnalisées, elles peuvent être configurées dans l’une des étapes suivantes de la phase de l’assistant. Pour plus d’informations sur chacune des propriétés, voir : Configuration du connecteur

En outre, le composant Streamlit (streamlit/wizard/connector_config.py) fourni dans le modèle montre comment déclencher l”autorisation Native Apps SDK et demande l’octroi de privilèges à l’utilisateur final. Tant que les propriétés disponibles répondent aux besoins du connecteur, il n’est pas nécessaire d’écraser les classes du backend, bien que cela soit possible de la même manière que pour les composants dans les étapes suivantes de la configuration.

Pour plus d’informations sur les procédures internes et les objets Java, voir : Référence de la configuration des connecteurs

L’exemple Streamlit fourni permet de demander des privilèges au niveau du compte configuré dans le fichier manifest.yml - CREATE DATABASE et EXECUTE TASKS. Il permet également à l’utilisateur de spécifier une référence d’entrepôt par le biais de la fenêtre contextuelle SDK d’autorisation.

Dans le modèle, il est demandé à l’utilisateur de ne fournir que destination_database et destination_schema. Cependant, un commentaire TODO dans streamlit/wizard/connector_configuration.py contient du code commenté qui peut être réutilisé pour afficher plus de champs d’entrée dans l’UI de Streamlit.

# TODO: Here you can add additional fields in connector configuration.
# For example:
st.subheader("Operational warehouse")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="operational_warehouse", label_visibility="collapsed")
st.caption("Name of the operational warehouse to be used")
Copy

Étape de configuration de connexion

L’étape suivante de la phase de l’assistant est celle de la configuration de la connexion. Cette étape permet à l’utilisateur final de configurer les paramètres de connectivité externe pour le connecteur. Cette configuration peut inclure des identificateurs d’objets tels que des secrets, des intégrations, etc.

Comme ces informations varient en fonction du système source des données ingérées par le connecteur, c’est le premier endroit où des personnalisations plus importantes doivent être effectuées dans le code source.

Pour plus d’informations sur la configuration des connexions, voir :

En commençant par le côté UI de Streamlit (streamlit/wizard/connection_config.py), vous devez ajouter des entrées de texte pour tous les paramètres nécessaires. Un exemple d’entrée de texte est implémenté pour vous et si vous recherchez le code dans ce fichier, vous trouverez un TODO avec du code commenté pour un nouveau champ.

# TODO: Additional configuration properties can be added to the UI like this:
st.subheader("Additional connection parameter")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="additional_connection_property", label_visibility="collapsed")
st.caption("Some description of the additional property")
Copy

Une fois les propriétés ajoutées au formulaire, elles doivent être transmises à la couche dorsale du connecteur. Pour ce faire, deux endroits supplémentaires doivent être modifiés dans les fichiers Streamlit. Le premier concerne la fonction finish_config dans le fichier streamlit/wizard/connection_config.py. L’état des entrées de texte nouvellement ajoutées doit être lu ici. En outre, il peut être validé si nécessaire, puis transmis à la fonction set_connection_configuration.

Par exemple, si additional_connection_property a été ajouté, il se présentera comme suit après les modifications :

def finish_config():
try:
    # TODO: If some additional properties were specified they need to be passed to the set_connection_configuration function.
    # The properties can also be validated, for example, check whether they are not blank strings etc.
    response = set_connection_configuration(
        custom_connection_property=st.session_state["custom_connection_property"],
        additional_connection_property=st.session_state["additional_connection_property"],
    )

# rest of the method without changes
Copy

Ensuite, la fonction set_connection_configuration doit être éditée, elle se trouve dans le fichier streamlit/native_sdk_api/connection_config.py. Cette fonction est un proxy entre l’UI de Streamlit et la procédure SQL sous-jacente, qui est un point d’entrée vers le backend du connecteur.

def set_connection_configuration(custom_connection_property: str, additional_connection_property: str):
    # TODO: this part of the code sends the config to the backend so all custom properties need to be added here
    config = {
        "custom_connection_property": escape_identifier(custom_connection_property),
        "additional_connection_property": escape_identifier(additional_connection_property),
    }

    return call_procedure(
        "PUBLIC.SET_CONNECTION_CONFIGURATION",
        [variant_argument(config)]
    )
Copy

Ensuite, la nouvelle propriété est enregistrée dans la table du connecteur interne, qui contient la configuration. Cependant, les possibilités de personnalisation ne s’arrêtent pas là. Certains composants du backend peuvent également être personnalisés, recherchez les commentaires suivants dans le code pour les trouver :

  • TODO: IMPLEMENT ME connection configuration validate

  • TODO: IMPLEMENT ME connection callback

  • TODO: IMPLEMENT ME test connection

La partie « valider » permet d’effectuer toute validation supplémentaire sur les données reçues de l’UI. Il peut également transformer les données, par exemple changer la casse des caractères, découper les données fournies ou vérifier si les objets portant les noms fournis existent réellement à l’intérieur de Snowflake.

Le rappel de connexion est une partie qui vous permet d’effectuer toute opération supplémentaire basée sur la configuration, par exemple modifier les procédures qui doivent utiliser des intégrations d’accès externes, à l’aide d’une solution décrite dans Référence de mise en place d’intégration externe.

Le test de connexion est le dernier élément de la configuration de la connexion. Il permet de vérifier si la connexion peut être établie entre le connecteur et le système source.

Pour plus d’informations sur ces composants internes, voir :

Voici des exemples de mises en œuvre possibles :

public class TemplateConfigurationInputValidator implements ConnectionConfigurationInputValidator {

    private static final String ERROR_CODE = "INVALID_CONNECTION_CONFIGURATION";

    @Override
    public ConnectorResponse validate(Variant config) {
      // TODO: IMPLEMENT ME connection configuration validate: If the connection configuration input
      // requires some additional validation this is the place to implement this logic.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      var integrationCheck = checkParameter(config, INTEGRATION_PARAM, false);
      if (!integrationCheck.isOk()) {
        return integrationCheck;
      }

      var secretCheck = checkParameter(config, SECRET_PARAM, true);
      if (!secretCheck.isOk()) {
        return ConnectorResponse.error(ERROR_CODE);
      }

      return ConnectorResponse.success();
    }
}
Copy
public class TemplateConnectionConfigurationCallback implements ConnectionConfigurationCallback {

    private static final String[] EXTERNAL_SOURCE_PROCEDURE_SIGNATURES = {
        asVarchar(format("%s.%s()", PUBLIC_SCHEMA, TEST_CONNECTION_PROCEDURE)),
        asVarchar(format("%s.%s(VARIANT)", PUBLIC_SCHEMA, FINALIZE_CONNECTOR_CONFIGURATION_PROCEDURE)),
        asVarchar(format("%s.%s(NUMBER, STRING)", PUBLIC_SCHEMA, WORKER_PROCEDURE))
      };

    private final Session session;

    public TemplateConnectionConfigurationCallback(Session session) {
      this.session = session;
    }

    @Override
    public ConnectorResponse execute(Variant config) {
      // TODO: If you need to alter some procedures with external access you can use
      // configureProcedure method or implement a similar method on your own.
      // TODO: IMPLEMENT ME connection callback: Implement the custom logic of changes in application
      // to be done after connection configuration, like altering procedures with external access.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      var response = configureProceduresWithReferences();
      if (response.isNotOk()) {
         return response;
      }
      return ConnectorResponse.success();
    }

    private ConnectorResponse configureProceduresWithReferences() {
      return callProcedure(
        session,
        PUBLIC_SCHEMA,
        SETUP_EXTERNAL_INTEGRATION_WITH_NAMES_PROCEDURE,
        EXTERNAL_SOURCE_PROCEDURE_SIGNATURES);
    }
}
Copy
public class TemplateConnectionValidator {

    private static final String ERROR_CODE = "TEST_CONNECTION_FAILED";

    public static Variant testConnection(Session session) {
      // TODO: IMPLEMENT ME test connection: Implement the custom logic of testing the connection to
      // the source system here. This usually requires connection to some webservice or other external
      // system. It is suggested to perform only the basic connectivity validation here.
      // If that's the case then this procedure must be altered in TemplateConnectionConfigurationCallback first.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      return test().toVariant();
    }

    private static ConnectorResponse test() {
      try {
        var response = SourceSystemHttpHelper.testEndpoint();

        if (isSuccessful(response.statusCode())) {
          return ConnectorResponse.success();
        } else {
          return ConnectorResponse.error(ERROR_CODE, "Connection to source system failed");
        }
      } catch (Exception exception) {
        return ConnectorResponse.error(ERROR_CODE, "Test connection failed");
      }
    }
}
Copy

Finaliser l’étape de configuration

L’étape de finalisation de la configuration du connecteur est la dernière étape de la phase de l’assistant. Cette étape comporte de multiples responsabilités :

  1. Permet à l’utilisateur de spécifier toute configuration supplémentaire nécessaire au connecteur

  2. Crée la base de données réceptrice, le schéma et les tables et vues supplémentaires pour les données ingérées, si nécessaire

  3. Initialise les composants internes tels que le planificateur et le réacteur de tâches

Pour plus d’informations sur la finalisation de la configuration, voir :

Pour plus d’informations sur le réacteur de tâches et la planification, voir :

De la même manière que pour la configuration de la connexion, la personnalisation peut être lancée à l’aide de l’UI de Streamlit. Le fichier streamlit/wizard/finalize_config.py contient une forme avec un exemple de propriété. D’autres propriétés peuvent être ajoutées en fonction des besoins du connecteur. Pour ajouter une autre propriété, recherchez un commentaire TODO, qui contient un exemple de code pour l’ajout d’une nouvelle propriété dans le fichier mentionné.

# TODO: Here you can add additional fields in finalize connector configuration.
# For example:
st.subheader("Some additional property")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="some_additional_property", label_visibility="collapsed")
st.caption("Description of some new additional property")
Copy

Après avoir ajouté l’entrée de texte pour une nouvelle propriété, il faut la transmettre au backend. Pour ce faire, modifiez la fonction finalize_configuration dans le même fichier :

def finalize_configuration():
    try:
        st.session_state["show_main_error"] = False
        # TODO: If some additional properties were introduced, they need to be passed to the finalize_connector_configuration function.
        response = finalize_connector_configuration(
            st.session_state.get("custom_property"),
            st.session_state.get("some_additional_property")
        )
Copy

Ensuite, ouvrez le fichier streamlit/native_sdk_api/finalize_config.py et ajoutez la nouvelle propriété à la fonction suivante :

def finalize_connector_configuration(custom_property: str, some_additional_property: str):
    # TODO: If some custom properties were configured, then they need to be specified here and passed to the FINALIZE_CONNECTOR_CONFIGURATION procedure.
    config = {
        "custom_property": custom_property,
        "some_additional_property": some_additional_property,
    }
    return call_procedure(
        "PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION",
        [variant_argument(config)]
    )
Copy

De même que pour la configuration de la connexion, cette étape permet également de personnaliser divers composants du backend. Vous pouvez les trouver dans le code source à l’aide des commentaires suivants :

  • TODO: IMPLEMENT ME validate source

  • TODO: IMPLEMENT ME finalize internal

La partie « validate source » est chargée d’effectuer des validations plus sophistiquées sur les systèmes sources. Si le test de connexion précédent a seulement vérifié qu’une connexion peut être établie, la validation de la source pourrait vérifier l’accès à des données spécifiques dans le système, par exemple en extrayant un seul enregistrement de données.

« Finalize internal » est une procédure interne chargée d’initialiser le réacteur de tâches et le planificateur, de créer une base de données réceptrice et tous les objets imbriqués nécessaires. Elle peut également être utilisée pour enregistrer la configuration fournie lors de l’étape de finalisation (cette configuration n’est pas enregistrée par défaut).

Vous trouverez de plus amples informations sur les composants internes dans :

En outre, les entrées peuvent être validées en utilisant l’interface FinalizeConnectorInputValidator et en les fournissant au gestionnaire finalize - consultez le fichier TemplateFinalizeConnectorConfigurationCustomHandler. Vous trouverez de plus amples informations sur l’utilisation des constructeurs dans la rubrique : Personnalisation des procédures stockées et des gestionnaires (handlers)

Voici un exemple de mise en œuvre de « validate source » :

public class SourceSystemAccessValidator implements SourceValidator {

    @Override
    public ConnectorResponse validate(Variant variant) {
      // TODO: IMPLEMENT ME validate source: Implement the custom logic of validating the source
      // system. In some cases this can be the same validation that happened in
      // TemplateConnectionValidator.
      // However, it is suggested to perform more complex validations, like specific access rights to
      // some specific resources here.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/finalize_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/finalize_configuration
      var finalizeProperties = Configuration.fromCustomConfig(variant);

      var httpResponse = SourceSystemHttpHelper.validateSource(finalizeProperties.get("custom_property"));
      return prepareConnectorResponse(httpResponse.statusCode());
    }

    private ConnectorResponse prepareConnectorResponse(int statusCode) {
      switch (statusCode) {
        case 200:
          return ConnectorResponse.success();
        case 401:
          return ConnectorResponse.error("Unauthorized error");
        case 404:
          return ConnectorResponse.error("Not found error");
        default:
          return ConnectorResponse.error("Unknown error");
      }
    }
}
Copy

Créer des ressources

Une fois la phase de l’assistant terminée, le connecteur est prêt à commencer à ingérer des données. Mais avant cela, les ressources doivent être mises en œuvre et configurées. Une ressource est une abstraction décrivant un ensemble spécifique de données dans le système source, par exemple une table, un point de terminaison, un fichier, etc.

Différents systèmes sources peuvent avoir besoin d’informations différentes sur une ressource. C’est pourquoi la définition d’une ressource doit être adaptée aux besoins spécifiques. Pour ce faire, accédez au fichier streamlit/daily_use/data_sync_page.py. Vous y trouverez un commentaire TODO sur l’ajout d’entrées de texte pour les paramètres des ressources. Les paramètres des ressources doivent permettre l’identification et la récupération des données du système source. Ces paramètres peuvent ensuite être extraits lors de l’ingestion.

# TODO: specify all the properties needed to define a resource in the source system. A subset of those properties should allow for a identification of a single resource, be it a table, endpoint, repository or some other data storage abstraction
st.text_input(
    "Resource name",
    key="resource_name",
)
st.text_input(
    "Some resource parameter",
    key="some_resource_parameter"
)
Copy

Une fois que toutes les propriétés nécessaires ont été ajoutées à la forme, elles peuvent être transmises au backend. Tout d’abord, l’état des champs de texte doit être extrait et transmis à la méthode queue_resource au niveau de l’API dans le fichier streamlit/daily_use/data_sync_page.py :

def queue_resource():
    # TODO: add additional properties here and pass them to create_resource function
    resource_name = st.session_state.get("resource_name")
    some_resource_parameter = st.session_state.get("some_resource_parameter")

    if not resource_name:
        st.error("Resource name cannot be empty")
        return

    result = create_resource(resource_name, some_resource_parameter)
    if result.is_ok():
        st.success("Resource created")
    else:
        st.error(result.get_message())
Copy

La fonction create_resource du fichier streamlit/native_sdk_api/resource_management.py doit alors être mise à jour :

def create_resource(resource_name, some_resource_parameter):
    ingestion_config = [{
        "id": "ingestionConfig",
        "ingestionStrategy": "INCREMENTAL",
        # TODO: HINT: scheduleType and scheduleDefinition are currently not supported out of the box, due to globalSchedule being used. However, a custom implementation of the scheduler can use those fields. They need to be provided becuase they are mandatory in the resourceDefinition.
        "scheduleType": "INTERVAL",
        "scheduleDefinition": "60m"
    }]
    # TODO: HINT: resource_id should allow identification of a table, endpoint etc. in the source system. It should be unique.
    resource_id = {
        "resource_name": resource_name,
    }
    id = f"{resource_name}_{random_suffix()}"

    # TODO: if you specified some additional resource parameters then you need to put them inside resource metadata:
    resource_metadata = {
        "some_resource_parameter": some_resource_parameter
    }

    return call_procedure("PUBLIC.CREATE_RESOURCE",
                          [
                              varchar_argument(id),
                              variant_argument(resource_id),
                              variant_list_argument(ingestion_config),
                              varchar_argument(id),
                              "true",
                              variant_argument(resource_metadata)
                          ])
Copy

Personnalisation de la logique de procédure CREATE_RESOURCE()

La procédure PUBLIC.CREATE_RESOURCE() permet au développeur de personnaliser son exécution en mettant en œuvre une logique personnalisée qui s’insère à plusieurs endroits du flux d’exécution principal. Le SDK permet au développeur les opérations suivantes :

  1. Validez la ressource avant sa création. La logique doit être mise en œuvre dans la procédure PUBLIC.CREATE_RESOURCE_VALIDATE().

  2. Effectuez des opérations personnalisées avant la création de la ressource. La logique doit être mise en œuvre dans la procédure PUBLIC.PRE_CREATE_RESOURCE().

  3. Effectuez des opérations personnalisées après la création de la ressource. La logique doit être mise en œuvre dans la procédure PUBLIC.POST_CREATE_RESOURCE().

Plus d’informations sur la personnalisation de la procédure PUBLIC.CREATE_RESOURCE() sont disponibles ici :

TemplateCreateResourceHandler.java

Cette classe est un gestionnaire pour la procédure PUBLIC.CREATE_RESOURCE(). Ici, vous pouvez injecter les implémentations Java des gestionnaires pour les procédures de rappel mentionnées précédemment. Par défaut, le modèle fournit des implémentations Java simulées des gestionnaires de rappel afin d’éviter d’appeler les procédures SQL, ce qui allongerait le temps d’exécution de la procédure - les implémentations Java accélèrent l’exécution. Ces implémentations simulées ne font rien d’autre que renvoyer une réponse de réussite. Vous pouvez soit fournir l’implémentation personnalisée aux classes de rappel préparées par le modèle, soit créer ces rappels à partir de zéro et les injecter dans le flux d’exécution de la procédure principale dans le générateur de gestionnaires.

Afin d’implémenter la logique personnalisée des méthodes de rappel appelées par défaut, recherchez les commentaires suivants dans le code :

  • TODO: IMPLEMENT ME create resource validate

  • TODO: IMPLEMENT ME pre create resource callback

  • TODO: IMPLEMENT ME post create resource callback

Ingestion

Pour réaliser l’ingestion de données, vous devez mettre en œuvre une classe qui gérera la connexion avec le système source et récupérera les données, en fonction de la configuration de la ressource. Les modules Planificateur et Task Reactor se chargeront du déclenchement et de la mise en file d’attente des tâches d’ingestion.

La logique d’ingestion est invoquée à partir de la classe TemplateIngestion. Recherchez le commentaire TODO: IMPLEMENT ME ingestion dans le code et remplacez la génération aléatoire de données par la récupération de données à partir du système source. Si vous avez ajouté des propriétés personnalisées à la définition de la ressource, elles peuvent être extraites des tables des connecteurs internes à l’aide de ResourceIngestionDefinitionRepository et des propriétés disponibles sur TemplateWorkItem :

  • resourceIngestionDefinitionId

  • ingestionConfigurationId

Un exemple de récupération de données à partir d’un service web pourrait ressembler à ceci :

public final class SourceSystemHttpHelper {

  private static final String DATA_URL = "https://source_system.com/data/%s";
  private static final SourceSystemHttpClient sourceSystemClient = new SourceSystemHttpClient();
  private static final ObjectMapper objectMapper = new ObjectMapper();

  private static List<Variant> fetchData(String resourceId) {
    var response = sourceSystemClient.get(String.format(url, resourceId));
    var body = response.body();

    try {
        return Arrays.stream(objectMapper.readValue(body, Map[].class))
              .map(Variant::new)
              .collect(Collectors.toList());
    } catch (JsonProcessingException e) {
      throw new RuntimeException("Cannot parse json", e);
    }
  }
}
Copy
public class SourceSystemHttpClient {

  private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(15);

  private final HttpClient client;
  private final String secret;

  public SourceSystemHttpClient() {
    this.client = HttpClient.newHttpClient();
    this.secret =
        SnowflakeSecrets.newInstance()
            .getGenericSecretString(ConnectionConfiguration.TOKEN_NAME);
  }

  public HttpResponse<String> get(String url) {
    var request =
        HttpRequest.newBuilder()
            .uri(URI.create(url))
            .GET()
            .header("Authorization", format("Bearer %s", secret))
            .header("Content-Type", "application/json")
            .timeout(REQUEST_TIMEOUT)
            .build();

    try {
      return client.send(request, HttpResponse.BodyHandlers.ofString());
    } catch (IOException | InterruptedException ex) {
      throw new RuntimeException(format("HttpRequest failed: %s", ex.getMessage()), ex);
    }
  }
}
Copy

Gérer le cycle de vie des ressources

Une fois la logique de création des ressources et leur ingestion implémentée, vous pouvez gérer leur cycle de vie en appelant les procédures suivantes :

  1. PUBLIC.ENABLE_RESOURCE() active une ressource particulière, ce qui signifie qu’elle sera planifiée pour l’ingestion

  2. PUBLIC.DISABLE_RESOURCE() désactive une ressource particulière, ce qui signifie que sa planification d’ingestion sera arrêtée

  3. PUBLIC.UPDATE_RESOURCE() vous permet de mettre à jour les configurations d’ingestion d’une ressource particulière. Elle n’est pas implémentée dans l’UI Streamlit par défaut car cela peut parfois être indésirable pour le développeur de permettre à l’utilisateur du connecteur de personnaliser la configuration d’ingestion (révoquer les autorisations sur cette procédure du rôle d’application ADMIN afin d’interdire complètement son utilisation).

Toutes ces procédures disposent de gestionnaires Java et sont complétées par des rappels qui vous permettent de personnaliser leur exécution. Vous pouvez injecter des implémentations personnalisées de rappels en utilisant les constructeurs de ces gestionnaires. Par défaut, le modèle fournit des implémentations Java simulées des gestionnaires de rappel. Ces implémentations simulées ne font rien d’autre que renvoyer une réponse de réussite. Vous pouvez soit fournir l’implémentation personnalisée aux classes de rappel préparées par le modèle, soit créer ces rappels à partir de zéro et les injecter dans le flux d’exécution de la procédure principale dans les générateurs de gestionnaires.

TemplateEnableResourceHandler.java

Cette classe est un gestionnaire pour la procédure PUBLIC.ENABLE_RESOURCE(), qui peut être étendue avec les rappels dédiés à :

  1. Validez la ressource avant qu’elle ne soit activée. Recherchez le commentaire TODO: IMPLEMENT ME enable resource validate dans le code pour fournir l’implémentation personnalisée.

  2. Effectuez des opérations personnalisées avant que la ressource ne soit activée. Recherchez le commentaire TODO: IMPLEMENT ME pre enable resource dans le code pour fournir l’implémentation personnalisée.

  3. Effectuez des opérations personnalisées après l’activation de la ressource. Recherchez le commentaire TODO: IMPLEMENT ME post enable resource dans le code pour fournir l’implémentation personnalisée.

Apprenez-en avec les documentations détaillées sur la procédure PUBLIC.ENABLE_RESOURCE() :

TemplateDisableResourceHandler.java

Cette classe est un gestionnaire pour la procédure PUBLIC.DISABLE_RESOURCE(), qui peut être étendue avec les rappels dédiés à :

  1. Validez la ressource avant qu’elle ne soit désactivée. Recherchez le commentaire TODO: IMPLEMENT ME disable resource validate dans le code pour fournir l’implémentation personnalisée.

  2. Effectuez des opérations personnalisées avant que la ressource ne soit désactivée. Recherchez le commentaire TODO: IMPLEMENT ME pre disable resource dans le code afin de fournir l’implémentation personnalisée.

Apprenez-en avec les documentations détaillées sur la procédure PUBLIC.DISABLE_RESOURCE() :

TemplateUpdateResourceHandler.java

Cette classe est un gestionnaire pour la procédure PUBLIC.UPDATE_RESOURCE(), qui peut être étendue avec les rappels dédiés à :

  1. Validez la ressource avant sa mise à jour. Recherchez le commentaire TODO: IMPLEMENT ME update resource validate dans le code pour fournir l’implémentation personnalisée.

  2. Effectuez des opérations personnalisées avant la mise à jour de la ressource. Recherchez le commentaire TODO: IMPLEMENT ME pre update resource dans le code pour fournir l’implémentation personnalisée.

  3. Effectuez des opérations personnalisées après la mise à jour de la ressource. Recherchez le commentaire TODO: IMPLEMENT ME post update resource dans le code pour fournir l’implémentation personnalisée.

Apprenez-en avec les documentations détaillées sur la procédure PUBLIC.UPDATE_RESOURCE() :

Paramètres

Le modèle contient un onglet « Paramètres » qui vous permet de voir toutes les configurations effectuées auparavant. Toutefois, si les propriétés de configuration ont été personnalisées, cette vue doit également l’être. Le code de l’onglet Paramètres se trouve dans le fichier streamlit/daily_use/settings_page.py.

Pour le personnaliser, il suffit d’extraire les valeurs de la configuration pour les clés qui ont été ajoutées dans les configurations respectives. Par exemple, si la propriété antérieure additional_connection_property a été ajoutée plus tôt dans l’étape de configuration de la connexion, elle pourrait être ajoutée comme suit :

def connection_config_page():
    current_config = get_connection_configuration()

    # TODO: implement the display for all the custom properties defined in the connection configuration step
    custom_property = current_config.get("custom_connection_property", "")
    additional_connection_property = current_config.get("additional_connection_property", "")


    st.header("Connector configuration")
    st.caption("Here you can see the connector connection configuration saved during the connection configuration step "
               "of the Wizard. If some new property was introduced it has to be added here to display.")
    st.divider()

    st.text_input(
        "Custom connection property:",
        value=custom_property,
        disabled=True
    )
    st.text_input(
        "Additional connection property:",
        value=additional_connection_property,
        disabled=True
    )
    st.divider()
Copy