Tâches Snowflake ML¶
Utilisez les tâches Snowflake ML pour exécuter des flux de travail de machine learning (ML) dans des environnements d’exécution des conteneurs Snowflake ML. Vous pouvez les exécuter à partir de n’importe quel environnement de développement. Vous n’avez pas besoin d’exécuter le code dans une feuille de calcul ou un notebooks Snowflake. Utilisez les tâches pour tirer parti de l’infrastructure de Snowflake afin d’exécuter des tâches exigeantes en ressources dans le cadre de votre flux de développement. Pour obtenir des informations sur la configuration de Snowflake ML au niveau local, voir Utilisation de Snowflake ML en local.
Important
Les tâches Snowflake ML sont disponibles dans snowflake-ml-python
à partir de la version 1.9.2.
Les tâches Snowflake ML vous permettent d’effectuer les opérations suivantes :
Exécuter les charges de travail ML sur les pools de calcul Snowflake, y compris les instances GPU et les instances CPU à mémoire élevée.
Utiliser votre environnement de développement préféré, tel que VS Code ou les notebooks Jupyter.
Installez et utilisez des paquets Python personnalisés dans votre environnement d’exécution.
Utiliser les APIs distribuées de Snowflake pour optimiser le chargement des données, l’entraînement et l’ajustement des hyperparamètres.
Intégrez des outils d’orchestration, tels que Apache Airflow.
Surveiller et gérer les travaux à l’aide des APIs de Snowflake.
Vous pouvez utiliser ces fonctionnalités pour effectuer les opérations suivantes :
Effectuer un entraînement exigeant en ressources sur de grands ensembles de données nécessitant l’accélération de GPU ou d’importantes ressources de calcul.
Mettre en production des workflows ML en faisant passer le code ML du développement à la production avec une exécution programmatique par le biais de pipelines.
Conserver votre environnement de développement existant tout en exploitant les ressources de calcul de Snowflake.
Transformer les workflows OSS ML avec un minimum de modifications du code.
Travailler directement avec de grands ensembles de données Snowflake pour réduire les mouvements de données et éviter les transferts de données coûteux.
Conditions préalables¶
Important
Actuellement, les tâches Snowflake ML ne prennent en charge que les clients Python 3.10. Veuillez contacter l’équipe de votre compte Snowflake si vous avez besoin d’aide pour d’autres versions de Python.
Installez le paquet Python Snowflake ML dans votre environnement Python 3.10.
pip install snowflake-ml-python>=1.9.2
La taille par défaut du pool de calcul utilise la famille d’instances CPU_X64_S. Le nombre minimum de nœuds est 1 et le maximum 25. Vous pouvez utiliser la commande SQL suivante pour créer un pool de calcul personnalisé :
CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL MIN_NODES = <MIN_NODES> MAX_NODES = <MAX_NODES> INSTANCE_FAMILY = <INSTANCE_FAMILY>;
Les tâches Snowflake ML nécessitent une session Snowpark. Utilisez le code suivant pour la créer :
from snowflake.snowpark import Session from snowflake.ml.jobs import list_jobs ls = list_jobs() # This will fail! You must create a session first. # Requires valid ~/.snowflake/config.toml file session = Session.builder.getOrCreate() ls = list_jobs(session=session) ls = list_jobs() # Infers created session from context
Pour obtenir des informations sur la création d’une session, voir Création d’une session.
Exécuter une tâche Snowflake ML¶
Vous pouvez exécuter une tâche Snowflake ML de l’une des manières suivantes :
Utilisation d’un décorateur de fonction dans votre code.
Soumission de fichiers ou de répertoires entiers à l’aide de l’API Python.
Exécution d’une fonction Python en tant que tâche Snowflake ML¶
Utilisez Function Dispatch pour exécuter des fonctions Python individuelles à distance sur les ressources de calcul de Snowflake avec le décorateur @remote
.
À l’aide du décorateur @remote
, vous pouvez effectuer les opérations suivantes :
Sérialiser la fonction et ses dépendances.
La télécharger dans une zone de préparation Snowflake spécifiée.
L’exécuter dans Container Runtime pour ML.
L’exemple de code Python suivant utilise le décorateur @remote
pour soumettre une tâche Snowflake ML. Notez qu’une Session
Snowpark est requise, voir Conditions préalables.
from snowflake.ml.jobs import remote
@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session)
def train_model(data_table: str):
# Provide your ML code here, including imports and function calls
...
job = train_model("my_training_data")
L’appel d’une fonction décorée @remote
renvoie un objet MLJob
Snowflake qui peut être utilisé pour gérer et surveiller l’exécution de la tâche. Pour plus d’informations, voir Gestion des tâches ML.
Exécuter un fichier Python en tant que tâche Snowflake ML¶
Exécutez des fichiers Python ou des répertoires de projets sur les ressources de calcul Snowflake. Ceci est utile lorsque :
Vous avez des projets ML complexes avec de multiples modules et dépendances.
Vous souhaitez maintenir une séparation entre le développement local et le code de production.
Vous devez exécuter des scripts qui utilisent des arguments de ligne de commande.
Vous travaillez avec des projets ML existants qui n’ont pas été spécifiquement conçus pour être exécutés sur le pool de calcul Snowflake.
L’API de la tâche Snowflake propose trois méthodes principales de soumission des charges utiles basées sur des fichiers :
submit_file()
: Pour l’exécution d’un seul fichier Pythonsubmit_directory()
: Pour l’exécution de projets Python couvrant plusieurs fichiers et ressourcessubmit_from_stage()
: Pour l’exécution de projets Python enregistrés sur une zone de préparation Snowflake
Les deux méthodes prennent en charge ce qui suit :
La transmission de l’argument de ligne de commande
La configuration de la variable d’environnement
La spécification de la dépendance personnalisée
Gestion des actifs du projet à travers les zones de préparation Snowflake
File Dispatch est particulièrement utile pour mettre en production des workflows ML existants et pour maintenir une séparation claire entre les environnements de développement et d’exécution.
Le code Python suivant soumet un fichier en tant que tâche Snowflake ML :
from snowflake.ml.jobs import submit_file
# Run a single file
job1 = submit_file(
"train.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
args=["--data-table", "my_training_data"],
session=session,
)
Le code Python suivant soumet un répertoire en tant que tâche Snowflake ML :
from snowflake.ml.jobs import submit_directory
# Run from a directory
job2 = submit_directory(
"./ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py",
stage_name="payload_stage",
session=session,
)
Le code Python suivant soumet un répertoire à partir d’une zone de préparation Snowflake en tant que tâche Snowflake ML :
from snowflake.ml.jobs import submit_from_stage
# Run from a directory
job3 = submit_from_stage(
"@source_stage/ml_project/"
"MY_COMPUTE_POOL",
entrypoint="@source_stage/ml_project/train.py",
stage_name="payload_stage",
session=session,
)
# Entrypoint may also be a relative path
job4 = submit_from_stage(
"@source_stage/ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py", # Resolves to @source_stage/ml_project/train.py
stage_name="payload_stage",
session=session,
)
La soumission d’un fichier ou d’un répertoire renvoie un objet MLJob
Snowflake qui peut être utilisé pour gérer et surveiller l’exécution de la tâche. Pour plus d’informations, voir Gestion des tâches ML.
Prise en charge des charges utiles supplémentaires dans les soumissions¶
Lors de la soumission d’un fichier, d’un répertoire ou à partir d’une zone de préparation, des charges utiles supplémentaires sont prises en charge pour une utilisation pendant l’exécution de la tâche. Le chemin d’importation peut être spécifié explicitement. Sinon, il sera déduit de l’emplacement de la charge utile supplémentaire.
Important
Seuls les répertoires peuvent être spécifiés comme sources d’importation. L’importation de fichiers individuels n’est pas prise en charge.
# Run from a file
job1 = submit_file(
"train.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
additional_payloads=[
("src/utils/", "utils"), # the import path is utils
],
)
# Run from a directory
job2 = submit_directory(
"./ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py",
stage_name="payload_stage",
session=session,
additional_payloads=[
("src/utils/"), # the import path is utils
],
)
# Run from a stage
job3 = submit_from_stage(
"@source_stage/ml_project/",
"MY_COMPUTE_POOL",
entrypoint="@source_stage/ml_project/train.py",
stage_name="payload_stage",
session=session,
additional_payloads=[
("@source_stage/src/utils/sub_utils/", "utils.sub_utils"),
],
)
Accès à la session Snowpark dans les tâches ML¶
Lors de l’exécution des tâches ML sur Snowflake, une session Snowpark est automatiquement disponible dans le contexte d’exécution. Vous pouvez accéder à l’objet de la session à partir de la charge utile de la tâche ML à l’aide des approches suivantes :
from snowflake.ml.jobs import remote
from snowflake.snowpark import Session
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function():
# This approach works for all payload types, including file and directory payloads
session = Session.builder.getOrCreate()
print(session.sql("SELECT CURRENT_VERSION()").collect())
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function_with_injected_session(session: Session):
# This approach works only for function dispatch payloads
# The session is injected automatically by the Snowflake ML Job API
print(session.sql("SELECT CURRENT_VERSION()").collect())
La session Snowpark peut être utilisée pour accéder aux tableaux, zones de préparation et autres objets de base de données Snowflake à l’intérieur de votre tâche ML.
Renvoi des résultats des tâches ML¶
Les tâches Snowflake ML prennent en charge le renvoi des résultats de l’exécution vers l’environnement client. Cela vous permet de récupérer des valeurs calculées, des modèles entraînés ou tout autre artefact produit par vos charges utiles de tâche.
Pour la distribution des fonctions, il suffit de renvoyer une valeur de votre fonction décorée. La valeur renvoyée sera sérialisée et mise à disposition par l’intermédiaire de la méthode:code:result()
.
from snowflake.ml.jobs import remote
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def train_model(data_table: str):
# Your ML code here
model = XGBClassifier()
model.fit(data_table)
return model
job1 = train_model("my_training_data")
Pour les tâches basées sur des fichiers, utilisez la variable spéciale __return__
pour spécifier la valeur de retour.
# Example: /path/to/repo/my_script.py
def main():
# Your ML code here
model = XGBClassifier()
model.fit(data_table)
return model
if __name__ == "__main__":
__return__ = main()
from snowflake.ml.jobs import submit_file
job2 = submit_file(
"/path/to/repo/my_script.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
)
Vous pouvez récupérer le résultat de l’exécution de la tâche en utilisant l’API:code:MLJob.result()
. L’API bloque le thread appelant jusqu’à ce que la tâche atteigne un état final, puis renvoie la valeur de retour de la charge utile ou, lève une exception en cas d’échec de l’exécution. Si la charge utile ne définit pas de valeur de retour, le résultat de la réussite sera None
.
# These will block until the respective job is done and return the trained model
model1 = job1.result()
model2 = job2.result()
Gestion des tâches ML¶
Lorsque vous soumettez une tâche Snowflake ML, l’API crée un objet MLJob
. Vous pouvez l’utiliser pour effectuer les opérations suivantes :
Suivre l’avancement de la tâche grâce à des mises à jour de statut
Déboguer les problèmes à l’aide de journaux d’exécution détaillés
Récupérer le résultat de l’exécution (le cas échéant)
Vous pouvez utiliser l’API get_job()
pour récupérer un objet MLJob
par son ID. Le code Python suivant montre comment récupérer un objet MLJob
:
from snowflake.ml.jobs import MLJob, get_job, list_jobs, delete_job
# Get a list of the 10 most recent jobs as a Pandas DataFrame
jobs_df = list_jobs(limit=10)
print(jobs_df) # Display list in table format
# Retrieve an existing job based on ID
job = get_job("<job_id>") # job is an MLJob instance
# Retrieve status and logs for the retrieved job
print(job.status) # PENDING, RUNNING, FAILED, DONE
print(job.get_logs())
# Clean up the job
delete_job(job)
Gestion des dépendances¶
L’API de la tâche Snowflake ML exécute des charges utiles dans l’environnement Container Runtime pour ML. L’environnement dispose des paquets Python les plus couramment utilisés pour le machine learning et la science des données. La plupart des cas d’utilisation devraient fonctionner « prêts à l’emploi » sans configuration supplémentaire. Si vous avez besoin de dépendances personnalisées, vous pouvez utiliser pip_requirements
pour les installer.
Pour installer des dépendances personnalisées, vous devez activer l’accès au réseau externe à l’aide d’une intégration de l’accès externe. Vous pouvez utiliser l’exemple de commande SQL suivante pour fournir l’accès :
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI
ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
ENABLED = true;
Pour plus d’informations sur les intégrations d’accès externe, voir Création et utilisation d’une intégration d’accès externe.
Après avoir fourni un accès au réseau externe, vous pouvez utiliser les paramètres pip_requirements
et external_access_integrations
pour configurer des dépendances personnalisées. Vous pouvez utiliser des paquets qui ne sont pas disponibles dans l’environnement du Container Runtime. Vous pouvez également spécifier les versions des paquets.
Le code Python suivant montre comment spécifier des dépendances personnalisées au décorateur remote
:
@remote(
"MY_COMPUTE_POOL",
stage_name="payload_stage",
pip_requirements=["custom-package"],
external_access_integrations=["PYPI_EAI"],
session=session,
)
def my_function():
# Your code here
Le code Python suivant montre comment spécifier des dépendances personnalisées pour la méthode submit_file()
:
from snowflake.ml.jobs import submit_file
# Can include version specifier to specify version(s)
job = submit_file(
"/path/to/repo/my_script.py",
compute_pool,
stage_name="payload_stage",
pip_requirements=["custom-package==1.0.*"],
external_access_integrations=["pypi_eai"],
session=session,
)
Flux de paquets privés¶
Les tâches Snowflake ML prennent également en charge le chargement de paquets à partir de flux privés tels que JFrog Artifactory et Sonatype Nexus Repository. Ces flux sont généralement utilisés pour distribuer des paquets internes et propriétaires, maintenir le contrôle sur les versions des dépendances et assurer la sécurité/conformité.
Pour installer des paquets à partir d’un flux privé, vous devez procéder comme suit :
Créez une règle de réseau pour autoriser l’accès à l’URL du flux privé.
Pour les sources qui utilisent l’authentification de base, vous pouvez simplement créer une règle de réseau.
CREATE OR REPLACE NETWORK RULE private_feed_nr MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('<your-repo>.jfrog.io');
Pour configurer l’accès à une source à l’aide d’une connectivité privée (c’est-à-dire Private Link), suivez les étapes décrites dans Sortie réseau par connectivité privée.
Créez une intégration de l’accès externe à l’aide de la règle de réseau. Accordez l’autorisation d’utiliser l’EAI au rôle qui soumettra les tâches.
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION private_feed_eai ALLOWED_NETWORK_RULES = (PRIVATE_FEED_NR) ENABLED = true; GRANT USAGE ON INTEGRATION private_feed_eai TO ROLE <role_name>;
Indiquez l’URL du flux privé, l’intégration de l’accès externe et le(s) paquet(s) lors de la soumission de la tâche
# Option 1: Specify private feed URL in pip_requirements job = submit_file( "/path/to/script.py", compute_pool="MY_COMPUTE_POOL", stage_name="payload_stage", pip_requirements=[ "--index-url=https://your.private.feed.url", "internal-package==1.2.3" ], external_access_integrations=["PRIVATE_FEED_EAI"] )
# Option 2: Specify private feed URL by environment variable job = submit_directory( "/path/to/code/", compute_pool="MY_COMPUTE_POOL", entrypoint="script.py", stage_name="payload_stage", pip_requirements=["internal-package==1.2.3"], external_access_integrations=["PRIVATE_FEED_EAI"], env_vars={'PIP_INDEX_URL': 'https://your.private.feed.url'}, )
Si votre flux privé URL contient des informations sensibles comme des jetons d’authentification, gérez l” URL en créant un secret Snowflake. Utilisez CREATE SECRET pour créer un secret. Configurez les secrets lors de la soumission d’une tâche avec l’argument spec_overrides
.
# Create secret for private feed URL with embedded auth token
feed_url = "<your-repo>.jfrog.io/artifactory/api/pypi/test-pypi/simple"
user = "<auth_user>"
token = "<auth_token>"
session.sql(f"""
CREATE SECRET IF NOT EXISTS PRIVATE_FEED_URL_SECRET
TYPE = GENERIC_STRING
SECRET_STRING = 'https://{auth_user}:{auth_token}@{feed_url}'
""").collect()
# Prepare service spec override for mounting secret into job execution
spec_overrides = {
"spec": {
"containers": [
{
"name": "main", # Primary container name is always "main"
"secrets": [
{
"snowflakeSecret": "PRIVATE_FEED_URL_SECRET",
"envVarName": "PIP_INDEX_URL",
"secretKeyRef": "secret_string"
},
],
}
]
}
}
# Load private feed URL from secret (e.g. if URL includes auth token)
job = submit_file(
"/path/to/script.py",
compute_pool="MY_COMPUTE_POOL",
stage_name="payload_stage",
pip_requirements=[
"internal-package==1.2.3"
],
external_access_integrations=["PRIVATE_FEED_EAI"],
spec_overrides=spec_overrides,
)
Pour plus d’informations sur container.secrets
, voir champ containers.secrets.
Exemples¶
Voir ML Échantillons de code de tâches pour des exemples d’utilisation de tâches Snowflake ML.
Considérations relatives aux clients¶
Les tâches Snowflake ML s’exécutent sur Snowpark Container Services et sont facturés en fonction de l’utilisation. Pour des informations sur les coûts de calcul, voir Coûts de Snowpark Container Services.
Les charges utiles des tâches sont téléchargées dans la zone de préparation spécifiée avec l’argument stage_name
. Pour éviter des frais supplémentaires, vous devez les nettoyer. Pour plus d’informations, voir Comprendre le coût de stockage et Exploration des coûts de stockage pour en savoir plus sur les coûts liés au stockage de la zone de préparation.