Tutoriel : SDK natif pour les modèles Java pour les connecteurs

Introduction

Bienvenue dans notre tutoriel sur l’utilisation d’un modèle de connecteur à l’aide de Snowflake Native SDK for Connectors. Ce guide vous aidera à configurer une application native Connector simple.

Dans ce tutoriel, vous apprendrez à :

  • Déployer une application native Connector

  • Configurer un modèle de connecteur pour ingérer des données

  • Personnaliser un modèle de connecteur en fonction de vos propres besoins

Le modèle contient divers commentaires utiles dans le code pour vous permettre de trouver plus facilement les fichiers spécifiques qui doivent être modifiés. Cherchez les commentaires avec les mots-clés suivants, ils vous guideront et aideront à mettre en œuvre votre propre connecteur :

  • TODO

  • TODO: HINT

  • TODO: IMPLEMENT ME

Avant de commencer ce tutoriel, vous devez vous préparer en consultant le contenu recommandé ci-dessous :

Conditions préalables

Avant de commencer, assurez-vous que vous remplissez les conditions suivantes :

  • Java 11 installé

  • Accès au compte Snowflake avec le rôle ACCOUNTADMIN

  • Outil SnowSQL (CLI client) avec variable_substitution et exit_on_error configurés sur votre machine locale

  • Vous avez consulté cette page de documentation : SDK natif Snowflake pour les connecteurs et vous la gardez ouverte en ligne ou imprimée à partir de votre navigateur. Vous avez consulté ce quickstart : SDK Connector Native Java (facultatif, mais recommandé). Le quickstart utilise un exemple de connecteur basé sur un modèle et il peut être référencé pour vérifier des exemples d’implémentation de divers composants.

Initialisation et déploiement

Pour initialiser un projet, clonez le référentiel Native SDK for Connectors à partir de GitHub et copiez le répertoire /templates/native-sdk-connectors-java-template à l’emplacement souhaité pour le projet. Ce modèle contient tout le code nécessaire pour déployer une application native Connector fonctionnelle. Une fois cette opération effectuée, le modèle est prêt à être déployé.

Déploiement

Le modèle est prêt à être déployé et fournit un script de commodité qui gère l’ensemble du processus pour vous. Avant de déployer le connecteur, une connexion snowsql doit être spécifiée. Pour ce faire, ouvrez Makefile et indiquez le nom de la connexion dans la variable d’environnement CONNECTION.

Pour déployer rapidement l’application, allez dans le répertoire principal du modèle et exécutez la commande suivante :

make reinstall_application_from_version_dir
Copy

Cette commande permet d’effectuer les opérations suivantes :

  • Supprime les comptes APPLICATION et APPLICATION PACKAGE qui existaient précédemment dans le compte Snowflake.

  • Copie le fichier jar du SDK et les fichiers sql extraits du jar dans le répertoire cible de sf_build.

  • Copie les composants streamlit et java personnalisés de l’application dans le répertoire sf_build.

  • Crée un nouveau APPLICATION PACKAGE à partir des fichiers du répertoire sf_build à l’intérieur d’un compte Snowflake.

  • Crée une nouvelle instance APPLICATION à l’intérieur d’un compte Snowflake.

Ce processus prend environ 2 à 3 minutes. Une fois qu’il est terminé, naviguez vers l’onglet Data Products -> Apps à l’intérieur de Snowflake, votre connecteur devrait y être visible. Si vous avez beaucoup d’applications et que vous avez du mal à les trouver, essayez de taper NATIVE_SDK_CONNECTOR_TEMPLATE dans la barre de recherche ou, dans le cas d’un nom personnalisé d”APPLICATION, utilisez le nom personnalisé à la place. Ce connecteur est prêt à être configuré. Les étapes suivantes vous guident tout au long du processus et vous expliquent comment personnaliser chacune d’entre elles.

Si vous avez besoin de redéployer votre connecteur pendant l’une des étapes de ce tutoriel, par exemple pour tester vos modifications, il vous suffit de réexécuter la commande ci-dessus.

Étape préalable

Juste après le déploiement, le connecteur est dans sa phase d’assistant. Cette phase consiste en quelques étapes qui guident l’utilisateur final à travers toutes les configurations nécessaires. La première étape est celle des conditions préalables. Elle est facultative et peut ne pas être nécessaire pour tous les connecteurs. Les conditions préalables sont généralement des actions requises par l’utilisateur en dehors de l’application, par exemple l’exécution de requêtes dans la feuille de calcul, la réalisation de certaines configurations du côté du système source, etc.

En savoir plus sur les conditions préalables :

Le contenu de chaque condition préalable est extrait directement de la table interne (STATE.PREREQUISITES) du connecteur. Elles peuvent être personnalisées à l’aide du script setup.sql. Cependant, n’oubliez pas que le script setup.sql est exécuté à chaque installation, mise à niveau et rétrogradation de l’application. Les insertions doivent être idempotentes, c’est pourquoi il est recommandé d’utiliser une requête de fusion comme dans l’exemple ci-dessous :

MERGE INTO STATE.PREREQUISITES AS dest
USING (SELECT * FROM VALUES
           ('1',
            'Sample prerequisite',
            'Prerequisites can be used to notice the end user of the connector about external configurations. Read more in the SDK documentation below. This content can be modified inside `setup.sql` script',
            'https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/prerequisites',
            NULL,
            NULL,
            1
           )
) AS src (id, title, description, documentation_url, learnmore_url, guide_url, position)
ON dest.id = src.id
WHEN NOT MATCHED THEN
    INSERT (id, title, description, documentation_url, learnmore_url, guide_url, position)
    VALUES (src.id, src.title, src.description, src.documentation_url, src.learnmore_url, src.guide_url, src.position);
Copy

Étape de configuration du connecteur

L’étape suivante de la phase de l’assistant est celle de la configuration du connecteur. Au cours de cette étape, vous pouvez configurer les objets de la base de données et les autorisations requises par le connecteur. Cette étape permet de spécifier les propriétés de configuration suivantes :

  • warehouse

  • destination_database

  • destination_schema

  • operational_warehouse

  • global_schedule

  • data_owner_role

  • agent_username

  • agent_role

Si vous avez besoin d’autres propriétés personnalisées, elles peuvent être configurées dans l’une des étapes suivantes de la phase de l’assistant. Pour plus d’informations sur chacune des propriétés, voir :

En outre, le composant streamlit (streamlit/wizard/connector_config.py) fourni dans le modèle montre comment déclencher permissions-sdk et demande quelques autorisations à l’utilisateur final. Tant que les propriétés disponibles répondent aux besoins du connecteur, il n’est pas nécessaire d’écraser les classes du backend, bien que cela soit possible de la même manière que pour les composants dans les étapes suivantes de la configuration.

Pour plus d’informations sur les procédures internes et les objets Java, voir :

L’exemple fourni par Streamlit permet de demander des autorisations au niveau du compte comme create database et execute tasks. Il permet également à l’utilisateur de spécifier une référence d’entrepôt par le biais de la fenêtre contextuelle permissions-sdk.

Dans le modèle, il est demandé à l’utilisateur de ne fournir que destination_database et destination_schema. Cependant, un commentaire de TODO dans streamlit/wizard/connector_configuration.py contient du code commenté qui peut être réutilisé pour afficher plus de boîtes de saisie dans l’UI de Streamlit.

# TODO: Here you can add additional fields in connector configuration. Supported values are the following: warehouse, operational_warehouse, data_owner_role, agent_role, agent_username
# For example:
st.subheader("Operational warehouse")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="operational_warehouse", label_visibility="collapsed")
st.caption("Name of the operational warehouse to be used")
Copy

Étape de configuration de connexion

L’étape suivante de la phase de l’assistant est celle de la configuration de la connexion. Cette étape permet à l’utilisateur final de configurer les paramètres de connectivité externe pour le connecteur. Cette configuration peut inclure des identificateurs d’objets tels que des secrets, des intégrations, etc. Comme cela varie en fonction du système source des données ingérées par le connecteur, c’est le premier endroit où des personnalisations plus importantes doivent être effectuées dans le code source.

Pour plus d’informations sur la configuration des connexions, voir :

En commençant par l’UI côté Streamlit (fichier streamlit/wizard/connection_config.py), vous devez ajouter des zones de texte pour tous les paramètres nécessaires. Un exemple de zone de texte est implémenté pour vous et si vous recherchez le code dans ce fichier, vous trouverez un TODO avec du code commenté pour un nouveau champ.

# TODO: Additional configuration properties can be added to the UI like this:
st.subheader("Additional connection parameter")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="additional_connection_property", label_visibility="collapsed")
st.caption("Some description of the additional property")
Copy

Une fois les propriétés ajoutées au formulaire, elles doivent être transmises à la couche dorsale du connecteur. Pour ce faire, deux endroits supplémentaires doivent être modifiés dans les fichiers Streamlit. Le premier concerne la fonction finish_config dans le fichier streamlit/wizard/connection_config.py. L’état des zones de texte nouvellement ajoutées doit être lu ici. En outre, il peut être validé si nécessaire, puis transmis à la fonction set_connection_configuration. Par exemple, si additional_connection_property a été ajouté, il se présentera comme suit après les modifications :

def finish_config():
try:
    # TODO: If some additional properties were specified they need to be passed to the set_connection_configuration function.
    # The properties can also be validated, for example, check whether they are not blank strings etc.
    response = set_connection_configuration(
        custom_connection_property=st.session_state["custom_connection_property"],
        additional_connection_property=st.session_state["additional_connection_property"],
    )

# rest of the method without changes
Copy

Ensuite, la fonction set_connection_configuration doit être éditée, elle se trouve dans le fichier streamlit/native_sdk_api/connection_config.py. Cette fonction est un proxy entre l’UI de Streamlit et la procédure SQL sous-jacente, qui est un point d’entrée vers le backend du connecteur.

def set_connection_configuration(custom_connection_property: str, additional_connection_property: str):
    # TODO: this part of the code sends the config to the backend so all custom properties need to be added here
    config = {
        "custom_connection_property": escape_identifier(custom_connection_property),
        "additional_connection_property": escape_identifier(additional_connection_property),
    }

    return call_procedure(
        "PUBLIC.SET_CONNECTION_CONFIGURATION",
        [variant_argument(config)]
    )
Copy

Ensuite, la nouvelle propriété est enregistrée dans la table des connecteurs internes, qui contient la configuration. Mais les possibilités de personnalisation ne s’arrêtent pas là. Certains composants du backend peuvent également être personnalisés, recherchez les commentaires suivants dans le code pour les trouver :

  • TODO: IMPLEMENT ME connection configuration validate

  • TODO: IMPLEMENT ME connection callback

  • TODO: IMPLEMENT ME test connection

La partie « valider » permet d’effectuer toute validation supplémentaire sur les données reçues de l’UI. Elle peut également transformer les données, par exemple en les mettant en minuscules, en les triant ou en vérifiant que les objets portant les noms fournis existent réellement à l’intérieur de Snowflake.

Le rappel de connexion est une partie qui vous permet d’effectuer toute opération supplémentaire basée sur la configuration, par exemple les procédures d’altération qui doivent utiliser des intégrations d’accès externes.

Le test de connexion est un élément final de la configuration de la connexion, il vérifie si la connexion peut être établie entre le connecteur et le système source.

Pour plus d’informations sur ces composants internes, voir :

Voici des exemples de mises en œuvre possibles :

public class TemplateConfigurationInputValidator implements ConnectionConfigurationInputValidator {

    private static final String ERROR_CODE = "INVALID_CONNECTION_CONFIGURATION";

    @Override
    public ConnectorResponse validate(Variant config) {
      // TODO: IMPLEMENT ME connection configuration validate: If the connection configuration input
      // requires some additional validation this is the place to implement this logic.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      var integrationCheck = checkParameter(config, INTEGRATION_PARAM, false);
      if (!integrationCheck.isOk()) {
        return integrationCheck;
      }

      var secretCheck = checkParameter(config, SECRET_PARAM, true);
      if (!secretCheck.isOk()) {
        return ConnectorResponse.error(ERROR_CODE);
      }

      return ConnectorResponse.success();
    }
}
Copy
public class TemplateConnectionConfigurationCallback implements ConnectionConfigurationCallback {

    private final Session session;

    public TemplateConnectionConfigurationCallback(Session session) {
      this.session = session;
    }

    @Override
    public ConnectorResponse execute(Variant config) {
      // TODO: If you need to alter some procedures with external access you can use
      // configureProcedure method or implement a similar method on your own.
      // TODO: IMPLEMENT ME connection callback: Implement the custom logic of changes in application
      // to be done after connection configuration, like altering procedures with external access.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      configureProcedure(format("PUBLIC.TEST_CONNECTION()"), config);

      return ConnectorResponse.success();
    }
}
Copy
public class TemplateConnectionValidator {

    private static final String ERROR_CODE = "TEST_CONNECTION_FAILED";

    public static Variant testConnection(Session session) {
      // TODO: IMPLEMENT ME test connection: Implement the custom logic of testing the connection to
      // the source system here. This usually requires connection to some webservice or other external
      // system. It is suggested to perform only the basic connectivity validation here.
      // If that's the case then this procedure must be altered in TemplateConnectionConfigurationCallback first.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      return test().toVariant();
    }

    private static ConnectorResponse test() {
      try {
        var response = SourceSystemHttpHelper.testEndpoint();

        if (isSuccessful(response.statusCode())) {
          return ConnectorResponse.success();
        } else {
          return ConnectorResponse.error(ERROR_CODE, "Connection to source system failed");
        }
      } catch (Exception exception) {
        return ConnectorResponse.error(ERROR_CODE, "Test connection failed");
      }
    }
}
Copy

Finaliser l’étape de configuration

L’étape de finalisation de la configuration du connecteur est la dernière étape de la phase de l’assistant. Cette étape comporte de multiples responsabilités. Tout d’abord, elle permet à l’utilisateur de spécifier toute configuration supplémentaire requise par le connecteur. Deuxièmement, elle crée une base de données, un schéma et, si nécessaire, des tables et des vues pour les données ingérées. Enfin, elle initialise les composants internes tels que la planification et Task Reactor.

Pour plus d’informations sur la finalisation de la configuration, veuillez consulter :

Pour plus d’informations sur Task Reactor et la planification, veuillez consulter :

Comme pour l’étape de configuration de la connexion, la personnalisation peut être lancée à l’aide de l’UI Streamlit. streamlit/wizard/finalize_config.py contient un formulaire avec un exemple de propriété. D’autres propriétés peuvent être ajoutées en fonction des besoins du connecteur. Pour ajouter une autre propriété, recherchez un commentaire TODO, qui contient un exemple de code pour l’ajout d’une nouvelle propriété dans le fichier mentionné.

# TODO: Here you can add additional fields in finalize connector configuration.
# For example:
st.subheader("Some additional property")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="some_additional_property", label_visibility="collapsed")
st.caption("Description of some new additional property")
Copy

Après avoir ajouté la zone de texte pour une nouvelle propriété, il faut la transmettre au backend. Pour ce faire, modifiez la fonction finalize_configuration dans le même fichier :

def finalize_configuration():
    try:
        st.session_state["show_main_error"] = False
        # TODO: If some additional properties were introduced, they need to be passed to the finalize_connector_configuration function.
        response = finalize_connector_configuration(
            st.session_state.get("custom_property"),
            st.session_state.get("some_additional_property")
        )
Copy

Ensuite, ouvrez streamlit/native_sdk_api/finalize_config.py et ajoutez-le à la fonction suivante :

def finalize_connector_configuration(custom_property: str, some_additional_property: str):
    # TODO: If some custom properties were configured, then they need to be specified here and passed to the FINALIZE_CONNECTOR_CONFIGURATION procedure.
    config = {
        "custom_property": custom_property,
        "some_additional_property": some_additional_property,
    }
    return call_procedure(
        "PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION",
        [variant_argument(config)]
    )
Copy

Comme pour l’étape de configuration de la connexion, cette étape permet également de personnaliser divers composants du backend, qui peuvent être trouvés en utilisant les phrases suivantes dans le code :

  • TODO: IMPLEMENT ME validate source

  • TODO: IMPLEMENT ME finalize internal

La partie « validate source » est chargée d’effectuer des validations plus sophistiquées sur les systèmes sources. Si le test de connexion précédent a seulement vérifié qu’une connexion peut être établie, la validation de la source pourrait vérifier l’accès à des données spécifiques dans le système, par exemple en extrayant un seul enregistrement de données.

« finalize internal » est une procédure interne chargée d’initialiser Task Reactor et le planificateur, de créer une base de données réceptrice et les objets imbriqués nécessaires. Elle peut également être utilisée pour enregistrer la configuration fournie lors de l’étape de finalisation (cette configuration n’est pas enregistrée par défaut).

Vous trouverez de plus amples informations sur les composants internes dans :

En outre, l’entrée peut être validée à l’aide de l’interface FinalizeConnectorInputValidator et fournie au gestionnaire (handler) de finalisation (voir TemplateFinalizeConnectorConfigurationCustomHandler). Vous trouverez de plus amples informations sur l’utilisation des constructeurs dans la rubrique :

Voici un exemple de mise en œuvre de « validate source » :

public class SourceSystemAccessValidator implements SourceValidator {

    @Override
    public ConnectorResponse validate(Variant variant) {
      // TODO: IMPLEMENT ME validate source: Implement the custom logic of validating the source
      // system. In some cases this can be the same validation that happened in
      // TemplateConnectionValidator.
      // However, it is suggested to perform more complex validations, like specific access rights to
      // some specific resources here.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/finalize_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/finalize_configuration
      var finalizeProperties = Configuration.fromCustomConfig(variant);

      var httpResponse = SourceSystemHttpHelper.validateSource(finalizeProperties.get("custom_property"));
      return prepareConnectorResponse(httpResponse.statusCode());
    }

    private ConnectorResponse prepareConnectorResponse(int statusCode) {
      switch (statusCode) {
        case 200:
          return ConnectorResponse.success();
        case 401:
          return ConnectorResponse.error("Unauthorized error");
        case 404:
          return ConnectorResponse.error("Not found error");
        default:
          return ConnectorResponse.error("Unknown error");
      }
    }
}
Copy

Créer des ressources

Une fois la phase de l’assistant terminée, le connecteur est prêt à commencer à ingérer des données. Mais d’abord, les ressources doivent être mises en œuvre et configurées. Une ressource est une abstraction décrivant un ensemble spécifique de données dans le système source, par exemple une table, un point de terminaison, un fichier, etc.

Divers systèmes sources peuvent avoir besoin d’informations variées sur une ressource, c’est pourquoi la définition d’une ressource doit être personnalisée en fonction de besoins spécifiques. Pour ce faire, accédez au fichier streamlit/daily_use/data_sync_page.py. Vous y trouverez un TODO sur l’ajout de zones de texte pour les paramètres des ressources. Les paramètres des ressources doivent permettre l’identification et la récupération des données du système source. Ces paramètres peuvent ensuite être extraits lors de l’ingestion :

# TODO: specify all the properties needed to define a resource in the source system. A subset of those properties should allow for a identification of a single resource, be it a table, endpoint, repository or some other data storage abstraction
st.text_input(
    "Resource name",
    key="resource_name",
)
st.text_input(
    "Some resource parameter",
    key="some_resource_parameter"
)
Copy

Une fois que toutes les propriétés nécessaires ont été ajoutées au formulaire, elles peuvent être transmises au backend. Tout d’abord, l’état des champs de texte doit être extrait et transmis à la méthode queue_resource au niveau de l’API dans streamlit/daily_use/data_sync_page.py :

def queue_resource():
    # TODO: add additional properties here and pass them to create_resource function
    resource_name = st.session_state.get("resource_name")
    some_resource_parameter = st.session_state.get("some_resource_parameter)

    if not resource_name:
        st.error("Resource name cannot be empty")
        return

    result = create_resource(resource_name, some_resource_parameter)
    if result.is_ok():
        st.success("Resource created")
    else:
        st.error(result.get_message())
Copy

Ensuite, la fonction create_resource de streamlit/native_sdk_api/resource_management.py doit être mise à jour :

def create_resource(resource_name, some_resource_parameter):
    ingestion_config = [{
        "id": "ingestionConfig",
        "ingestionStrategy": "INCREMENTAL",
        # TODO: HINT: scheduleType and scheduleDefinition are currently not supported out of the box, due to globalSchedule being used. However, a custom implementation of the scheduler can use those fields. They need to be provided becuase they are mandatory in the resourceDefinition.
        "scheduleType": "INTERVAL",
        "scheduleDefinition": "60m"
    }]
    # TODO: HINT: resource_id should allow identification of a table, endpoint etc. in the source system. It should be unique.
    resource_id = {
        "resource_name": resource_name,
    }
    id = f"{resource_name}_{random_suffix()}"

    # TODO: if you specified some additional resource parameters then you need to put them inside resource metadata:
    resource_metadata = {
        "some_resource_parameter": some_resource_parameter
    }

    return call_procedure("PUBLIC.CREATE_RESOURCE",
                          [
                              varchar_argument(id),
                              variant_argument(resource_id),
                              variant_list_argument(ingestion_config),
                              varchar_argument(id),
                              "true",
                              variant_argument(resource_metadata)
                          ])
Copy

Personnalisation de la logique de procédure CREATE_RESOURCE()

La procédure PUBLIC.CREATE_RESOURCE() permet au développeur de personnaliser son exécution en implémentant une logique propre qui est branchée à plusieurs endroits du flux d’exécution principal. Le SDK permet au développeur les opérations suivantes :

  1. Validez la ressource avant sa création. La logique doit être implémentée dans la procédure PUBLIC.CREATE_RESOURCE_VALIDATE().

  2. Effectuez certaines opérations personnalisées avant la création de la ressource. La logique doit être implémentée dans la procédure PUBLIC.PRE_CREATE_RESOURCE().

  3. Effectuez certaines opérations personnalisées après la création de la ressource. La logique doit être implémentée dans la procédure PUBLIC.POST_CREATE_RESOURCE().

Plus d’informations sur la personnalisation de la procédure PUBLIC.CREATE_RESOURCE() sont disponibles ici :

TemplateCreateResourceHandler.java

Cette classe est un gestionnaire pour la procédure PUBLIC.CREATE_RESOURCE(). Ici, vous pouvez injecter les implémentations Java des gestionnaires pour les procédures de rappel mentionnées précédemment. Par défaut, le modèle fournit des implémentations Java simulées de gestionnaires de rappel afin de se débarrasser des appels aux procédures SQL qui prolongent le temps d’exécution de la procédure entière. Les implémentations Java rendent l’exécution plus rapide. Ces implémentations simulées ne font rien d’autre que renvoyer une réponse de réussite. Vous pouvez soit fournir l’implémentation personnalisée aux classes de rappel préparées par le modèle, soit créer ces rappels à partir de zéro et les injecter dans le flux d’exécution de la procédure principale dans le générateur de gestionnaires.

Afin d’implémenter la logique personnalisée pour les méthodes de rappel appelées par défaut, recherchez les phrases suivantes dans le code :

  • TODO: IMPLEMENT ME create resource validate

  • TODO: IMPLEMENT ME pre create resource callback

  • TODO: IMPLEMENT ME post create resource callback

Ingestion

Pour réaliser l’ingestion de données, vous devez mettre en œuvre une classe qui gérera la connexion avec le système source et récupérera les données, en fonction de la configuration de la ressource. Les modules Planificateur et Task Reactor se chargeront du déclenchement et de la mise en file d’attente des tâches d’ingestion.

La logique d’ingestion est appelée à partir de la classe TemplateIngestion, recherchez TODO: IMPLEMENT ME ingestion dans le code et remplacez la génération aléatoire de données par la récupération de données à partir du système source. Si vous avez ajouté des propriétés personnalisées à la définition de la ressource, elles peuvent être extraites des tables des connecteurs internes à l’aide de ResourceIngestionDefinitionRepository et des propriétés disponibles sur TemplateWorkItem :

  • resourceIngestionDefinitionId

  • ingestionConfigurationId

Par exemple, la récupération de données à partir d’un service Web peut ressembler à ceci :

public final class SourceSystemHttpHelper {

  private static final String DATA_URL = "https://source_system.com/data/%s";
  private static final SourceSystemHttpClient sourceSystemClient = new SourceSystemHttpClient();
  private static final ObjectMapper objectMapper = new ObjectMapper();

  private static List<Variant> fetchData(String resourceId) {
    var response = sourceSystemClient.get(String.format(url, resourceId));
    var body = response.body();

    try {
        return Arrays.stream(objectMapper.readValue(body, Map[].class))
              .map(Variant::new)
              .collect(Collectors.toList());
    } catch (JsonProcessingException e) {
      throw new RuntimeException("Cannot parse json", e);
    }
  }
}
Copy
public class SourceSystemHttpClient {

  private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(15);

  private final HttpClient client;
  private final String secret;

  public SourceSystemHttpClient() {
    this.client = HttpClient.newHttpClient();
    this.secret =
        SnowflakeSecrets.newInstance()
            .getGenericSecretString(ConnectionConfiguration.TOKEN_NAME);
  }

  public HttpResponse<String> get(String url) {
    var request =
        HttpRequest.newBuilder()
            .uri(URI.create(url))
            .GET()
            .header("Authorization", format("Bearer %s", secret))
            .header("Content-Type", "application/json")
            .timeout(REQUEST_TIMEOUT)
            .build();

    try {
      return client.send(request, HttpResponse.BodyHandlers.ofString());
    } catch (IOException | InterruptedException ex) {
      throw new RuntimeException(format("HttpRequest failed: %s", ex.getMessage()), ex);
    }
  }
}
Copy

Gérer le cycle de vie des ressources

Une fois la logique de création des ressources et leur ingestion implémentée, vous pouvez gérer leur cycle de vie par les procédures suivantes :

  1. PUBLIC.ENABLE_RESOURCE() - cette procédure active une ressource particulière, ce qui signifie qu’elle sera planifiée pour l’ingestion

  2. PUBLIC.DISABLE_RESOURCE() - cette procédure désactive une ressource particulière, ce qui signifie que sa planification d’ingestion sera arrêtée

  3. PUBLIC.UPDATE_RESOURCE() - cette procédure permet de mettre à jour les configurations d’ingestion d’une ressource particulière. Elle n’est pas implémentée dans l’UI Streamlit par défaut car cela peut parfois être indésirable pour le développeur de permettre à l’utilisateur du connecteur de personnaliser la configuration d’ingestion (révoquer les autorisations sur cette procédure du rôle d’application ACCOUNTADMIN afin d’interdire complètement son utilisation).

Toutes ces procédures disposent de gestionnaires Java et sont étendues avec des rappels qui permettent de personnaliser leur exécution. Vous pouvez injecter des implémentations personnalisées de rappels à l’aide du générateur de ces gestionnaires. Par défaut, le modèle fournit des implémentations Java simulées de gestionnaires de rappel afin de se débarrasser des appels vers les procédures SQL qui prolongent la durée totale d’exécution des procédures mentionnées. Ces implémentations simulées ne font rien d’autre que renvoyer une réponse de réussite. Vous pouvez soit fournir l’implémentation personnalisée aux classes de rappel préparées par le modèle, soit créer ces rappels à partir de zéro et les injecter dans le flux d’exécution de la procédure principale dans les générateurs de gestionnaires.

TemplateEnableResourceHandler.java

Cette classe est un gestionnaire pour la procédure PUBLIC.ENABLE_RESOURCE(), qui peut être étendue avec les rappels dédiés à :

  1. Validez la ressource avant qu’elle ne soit activée. Recherchez l’expression TODO: IMPLEMENT ME enable resource validate dans le code afin de fournir l’implémentation personnalisée.

  2. Effectuez certaines opérations personnalisées avant que la ressource ne soit activée. Recherchez l’expression TODO: IMPLEMENT ME pre enable resource dans le code afin de fournir l’implémentation personnalisée.

  3. Effectuez certaines opérations personnalisées une fois la ressource activée. Recherchez l’expression TODO: IMPLEMENT ME post enable resource dans le code afin de fournir l’implémentation personnalisée.

Apprenez-en avec les documentations détaillées sur la procédure PUBLIC.ENABLE_RESOURCE() :

TemplateDisableResourceHandler.java

Cette classe est un gestionnaire pour la procédure PUBLIC.DISABLE_RESOURCE(), qui peut être étendue avec les rappels dédiés à :

  1. Validez la ressource avant qu’elle ne soit désactivée. Recherchez l’expression TODO: IMPLEMENT ME disable resource validate dans le code afin de fournir l’implémentation personnalisée.

  2. Effectuez certaines opérations personnalisées avant que la ressource ne soit désactivée. Recherchez l’expression TODO: IMPLEMENT ME pre disable resource dans le code afin de fournir l’implémentation personnalisée.

Apprenez-en avec les documentations détaillées sur la procédure PUBLIC.DISABLE_RESOURCE() :

TemplateUpdateResourceHandler.java

Cette classe est un gestionnaire pour la procédure PUBLIC.UPDATE_RESOURCE(), qui peut être étendue avec les rappels dédiés à :

  1. Validez la ressource avant sa mise à jour. Recherchez l’expression TODO: IMPLEMENT ME update resource validate dans le code afin de fournir l’implémentation personnalisée.

  2. Effectuez certaines opérations personnalisées avant que la ressource ne soit mise à jour. Recherchez l’expression TODO: IMPLEMENT ME pre update resource dans le code afin de fournir l’implémentation personnalisée.

  3. Effectuez certaines opérations personnalisées une fois la ressource mise à jour. Recherchez l’expression TODO: IMPLEMENT ME post update resource dans le code afin de fournir l’implémentation personnalisée.

Apprenez-en avec les documentations détaillées sur la procédure PUBLIC.UPDATE_RESOURCE() :

Paramètres

Le modèle contient un onglet « Paramètres » qui vous permet de voir toutes les configurations effectuées auparavant. Toutefois, si les propriétés de configuration ont été personnalisées, cette vue doit également l’être. Le code de l’onglet Paramètres se trouve dans le fichier streamlit/daily_use/settings_page.py. Pour le personnaliser, il suffit d’extraire les valeurs de la configuration pour les clés qui ont été ajoutées dans les configurations respectives.

Par exemple, si la propriété antérieure additional_connection_property a été ajoutée plus tôt dans l’étape de configuration de la connexion, elle pourrait être ajoutée comme suit :

def connection_config_page():
    current_config = get_connection_configuration()

    # TODO: implement the display for all the custom properties defined in the connection configuration step
    custom_property = current_config.get("custom_connection_property", "")
    additional_connection_property = current_config.get("additional_connection_property", "")


    st.header("Connector configuration")
    st.caption("Here you can see the connector connection configuration saved during the connection configuration step "
               "of the Wizard. If some new property was introduced it has to be added here to display.")
    st.divider()

    st.text_input(
        "Custom connection property:",
        value=custom_property,
        disabled=True
    )
    st.text_input(
        "Additional connection property:",
        value=additional_connection_property,
        disabled=True
    )
    st.divider()
Copy