Tutoriel : SDK natif pour les modèles Java pour les connecteurs¶
Introduction¶
Bienvenue dans notre tutoriel sur l’utilisation d’un modèle de connecteur à l’aide de Snowflake Native SDK for Connectors. Ce guide vous aidera à configurer une application native Connector simple.
Dans ce tutoriel, vous apprendrez à :
Déployer une application native Connector
Configurer un modèle de connecteur pour ingérer des données
Personnaliser un modèle de connecteur en fonction de vos propres besoins
Le modèle contient divers commentaires utiles dans le code pour vous permettre de trouver plus facilement les fichiers spécifiques qui doivent être modifiés. Cherchez les commentaires avec les mots-clés suivants, ils vous guideront et aideront à mettre en œuvre votre propre connecteur :
TODO
TODO: HINT
TODO: IMPLEMENT ME
Avant de commencer ce tutoriel, vous devez vous préparer en consultant le contenu recommandé ci-dessous :
Conditions préalables¶
Avant de commencer, assurez-vous que vous remplissez les conditions suivantes :
Java 11 installé
Accès au compte Snowflake avec le rôle
ACCOUNTADMIN
Outil SnowSQL (CLI client) avec
variable_substitution
etexit_on_error
configurés sur votre machine localeVous avez consulté cette page de documentation : SDK natif Snowflake pour les connecteurs et vous la gardez ouverte en ligne ou imprimée à partir de votre navigateur. Vous avez consulté ce quickstart : SDK Connector Native Java (facultatif, mais recommandé). Le quickstart utilise un exemple de connecteur basé sur un modèle et il peut être référencé pour vérifier des exemples d’implémentation de divers composants.
Initialisation et déploiement¶
Pour initialiser un projet, clonez le référentiel Native SDK for Connectors à partir de GitHub et copiez le répertoire /templates/native-sdk-connectors-java-template
à l’emplacement souhaité pour le projet. Ce modèle contient tout le code nécessaire pour déployer une application native Connector fonctionnelle. Une fois cette opération effectuée, le modèle est prêt à être déployé.
Déploiement¶
Le modèle est prêt à être déployé et fournit un script de commodité qui gère l’ensemble du processus pour vous. Avant de déployer le connecteur, une connexion snowsql
doit être spécifiée. Pour ce faire, ouvrez Makefile
et indiquez le nom de la connexion dans la variable d’environnement CONNECTION
.
Pour déployer rapidement l’application, allez dans le répertoire principal du modèle et exécutez la commande suivante :
make reinstall_application_from_version_dir
Cette commande permet d’effectuer les opérations suivantes :
Supprime les comptes
APPLICATION
etAPPLICATION PACKAGE
qui existaient précédemment dans le compte Snowflake.Copie le fichier jar du SDK et les fichiers sql extraits du jar dans le répertoire cible de
sf_build
.Copie les composants streamlit et java personnalisés de l’application dans le répertoire
sf_build
.Crée un nouveau
APPLICATION PACKAGE
à partir des fichiers du répertoiresf_build
à l’intérieur d’un compte Snowflake.Crée une nouvelle instance
APPLICATION
à l’intérieur d’un compte Snowflake.
Ce processus prend environ 2 à 3 minutes. Une fois qu’il est terminé, naviguez vers l’onglet Data Products
-> Apps
à l’intérieur de Snowflake, votre connecteur devrait y être visible. Si vous avez beaucoup d’applications et que vous avez du mal à les trouver, essayez de taper NATIVE_SDK_CONNECTOR_TEMPLATE
dans la barre de recherche ou, dans le cas d’un nom personnalisé d”APPLICATION
, utilisez le nom personnalisé à la place. Ce connecteur est prêt à être configuré. Les étapes suivantes vous guident tout au long du processus et vous expliquent comment personnaliser chacune d’entre elles.
Si vous avez besoin de redéployer votre connecteur pendant l’une des étapes de ce tutoriel, par exemple pour tester vos modifications, il vous suffit de réexécuter la commande ci-dessus.
Étape préalable¶
Juste après le déploiement, le connecteur est dans sa phase d’assistant. Cette phase consiste en quelques étapes qui guident l’utilisateur final à travers toutes les configurations nécessaires. La première étape est celle des conditions préalables. Elle est facultative et peut ne pas être nécessaire pour tous les connecteurs. Les conditions préalables sont généralement des actions requises par l’utilisateur en dehors de l’application, par exemple l’exécution de requêtes dans la feuille de calcul, la réalisation de certaines configurations du côté du système source, etc.
En savoir plus sur les conditions préalables :
Le contenu de chaque condition préalable est extrait directement de la table interne (STATE.PREREQUISITES
) du connecteur. Elles peuvent être personnalisées à l’aide du script setup.sql
. Cependant, n’oubliez pas que le script setup.sql
est exécuté à chaque installation, mise à niveau et rétrogradation de l’application. Les insertions doivent être idempotentes, c’est pourquoi il est recommandé d’utiliser une requête de fusion comme dans l’exemple ci-dessous :
MERGE INTO STATE.PREREQUISITES AS dest
USING (SELECT * FROM VALUES
('1',
'Sample prerequisite',
'Prerequisites can be used to notice the end user of the connector about external configurations. Read more in the SDK documentation below. This content can be modified inside `setup.sql` script',
'https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/prerequisites',
NULL,
NULL,
1
)
) AS src (id, title, description, documentation_url, learnmore_url, guide_url, position)
ON dest.id = src.id
WHEN NOT MATCHED THEN
INSERT (id, title, description, documentation_url, learnmore_url, guide_url, position)
VALUES (src.id, src.title, src.description, src.documentation_url, src.learnmore_url, src.guide_url, src.position);
Étape de configuration du connecteur¶
L’étape suivante de la phase de l’assistant est celle de la configuration du connecteur. Au cours de cette étape, vous pouvez configurer les objets de la base de données et les autorisations requises par le connecteur. Cette étape permet de spécifier les propriétés de configuration suivantes :
warehouse
destination_database
destination_schema
operational_warehouse
global_schedule
data_owner_role
agent_username
agent_role
Si vous avez besoin d’autres propriétés personnalisées, elles peuvent être configurées dans l’une des étapes suivantes de la phase de l’assistant. Pour plus d’informations sur chacune des propriétés, voir :
En outre, le composant streamlit (streamlit/wizard/connector_config.py
) fourni dans le modèle montre comment déclencher permissions-sdk
et demande quelques autorisations à l’utilisateur final. Tant que les propriétés disponibles répondent aux besoins du connecteur, il n’est pas nécessaire d’écraser les classes du backend, bien que cela soit possible de la même manière que pour les composants dans les étapes suivantes de la configuration.
Pour plus d’informations sur les procédures internes et les objets Java, voir :
L’exemple fourni par Streamlit permet de demander des autorisations au niveau du compte comme create database
et execute tasks
. Il permet également à l’utilisateur de spécifier une référence d’entrepôt par le biais de la fenêtre contextuelle permissions-sdk
.
Dans le modèle, il est demandé à l’utilisateur de ne fournir que destination_database
et destination_schema
. Cependant, un commentaire de TODO
dans streamlit/wizard/connector_configuration.py
contient du code commenté qui peut être réutilisé pour afficher plus de boîtes de saisie dans l’UI de Streamlit.
# TODO: Here you can add additional fields in connector configuration. Supported values are the following: warehouse, operational_warehouse, data_owner_role, agent_role, agent_username
# For example:
st.subheader("Operational warehouse")
input_col, _ = st.columns([2, 1])
with input_col:
st.text_input("", key="operational_warehouse", label_visibility="collapsed")
st.caption("Name of the operational warehouse to be used")
Étape de configuration de connexion¶
L’étape suivante de la phase de l’assistant est celle de la configuration de la connexion. Cette étape permet à l’utilisateur final de configurer les paramètres de connectivité externe pour le connecteur. Cette configuration peut inclure des identificateurs d’objets tels que des secrets, des intégrations, etc. Comme cela varie en fonction du système source des données ingérées par le connecteur, c’est le premier endroit où des personnalisations plus importantes doivent être effectuées dans le code source.
Pour plus d’informations sur la configuration des connexions, voir :
En commençant par l’UI côté Streamlit (fichier streamlit/wizard/connection_config.py
), vous devez ajouter des zones de texte pour tous les paramètres nécessaires. Un exemple de zone de texte est implémenté pour vous et si vous recherchez le code dans ce fichier, vous trouverez un TODO
avec du code commenté pour un nouveau champ.
# TODO: Additional configuration properties can be added to the UI like this:
st.subheader("Additional connection parameter")
input_col, _ = st.columns([2, 1])
with input_col:
st.text_input("", key="additional_connection_property", label_visibility="collapsed")
st.caption("Some description of the additional property")
Une fois les propriétés ajoutées au formulaire, elles doivent être transmises à la couche dorsale du connecteur. Pour ce faire, deux endroits supplémentaires doivent être modifiés dans les fichiers Streamlit. Le premier concerne la fonction finish_config
dans le fichier streamlit/wizard/connection_config.py
. L’état des zones de texte nouvellement ajoutées doit être lu ici. En outre, il peut être validé si nécessaire, puis transmis à la fonction set_connection_configuration
. Par exemple, si additional_connection_property
a été ajouté, il se présentera comme suit après les modifications :
def finish_config():
try:
# TODO: If some additional properties were specified they need to be passed to the set_connection_configuration function.
# The properties can also be validated, for example, check whether they are not blank strings etc.
response = set_connection_configuration(
custom_connection_property=st.session_state["custom_connection_property"],
additional_connection_property=st.session_state["additional_connection_property"],
)
# rest of the method without changes
Ensuite, la fonction set_connection_configuration
doit être éditée, elle se trouve dans le fichier streamlit/native_sdk_api/connection_config.py
. Cette fonction est un proxy entre l’UI de Streamlit et la procédure SQL sous-jacente, qui est un point d’entrée vers le backend du connecteur.
def set_connection_configuration(custom_connection_property: str, additional_connection_property: str):
# TODO: this part of the code sends the config to the backend so all custom properties need to be added here
config = {
"custom_connection_property": escape_identifier(custom_connection_property),
"additional_connection_property": escape_identifier(additional_connection_property),
}
return call_procedure(
"PUBLIC.SET_CONNECTION_CONFIGURATION",
[variant_argument(config)]
)
Ensuite, la nouvelle propriété est enregistrée dans la table des connecteurs internes, qui contient la configuration. Mais les possibilités de personnalisation ne s’arrêtent pas là. Certains composants du backend peuvent également être personnalisés, recherchez les commentaires suivants dans le code pour les trouver :
TODO: IMPLEMENT ME connection configuration validate
TODO: IMPLEMENT ME connection callback
TODO: IMPLEMENT ME test connection
La partie « valider » permet d’effectuer toute validation supplémentaire sur les données reçues de l’UI. Elle peut également transformer les données, par exemple en les mettant en minuscules, en les triant ou en vérifiant que les objets portant les noms fournis existent réellement à l’intérieur de Snowflake.
Le rappel de connexion est une partie qui vous permet d’effectuer toute opération supplémentaire basée sur la configuration, par exemple les procédures d’altération qui doivent utiliser des intégrations d’accès externes.
Le test de connexion est un élément final de la configuration de la connexion, il vérifie si la connexion peut être établie entre le connecteur et le système source.
Pour plus d’informations sur ces composants internes, voir :
Voici des exemples de mises en œuvre possibles :
public class TemplateConfigurationInputValidator implements ConnectionConfigurationInputValidator {
private static final String ERROR_CODE = "INVALID_CONNECTION_CONFIGURATION";
@Override
public ConnectorResponse validate(Variant config) {
// TODO: IMPLEMENT ME connection configuration validate: If the connection configuration input
// requires some additional validation this is the place to implement this logic.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
var integrationCheck = checkParameter(config, INTEGRATION_PARAM, false);
if (!integrationCheck.isOk()) {
return integrationCheck;
}
var secretCheck = checkParameter(config, SECRET_PARAM, true);
if (!secretCheck.isOk()) {
return ConnectorResponse.error(ERROR_CODE);
}
return ConnectorResponse.success();
}
}
public class TemplateConnectionConfigurationCallback implements ConnectionConfigurationCallback {
private final Session session;
public TemplateConnectionConfigurationCallback(Session session) {
this.session = session;
}
@Override
public ConnectorResponse execute(Variant config) {
// TODO: If you need to alter some procedures with external access you can use
// configureProcedure method or implement a similar method on your own.
// TODO: IMPLEMENT ME connection callback: Implement the custom logic of changes in application
// to be done after connection configuration, like altering procedures with external access.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
configureProcedure(format("PUBLIC.TEST_CONNECTION()"), config);
return ConnectorResponse.success();
}
}
public class TemplateConnectionValidator {
private static final String ERROR_CODE = "TEST_CONNECTION_FAILED";
public static Variant testConnection(Session session) {
// TODO: IMPLEMENT ME test connection: Implement the custom logic of testing the connection to
// the source system here. This usually requires connection to some webservice or other external
// system. It is suggested to perform only the basic connectivity validation here.
// If that's the case then this procedure must be altered in TemplateConnectionConfigurationCallback first.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
return test().toVariant();
}
private static ConnectorResponse test() {
try {
var response = SourceSystemHttpHelper.testEndpoint();
if (isSuccessful(response.statusCode())) {
return ConnectorResponse.success();
} else {
return ConnectorResponse.error(ERROR_CODE, "Connection to source system failed");
}
} catch (Exception exception) {
return ConnectorResponse.error(ERROR_CODE, "Test connection failed");
}
}
}
Finaliser l’étape de configuration¶
L’étape de finalisation de la configuration du connecteur est la dernière étape de la phase de l’assistant. Cette étape comporte de multiples responsabilités. Tout d’abord, elle permet à l’utilisateur de spécifier toute configuration supplémentaire requise par le connecteur. Deuxièmement, elle crée une base de données, un schéma et, si nécessaire, des tables et des vues pour les données ingérées. Enfin, elle initialise les composants internes tels que la planification et Task Reactor.
Pour plus d’informations sur la finalisation de la configuration, veuillez consulter :
Pour plus d’informations sur Task Reactor et la planification, veuillez consulter :
Comme pour l’étape de configuration de la connexion, la personnalisation peut être lancée à l’aide de l’UI Streamlit. streamlit/wizard/finalize_config.py
contient un formulaire avec un exemple de propriété. D’autres propriétés peuvent être ajoutées en fonction des besoins du connecteur. Pour ajouter une autre propriété, recherchez un commentaire TODO
, qui contient un exemple de code pour l’ajout d’une nouvelle propriété dans le fichier mentionné.
# TODO: Here you can add additional fields in finalize connector configuration.
# For example:
st.subheader("Some additional property")
input_col, _ = st.columns([2, 1])
with input_col:
st.text_input("", key="some_additional_property", label_visibility="collapsed")
st.caption("Description of some new additional property")
Après avoir ajouté la zone de texte pour une nouvelle propriété, il faut la transmettre au backend. Pour ce faire, modifiez la fonction finalize_configuration
dans le même fichier :
def finalize_configuration():
try:
st.session_state["show_main_error"] = False
# TODO: If some additional properties were introduced, they need to be passed to the finalize_connector_configuration function.
response = finalize_connector_configuration(
st.session_state.get("custom_property"),
st.session_state.get("some_additional_property")
)
Ensuite, ouvrez streamlit/native_sdk_api/finalize_config.py
et ajoutez-le à la fonction suivante :
def finalize_connector_configuration(custom_property: str, some_additional_property: str):
# TODO: If some custom properties were configured, then they need to be specified here and passed to the FINALIZE_CONNECTOR_CONFIGURATION procedure.
config = {
"custom_property": custom_property,
"some_additional_property": some_additional_property,
}
return call_procedure(
"PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION",
[variant_argument(config)]
)
Comme pour l’étape de configuration de la connexion, cette étape permet également de personnaliser divers composants du backend, qui peuvent être trouvés en utilisant les phrases suivantes dans le code :
TODO: IMPLEMENT ME validate source
TODO: IMPLEMENT ME finalize internal
La partie « validate source » est chargée d’effectuer des validations plus sophistiquées sur les systèmes sources. Si le test de connexion précédent a seulement vérifié qu’une connexion peut être établie, la validation de la source pourrait vérifier l’accès à des données spécifiques dans le système, par exemple en extrayant un seul enregistrement de données.
« finalize internal » est une procédure interne chargée d’initialiser Task Reactor et le planificateur, de créer une base de données réceptrice et les objets imbriqués nécessaires. Elle peut également être utilisée pour enregistrer la configuration fournie lors de l’étape de finalisation (cette configuration n’est pas enregistrée par défaut).
Vous trouverez de plus amples informations sur les composants internes dans :
En outre, l’entrée peut être validée à l’aide de l’interface FinalizeConnectorInputValidator
et fournie au gestionnaire (handler) de finalisation (voir TemplateFinalizeConnectorConfigurationCustomHandler
). Vous trouverez de plus amples informations sur l’utilisation des constructeurs dans la rubrique :
Voici un exemple de mise en œuvre de « validate source » :
public class SourceSystemAccessValidator implements SourceValidator {
@Override
public ConnectorResponse validate(Variant variant) {
// TODO: IMPLEMENT ME validate source: Implement the custom logic of validating the source
// system. In some cases this can be the same validation that happened in
// TemplateConnectionValidator.
// However, it is suggested to perform more complex validations, like specific access rights to
// some specific resources here.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/finalize_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/finalize_configuration
var finalizeProperties = Configuration.fromCustomConfig(variant);
var httpResponse = SourceSystemHttpHelper.validateSource(finalizeProperties.get("custom_property"));
return prepareConnectorResponse(httpResponse.statusCode());
}
private ConnectorResponse prepareConnectorResponse(int statusCode) {
switch (statusCode) {
case 200:
return ConnectorResponse.success();
case 401:
return ConnectorResponse.error("Unauthorized error");
case 404:
return ConnectorResponse.error("Not found error");
default:
return ConnectorResponse.error("Unknown error");
}
}
}
Créer des ressources¶
Une fois la phase de l’assistant terminée, le connecteur est prêt à commencer à ingérer des données. Mais d’abord, les ressources doivent être mises en œuvre et configurées. Une ressource est une abstraction décrivant un ensemble spécifique de données dans le système source, par exemple une table, un point de terminaison, un fichier, etc.
Divers systèmes sources peuvent avoir besoin d’informations variées sur une ressource, c’est pourquoi la définition d’une ressource doit être personnalisée en fonction de besoins spécifiques. Pour ce faire, accédez au fichier streamlit/daily_use/data_sync_page.py
. Vous y trouverez un TODO
sur l’ajout de zones de texte pour les paramètres des ressources. Les paramètres des ressources doivent permettre l’identification et la récupération des données du système source. Ces paramètres peuvent ensuite être extraits lors de l’ingestion :
# TODO: specify all the properties needed to define a resource in the source system. A subset of those properties should allow for a identification of a single resource, be it a table, endpoint, repository or some other data storage abstraction
st.text_input(
"Resource name",
key="resource_name",
)
st.text_input(
"Some resource parameter",
key="some_resource_parameter"
)
Une fois que toutes les propriétés nécessaires ont été ajoutées au formulaire, elles peuvent être transmises au backend. Tout d’abord, l’état des champs de texte doit être extrait et transmis à la méthode queue_resource
au niveau de l’API dans streamlit/daily_use/data_sync_page.py
:
def queue_resource():
# TODO: add additional properties here and pass them to create_resource function
resource_name = st.session_state.get("resource_name")
some_resource_parameter = st.session_state.get("some_resource_parameter)
if not resource_name:
st.error("Resource name cannot be empty")
return
result = create_resource(resource_name, some_resource_parameter)
if result.is_ok():
st.success("Resource created")
else:
st.error(result.get_message())
Ensuite, la fonction create_resource
de streamlit/native_sdk_api/resource_management.py
doit être mise à jour :
def create_resource(resource_name, some_resource_parameter):
ingestion_config = [{
"id": "ingestionConfig",
"ingestionStrategy": "INCREMENTAL",
# TODO: HINT: scheduleType and scheduleDefinition are currently not supported out of the box, due to globalSchedule being used. However, a custom implementation of the scheduler can use those fields. They need to be provided becuase they are mandatory in the resourceDefinition.
"scheduleType": "INTERVAL",
"scheduleDefinition": "60m"
}]
# TODO: HINT: resource_id should allow identification of a table, endpoint etc. in the source system. It should be unique.
resource_id = {
"resource_name": resource_name,
}
id = f"{resource_name}_{random_suffix()}"
# TODO: if you specified some additional resource parameters then you need to put them inside resource metadata:
resource_metadata = {
"some_resource_parameter": some_resource_parameter
}
return call_procedure("PUBLIC.CREATE_RESOURCE",
[
varchar_argument(id),
variant_argument(resource_id),
variant_list_argument(ingestion_config),
varchar_argument(id),
"true",
variant_argument(resource_metadata)
])
Personnalisation de la logique de procédure CREATE_RESOURCE()¶
La procédure PUBLIC.CREATE_RESOURCE()
permet au développeur de personnaliser son exécution en implémentant une logique propre qui est branchée à plusieurs endroits du flux d’exécution principal. Le SDK permet au développeur les opérations suivantes :
Validez la ressource avant sa création. La logique doit être implémentée dans la procédure
PUBLIC.CREATE_RESOURCE_VALIDATE()
.Effectuez certaines opérations personnalisées avant la création de la ressource. La logique doit être implémentée dans la procédure
PUBLIC.PRE_CREATE_RESOURCE()
.Effectuez certaines opérations personnalisées après la création de la ressource. La logique doit être implémentée dans la procédure
PUBLIC.POST_CREATE_RESOURCE()
.
Plus d’informations sur la personnalisation de la procédure PUBLIC.CREATE_RESOURCE()
sont disponibles ici :
TemplateCreateResourceHandler.java¶
Cette classe est un gestionnaire pour la procédure PUBLIC.CREATE_RESOURCE()
. Ici, vous pouvez injecter les implémentations Java des gestionnaires pour les procédures de rappel mentionnées précédemment. Par défaut, le modèle fournit des implémentations Java simulées de gestionnaires de rappel afin de se débarrasser des appels aux procédures SQL qui prolongent le temps d’exécution de la procédure entière. Les implémentations Java rendent l’exécution plus rapide. Ces implémentations simulées ne font rien d’autre que renvoyer une réponse de réussite. Vous pouvez soit fournir l’implémentation personnalisée aux classes de rappel préparées par le modèle, soit créer ces rappels à partir de zéro et les injecter dans le flux d’exécution de la procédure principale dans le générateur de gestionnaires.
Afin d’implémenter la logique personnalisée pour les méthodes de rappel appelées par défaut, recherchez les phrases suivantes dans le code :
TODO: IMPLEMENT ME create resource validate
TODO: IMPLEMENT ME pre create resource callback
TODO: IMPLEMENT ME post create resource callback
Ingestion¶
Pour réaliser l’ingestion de données, vous devez mettre en œuvre une classe qui gérera la connexion avec le système source et récupérera les données, en fonction de la configuration de la ressource. Les modules Planificateur et Task Reactor se chargeront du déclenchement et de la mise en file d’attente des tâches d’ingestion.
La logique d’ingestion est appelée à partir de la classe TemplateIngestion
, recherchez TODO: IMPLEMENT ME ingestion
dans le code et remplacez la génération aléatoire de données par la récupération de données à partir du système source. Si vous avez ajouté des propriétés personnalisées à la définition de la ressource, elles peuvent être extraites des tables des connecteurs internes à l’aide de ResourceIngestionDefinitionRepository
et des propriétés disponibles sur TemplateWorkItem
:
resourceIngestionDefinitionId
ingestionConfigurationId
Par exemple, la récupération de données à partir d’un service Web peut ressembler à ceci :
public final class SourceSystemHttpHelper {
private static final String DATA_URL = "https://source_system.com/data/%s";
private static final SourceSystemHttpClient sourceSystemClient = new SourceSystemHttpClient();
private static final ObjectMapper objectMapper = new ObjectMapper();
private static List<Variant> fetchData(String resourceId) {
var response = sourceSystemClient.get(String.format(url, resourceId));
var body = response.body();
try {
return Arrays.stream(objectMapper.readValue(body, Map[].class))
.map(Variant::new)
.collect(Collectors.toList());
} catch (JsonProcessingException e) {
throw new RuntimeException("Cannot parse json", e);
}
}
}
public class SourceSystemHttpClient {
private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(15);
private final HttpClient client;
private final String secret;
public SourceSystemHttpClient() {
this.client = HttpClient.newHttpClient();
this.secret =
SnowflakeSecrets.newInstance()
.getGenericSecretString(ConnectionConfiguration.TOKEN_NAME);
}
public HttpResponse<String> get(String url) {
var request =
HttpRequest.newBuilder()
.uri(URI.create(url))
.GET()
.header("Authorization", format("Bearer %s", secret))
.header("Content-Type", "application/json")
.timeout(REQUEST_TIMEOUT)
.build();
try {
return client.send(request, HttpResponse.BodyHandlers.ofString());
} catch (IOException | InterruptedException ex) {
throw new RuntimeException(format("HttpRequest failed: %s", ex.getMessage()), ex);
}
}
}
Gérer le cycle de vie des ressources¶
Une fois la logique de création des ressources et leur ingestion implémentée, vous pouvez gérer leur cycle de vie par les procédures suivantes :
PUBLIC.ENABLE_RESOURCE()
- cette procédure active une ressource particulière, ce qui signifie qu’elle sera planifiée pour l’ingestionPUBLIC.DISABLE_RESOURCE()
- cette procédure désactive une ressource particulière, ce qui signifie que sa planification d’ingestion sera arrêtéePUBLIC.UPDATE_RESOURCE()
- cette procédure permet de mettre à jour les configurations d’ingestion d’une ressource particulière. Elle n’est pas implémentée dans l’UI Streamlit par défaut car cela peut parfois être indésirable pour le développeur de permettre à l’utilisateur du connecteur de personnaliser la configuration d’ingestion (révoquer les autorisations sur cette procédure du rôle d’applicationACCOUNTADMIN
afin d’interdire complètement son utilisation).
Toutes ces procédures disposent de gestionnaires Java et sont étendues avec des rappels qui permettent de personnaliser leur exécution. Vous pouvez injecter des implémentations personnalisées de rappels à l’aide du générateur de ces gestionnaires. Par défaut, le modèle fournit des implémentations Java simulées de gestionnaires de rappel afin de se débarrasser des appels vers les procédures SQL qui prolongent la durée totale d’exécution des procédures mentionnées. Ces implémentations simulées ne font rien d’autre que renvoyer une réponse de réussite. Vous pouvez soit fournir l’implémentation personnalisée aux classes de rappel préparées par le modèle, soit créer ces rappels à partir de zéro et les injecter dans le flux d’exécution de la procédure principale dans les générateurs de gestionnaires.
TemplateEnableResourceHandler.java¶
Cette classe est un gestionnaire pour la procédure PUBLIC.ENABLE_RESOURCE()
, qui peut être étendue avec les rappels dédiés à :
Validez la ressource avant qu’elle ne soit activée. Recherchez l’expression
TODO: IMPLEMENT ME enable resource validate
dans le code afin de fournir l’implémentation personnalisée.Effectuez certaines opérations personnalisées avant que la ressource ne soit activée. Recherchez l’expression
TODO: IMPLEMENT ME pre enable resource
dans le code afin de fournir l’implémentation personnalisée.Effectuez certaines opérations personnalisées une fois la ressource activée. Recherchez l’expression
TODO: IMPLEMENT ME post enable resource
dans le code afin de fournir l’implémentation personnalisée.
Apprenez-en avec les documentations détaillées sur la procédure PUBLIC.ENABLE_RESOURCE()
:
TemplateDisableResourceHandler.java¶
Cette classe est un gestionnaire pour la procédure PUBLIC.DISABLE_RESOURCE()
, qui peut être étendue avec les rappels dédiés à :
Validez la ressource avant qu’elle ne soit désactivée. Recherchez l’expression
TODO: IMPLEMENT ME disable resource validate
dans le code afin de fournir l’implémentation personnalisée.Effectuez certaines opérations personnalisées avant que la ressource ne soit désactivée. Recherchez l’expression
TODO: IMPLEMENT ME pre disable resource
dans le code afin de fournir l’implémentation personnalisée.
Apprenez-en avec les documentations détaillées sur la procédure PUBLIC.DISABLE_RESOURCE()
:
TemplateUpdateResourceHandler.java¶
Cette classe est un gestionnaire pour la procédure PUBLIC.UPDATE_RESOURCE()
, qui peut être étendue avec les rappels dédiés à :
Validez la ressource avant sa mise à jour. Recherchez l’expression
TODO: IMPLEMENT ME update resource validate
dans le code afin de fournir l’implémentation personnalisée.Effectuez certaines opérations personnalisées avant que la ressource ne soit mise à jour. Recherchez l’expression
TODO: IMPLEMENT ME pre update resource
dans le code afin de fournir l’implémentation personnalisée.Effectuez certaines opérations personnalisées une fois la ressource mise à jour. Recherchez l’expression
TODO: IMPLEMENT ME post update resource
dans le code afin de fournir l’implémentation personnalisée.
Apprenez-en avec les documentations détaillées sur la procédure PUBLIC.UPDATE_RESOURCE()
:
Paramètres¶
Le modèle contient un onglet « Paramètres » qui vous permet de voir toutes les configurations effectuées auparavant. Toutefois, si les propriétés de configuration ont été personnalisées, cette vue doit également l’être. Le code de l’onglet Paramètres se trouve dans le fichier streamlit/daily_use/settings_page.py
. Pour le personnaliser, il suffit d’extraire les valeurs de la configuration pour les clés qui ont été ajoutées dans les configurations respectives.
Par exemple, si la propriété antérieure additional_connection_property
a été ajoutée plus tôt dans l’étape de configuration de la connexion, elle pourrait être ajoutée comme suit :
def connection_config_page():
current_config = get_connection_configuration()
# TODO: implement the display for all the custom properties defined in the connection configuration step
custom_property = current_config.get("custom_connection_property", "")
additional_connection_property = current_config.get("additional_connection_property", "")
st.header("Connector configuration")
st.caption("Here you can see the connector connection configuration saved during the connection configuration step "
"of the Wizard. If some new property was introduced it has to be added here to display.")
st.divider()
st.text_input(
"Custom connection property:",
value=custom_property,
disabled=True
)
st.text_input(
"Additional connection property:",
value=additional_connection_property,
disabled=True
)
st.divider()