Snowflake ML Jobs¶
Verwenden Sie Snowflake ML Jobs, um Workflows des maschinellen Lernens (ML) innerhalb von Snowflake ML Container-Laufzeiten auszuführen. Sie können sie von jeder Entwicklungsumgebung aus ausführen. Sie müssen den Code nicht in einem Snowflake-Arbeitsblatt oder -Notizbuch ausführen. Verwenden Sie Jobs, um die Infrastruktur von Snowflake für ressourcenintensive Aufgaben innerhalb Ihres Entwicklungsworkflows zu nutzen. Informationen zur lokalen Einrichtung von Snowflake ML finden Sie unter Lokale Verwendung von Snowflake ML.
Wichtig
Snowflake ML-Jobs sind in snowflake-ml-python
Version 1.9.2 und höher verfügbar.
Mit Snowflake ML Jobs können Sie Folgendes tun:
Führen Sie ML-Workloads auf Snowflake Computepools aus, einschließlich GPU- und CPU-Instanzen mit hohem Speicherbedarf.
Verwenden Sie Ihre bevorzugte Entwicklungsumgebung wie VS Code oder Jupyter Notebooks.
Installieren und verwenden Sie benutzerdefinierte Python-Pakete in Ihrer Laufzeitumgebung.
Nutzen Sie Snowflakes verteilte APIs, um das Laden von Daten, das Training und die Abstimmung der Hyperparameter zu optimieren.
Integrieren Sie mit Orchestrierungstools wie Apache Airflow.
Überwachen und verwalten Sie Aufträge mit Hilfeder Snowflake-APIs.
Mit diesen Funktionen können Sie Folgendes tun:
Führen Sie ressourcenintensives Training auf großen Datensätzen durch, die GPU-Beschleunigung oder erhebliche Rechenressourcen erfordern.
Bringen Sie ML-Workflows in Produktion, indem Sie ML-Code von der Entwicklung zur Produktion mit programmatischer Ausführung durch Pipelines verschieben.
Behalten Sie Ihre bestehende Entwicklungsumgebung bei und nutzen Sie gleichzeitig die Rechenressourcen von Snowflake.
Heben und verschieben Sie OSS ML-Workflows mit minimalen Codeänderungen.
Arbeiten Sie direkt mit großen Snowflake-Datensätzen, um Datenbewegungen zu reduzieren und teure Datenübertragungen zu vermeiden.
Voraussetzungen¶
Wichtig
Snowflake ML Jobs unterstützt derzeit nur Python 3.10 Clients. Bitte kontaktieren Sie Ihr Snowflake-Kundenteam, wenn Sie Unterstützung für andere Python-Versionen benötigen.
Installieren Sie das Snowflake ML Python-Paket in Ihrer Python 3.10 Umgebung.
pip install snowflake-ml-python>=1.9.2
Die Standardgröße des Computepools verwendet die Instanzfamilie CPU_X64_S. Die minimale Anzahl von Knoten ist 1 und die maximale ist 25. Sie können den folgenden SQL-Befehl verwenden, um einen benutzerdefinierten Computepool zu erstellen:
CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL MIN_NODES = <MIN_NODES> MAX_NODES = <MAX_NODES> INSTANCE_FAMILY = <INSTANCE_FAMILY>;
Snowflake ML Jobs erfordern eine Snowpark-Sitzung. Verwenden Sie den folgenden Code, um sie zu erstellen:
from snowflake.snowpark import Session from snowflake.ml.jobs import list_jobs ls = list_jobs() # This will fail! You must create a session first. # Requires valid ~/.snowflake/config.toml file session = Session.builder.getOrCreate() ls = list_jobs(session=session) ls = list_jobs() # Infers created session from context
Informationen zum Erstellen einer Sitzung finden Sie unter Erstellen einer Sitzung.
Führen Sie einen Snowflake ML Job aus¶
Sie können einen Snowflake ML Job auf eine der folgenden Arten ausführen:
Verwenden eines Funktions-Decorators in Ihrem Code
Übermittlung ganzer Dateien oder Verzeichnisse mit Python-API.
Python-Funktion als Snowflake ML-Job ausführen¶
Verwenden Sie Function Dispatch, um einzelne Python-Funktionen mit dem Decorator @remote
auf den Rechenressourcen von Snowflake auszuführen.
Mit dem Decorator @remote
können Sie Folgendes tun:
Serialisieren Sie die Funktion und ihre Abhängigkeiten.
Laden Sie sie in einen bestimmten Snowflake Stagingbereich hoch.
Führen Sie es innerhalb der Container Runtime für ML aus.
Der folgende Python-Beispielcode verwendet den Decorator @remote
, um einen Snowflake ML Job zu übermitteln. Beachten Sie, dass eine Snowpark Session
erforderlich ist, siehe Voraussetzungen.
from snowflake.ml.jobs import remote
@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session)
def train_model(data_table: str):
# Provide your ML code here, including imports and function calls
...
job = train_model("my_training_data")
Der Aufruf einer dekorierten @remote
-Funktion gibt ein Snowflake MLJob
-Objekt zurück, das zur Verwaltung und Überwachung der Jobausführung verwendet werden kann. Weitere Informationen dazu finden Sie unter Verwalten von ML-Jobs.
Eine Python-Datei als Snowflake ML Job ausführen¶
Führen Sie Python-Dateien oder Projektverzeichnisse auf Snowflake-Rechenressourcen aus. Dies ist nützlich, wenn:
Sie komplexe ML-Projekte mit mehreren Modulen und Abhängigkeiten haben.
Sie die Trennung zwischen lokaler Entwicklung und Produktionscode aufrechterhalten wollen.
Sie Skripte ausführen müssen, die Befehlszeilenargumente verwenden.
Sie mit bestehenden ML-Projekten arbeiten, die nicht speziell für die Ausführung auf Snowflake Compute entwickelt wurden.
Die Snowflake Job-API bietet drei Hauptmethoden für die Übermittlung von dateibasierten Nutzlasten:
submit_file()
: Zum Ausführen einzelner Python-Dateiensubmit_directory()
: Für die Ausführung von Python-Projekten, die mehrere Dateien und Ressourcen umfassensubmit_from_stage()
: Für die Ausführung von Python-Projekten, die in einem Snowflake-Stagingbereich gespeichert sind
Beide Methoden unterstützen Folgendes:
Übergabe von Argumenten in der Befehlszeile
Konfiguration der Umgebungsvariablen
Benutzerdefinierte Abhängigkeitsangaben
Verwaltung der Projektressourcen über die Snowflake-Stagingbereiche
File Dispatch ist besonders nützlich, um bestehende ML-Workflows zu produzieren und eine klare Trennung zwischen Entwicklungs- und Ausführungsumgebung aufrechtzuerhalten.
Der folgende Python-Code übermittelt eine Datei als Snowflake ML-Job:
from snowflake.ml.jobs import submit_file
# Run a single file
job1 = submit_file(
"train.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
args=["--data-table", "my_training_data"],
session=session,
)
Der folgende Python-Code übermittelt ein Verzeichnis als Snowflake ML-Job:
from snowflake.ml.jobs import submit_directory
# Run from a directory
job2 = submit_directory(
"./ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py",
stage_name="payload_stage",
session=session,
)
Der folgende Python-Code übermittelt ein Verzeichnis aus einem Snowflake-Stagingbereich als Snowflake ML-Job:
from snowflake.ml.jobs import submit_from_stage
# Run from a directory
job3 = submit_from_stage(
"@source_stage/ml_project/"
"MY_COMPUTE_POOL",
entrypoint="@source_stage/ml_project/train.py",
stage_name="payload_stage",
session=session,
)
# Entrypoint may also be a relative path
job4 = submit_from_stage(
"@source_stage/ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py", # Resolves to @source_stage/ml_project/train.py
stage_name="payload_stage",
session=session,
)
Die Übermittlung einer Datei oder eines Verzeichnisses gibt ein Snowflake MLJob
-Objekt zurück, das zur Verwaltung und Überwachung der Jobausführung verwendet werden kann. Weitere Informationen dazu finden Sie unter Verwalten von ML-Jobs.
Unterstützung zusätzlicher Nutzlasten in Übermittlungen¶
Beim Übermitteln einer Datei, eines Verzeichnisses oder aus einem Stagingbereich werden zusätzliche Nutzdaten für die Verwendung während der Jobausführung unterstützt. Der Importpfad kann explizit angegeben werden. Andernfalls wird er vom Speicherort der zusätzlichen Nutzlast abgeleitet.
Wichtig
Es können nur Verzeichnisse als Importquellen angegeben werden. Das Importieren einzelner Dateien wird nicht unterstützt.
# Run from a file
job1 = submit_file(
"train.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
additional_payloads=[
("src/utils/", "utils"), # the import path is utils
],
)
# Run from a directory
job2 = submit_directory(
"./ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py",
stage_name="payload_stage",
session=session,
additional_payloads=[
("src/utils/"), # the import path is utils
],
)
# Run from a stage
job3 = submit_from_stage(
"@source_stage/ml_project/",
"MY_COMPUTE_POOL",
entrypoint="@source_stage/ml_project/train.py",
stage_name="payload_stage",
session=session,
additional_payloads=[
("@source_stage/src/utils/sub_utils/", "utils.sub_utils"),
],
)
Zugriff auf Snowpark-Sitzung über ML-Jobs¶
Beim Ausführen von ML-Jobs in Snowflake ist eine Snowpark-Sitzung im Ausführungskontext automatisch verfügbar. Sie können über die folgenden Ansätze auf das Sitzungsobjekt aus Ihrer ML-Job-Nutzlast zugreifen:
from snowflake.ml.jobs import remote
from snowflake.snowpark import Session
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function():
# This approach works for all payload types, including file and directory payloads
session = Session.builder.getOrCreate()
print(session.sql("SELECT CURRENT_VERSION()").collect())
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function_with_injected_session(session: Session):
# This approach works only for function dispatch payloads
# The session is injected automatically by the Snowflake ML Job API
print(session.sql("SELECT CURRENT_VERSION()").collect())
Die Snowpark-Sitzung kann für den Zugriff auf Snowflake-Tabellen, -Stagingbereiche und andere Datenbankobjekte innerhalb Ihres ML-Jobs verwendet werden.
Rückgabe von Ergebnissen aus ML-Jobs¶
Snowflake ML-Jobs unterstützen die Rückgabe von Ausführungsergebnissen an die Clientumgebung. So können Sie berechnete Werte, trainierte Modelle oder andere Artefakte abrufen, die durch Ihre Job-Nutzlasten erzeugt werden.
Für das Dispatching von Funktionen geben Sie einfach einen Wert aus Ihrer dekorierten Funktion zurück. Der zurückgegebene Wert wird serialisiert und über die result()
-Methode zur Verfügung gestellt.
from snowflake.ml.jobs import remote
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def train_model(data_table: str):
# Your ML code here
model = XGBClassifier()
model.fit(data_table)
return model
job1 = train_model("my_training_data")
Für dateibasierte Jobs verwenden Sie die spezielle __return__
-Variable zur Angabe des Rückgabewerts.
# Example: /path/to/repo/my_script.py
def main():
# Your ML code here
model = XGBClassifier()
model.fit(data_table)
return model
if __name__ == "__main__":
__return__ = main()
from snowflake.ml.jobs import submit_file
job2 = submit_file(
"/path/to/repo/my_script.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
)
Sie können das Ergebnis der Jobausführung über die MLJob.result()
-API abrufen. Die API blockiert den aufrufenden Thread, bis der Job einen Endzustand erreicht hat, und gibt dann den Rückgabewert der Nutzlast zurück oder löst bei fehlgeschlagener Ausführung eine Ausnahme aus. Wenn die Nutzlast keinen Rückgabewert definiert, ist das Ergebnis None
bei Erfolg.
# These will block until the respective job is done and return the trained model
model1 = job1.result()
model2 = job2.result()
Verwalten von ML-Jobs¶
Wenn Sie einen Snowflake ML Job übermitteln, erstellt die API ein MLJob
-Objekt. Sie können damit Folgendes tun:
Verfolgen des Job-Fortschritts durch Statusaktualisierungen
Probleme anhand detaillierter Ausführungsprotokolle beheben
Abrufen des Ausführungsergebnisses (falls vorhanden)
Sie können die get_job()
API verwenden, um ein MLJob
-Objekt über dessen ID abzurufen. Der folgende Python-Code zeigt, wie Sie ein MLJob
-Objekt abrufen können:
from snowflake.ml.jobs import MLJob, get_job, list_jobs, delete_job
# Get a list of the 10 most recent jobs as a Pandas DataFrame
jobs_df = list_jobs(limit=10)
print(jobs_df) # Display list in table format
# Retrieve an existing job based on ID
job = get_job("<job_id>") # job is an MLJob instance
# Retrieve status and logs for the retrieved job
print(job.status) # PENDING, RUNNING, FAILED, DONE
print(job.get_logs())
# Clean up the job
delete_job(job)
Abhängigkeiten verwalten¶
Die Snowflake ML Job-API führt Nutzlasten innerhalb der Umgebung Container Runtime für ML aus. Die Umgebung enthält die am häufigsten verwendeten Python-Pakete für maschinelles Lernen und Data Science. Die meisten Anwendungsfälle sollten im Ist-Zustand ohne zusätzliche Konfiguration funktionieren. Wenn Sie benutzerdefinierte Abhängigkeiten benötigen, können Sie pip_requirements
verwenden, um sie zu installieren.
Um benutzerdefinierte Abhängigkeiten zu installieren, müssen Sie den externen Netzwerkzugriff mithilfe einer externen Zugriffsintegration aktivieren. Sie können den folgenden SQL-Beispielbefehl verwenden, um den Zugriff zu ermöglichen:
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI
ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
ENABLED = true;
Weitere Informationen zu Integrationen für den externen Zugriff finden Sie unter Erstellen und Verwenden einer Integration für den externen Zugriff.
Nachdem Sie den externen Netzwerkzugang bereitgestellt haben, können Sie die Parameter pip_requirements
und external_access_integrations
verwenden, um benutzerdefinierte Abhängigkeiten zu konfigurieren. Sie können Pakete verwenden, die in der Container Runtime-Umgebung nicht verfügbar sind oder wenn Sie bestimmte Versionen der Pakete verwenden.
Der folgende Python-Code zeigt, wie Sie benutzerdefinierte Abhängigkeiten für den remote
-Decorator angeben können:
@remote(
"MY_COMPUTE_POOL",
stage_name="payload_stage",
pip_requirements=["custom-package"],
external_access_integrations=["PYPI_EAI"],
session=session,
)
def my_function():
# Your code here
Der folgende Python-Code zeigt, wie Sie benutzerdefinierte Abhängigkeiten für die Methode submit_file()
angeben können:
from snowflake.ml.jobs import submit_file
# Can include version specifier to specify version(s)
job = submit_file(
"/path/to/repo/my_script.py",
compute_pool,
stage_name="payload_stage",
pip_requirements=["custom-package==1.0.*"],
external_access_integrations=["pypi_eai"],
session=session,
)
Private Paketzuführungen¶
Snowflake ML Jobs unterstützt auch das Laden von Paketen aus privaten Feeds wie JFrog Artifactory und Sonatype Nexus Repository. Diese Feeds werden häufig verwendet, um interne und proprietäre Pakete zu verteilen, die Kontrolle über die Versionen von Abhängigkeiten zu behalten und Sicherheit und Compliance zu gewährleisten.
Um Pakete aus einem privaten Feed zu installieren, müssen Sie Folgendes tun:
Erstellen Sie eine Netzwerkregel, um den Zugriff auf die URL des privaten Feeds zu erlauben.
Für Quellen, die eine einfache Authentifizierung verwenden, können Sie einfach eine Netzwerkregel erstellen.
CREATE OR REPLACE NETWORK RULE private_feed_nr MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('<your-repo>.jfrog.io');
Um den Zugriff auf eine Quelle über private Konnektivität (d. h. Private Link) zu konfigurieren, folgen Sie den Schritten unter Netzwerkausgang über private Konnektivität.
Erstellen Sie eine externe Zugriffsintegration mit der Netzwerkregel. Erteilen Sie der Rolle, die Aufträge einreichen wird, die Erlaubnis zur Verwendung von EAI.
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION private_feed_eai ALLOWED_NETWORK_RULES = (PRIVATE_FEED_NR) ENABLED = true; GRANT USAGE ON INTEGRATION private_feed_eai TO ROLE <role_name>;
Geben Sie die URL des privaten Feeds, die externe Zugriffsintegration und das/die Paket(e) an, wenn Sie den Job übermitteln.
# Option 1: Specify private feed URL in pip_requirements job = submit_file( "/path/to/script.py", compute_pool="MY_COMPUTE_POOL", stage_name="payload_stage", pip_requirements=[ "--index-url=https://your.private.feed.url", "internal-package==1.2.3" ], external_access_integrations=["PRIVATE_FEED_EAI"] )
# Option 2: Specify private feed URL by environment variable job = submit_directory( "/path/to/code/", compute_pool="MY_COMPUTE_POOL", entrypoint="script.py", stage_name="payload_stage", pip_requirements=["internal-package==1.2.3"], external_access_integrations=["PRIVATE_FEED_EAI"], env_vars={'PIP_INDEX_URL': 'https://your.private.feed.url'}, )
Wenn Ihre private Feed-URL sensible Informationen wie Authentifizierungstoken enthält, verwalten Sie die URL durch Erstellen eines Snowflake-Geheimnisses. Erstellen Sie ein Geheimnis mithilfe von CREATE SECRET. Konfigurieren Sie Geheimnisse während der Übermittlung des Jobs mit dem spec_overrides
-Argument.
# Create secret for private feed URL with embedded auth token
feed_url = "<your-repo>.jfrog.io/artifactory/api/pypi/test-pypi/simple"
user = "<auth_user>"
token = "<auth_token>"
session.sql(f"""
CREATE SECRET IF NOT EXISTS PRIVATE_FEED_URL_SECRET
TYPE = GENERIC_STRING
SECRET_STRING = 'https://{auth_user}:{auth_token}@{feed_url}'
""").collect()
# Prepare service spec override for mounting secret into job execution
spec_overrides = {
"spec": {
"containers": [
{
"name": "main", # Primary container name is always "main"
"secrets": [
{
"snowflakeSecret": "PRIVATE_FEED_URL_SECRET",
"envVarName": "PIP_INDEX_URL",
"secretKeyRef": "secret_string"
},
],
}
]
}
}
# Load private feed URL from secret (e.g. if URL includes auth token)
job = submit_file(
"/path/to/script.py",
compute_pool="MY_COMPUTE_POOL",
stage_name="payload_stage",
pip_requirements=[
"internal-package==1.2.3"
],
external_access_integrations=["PRIVATE_FEED_EAI"],
spec_overrides=spec_overrides,
)
Weitere Informationen über container.secrets
finden Sie unter Feld containers.secrets.
Beispiele¶
Siehe Codebeispiele für ML-Jobs; dort finden Sie Beispiele für die Verwendung von Snowflake ML-Jobs.
Hinweise zu Kosten¶
Snowflake ML Jobs laufen auf Snowpark Container Services und werden nach Verbrauch abgerechnet. Informationen zu den Rechenkosten finden Sie unter Kosten von Snowpark Container Services.
Die Nutzdaten des Jobs werden in den Stagingbereich hochgeladen, der mit dem Argument stage_name
angegeben wurde. Um zusätzliche Kosten zu vermeiden, müssen Sie diese bereinigen. Weitere Informationen finden Sie unter Erläuterungen zu den Speicherkosten und Untersuchen der Speicherkosten, um mehr über die mit dem Stagingbereich verbundenen Kosten zu erfahren.