Tutorial 2: Snowpark Container Services-Jobdienst erstellen¶
Einführung¶
Nachdem Sie die grundlegende Tutorial-Einrichtung abgeschlossen haben, können Sie nun einen Jobdienst erstellen. In diesem Tutorial erstellen Sie einen einfachen Jobdienst, der eine Verbindung zu Snowflake herstellt, eine SQL-SELECT-Abfrage ausführt und das Ergebnis in einer Tabelle speichert.
Dieses Tutorial besteht aus zwei Teilen:
Teil 1: Jobdienst erstellen und testen. Sie laden den für dieses Tutorial bereitgestellten Code herunter und befolgen die schrittweise Anleitung:
Jobdienstcode für dieses Tutorial herunterladen
Docker-Image für Snowpark Container Services erstellen und Image in Repository im eigenen Konto hochladen
Dienstspezifikationsdatei mit den Container-Konfigurationsinformationen für Snowflake im Stagingbereich bereitstellen. Zusätzlich zum Namen des Images, das zum Starten des Containers verwendet wird, gibt die Spezifikationsdatei drei Argumente an: die SELECT-Abfrage, das virtuelle Warehouse zum Ausführen der Abfrage und den Namen der Tabelle, in der das Ergebnis gespeichert werden soll.
Jobdienst ausführen. Mit dem Befehl EXECUTE JOB SERVICE können Sie den Jobdienst ausführen, indem Sie die Spezifikationsdatei und den Computepool angeben, in dem Snowflake den Container ausführen kann. Ergebnisse des Dienstes überprüfen.
Teil 2: Erläuterungen zum Jobdienstcode. Dieser Abschnitt bietet eine Übersicht zum Jobdienstcode und zeigt auf, wie die verschiedenen Komponenten zusammenarbeiten.
1: Jobdienstcode herunterladen¶
Zum Implementieren eines Jobdienstes wird ein Code (eine Python-Anwendung) bereitgestellt.
Laden Sie
SnowparkContainerServices-Tutorials.zip
herunter.Entpacken Sie den Inhalt, der ein Verzeichnis für jedes Tutorial enthält. Das Verzeichnis
Tutorial-2
enthält die folgenden Dateien:main.py
Dockerfile
my_job_spec.yaml
2: Image erstellen und hochladen¶
Erstellen Sie ein Image für die linux/amd64-Plattform, die von Snowpark Container Services unterstützt wird, und laden Sie das Image dann in das Image-Repository in Ihrem Konto hoch (siehe Grundlegende Einrichtung).
Sie benötigen Informationen zum Repository (die Repository-URL und den Hostnamen der Registry), bevor Sie das Image erstellen und hochladen können. Weitere Informationen dazu finden Sie unter Registry und Repositorys.
Informationen zum Repository abrufen
Um die Repository-URL zu erhalten, führen Sie den SQL-Befehl SHOW IMAGE REPOSITORIES aus.
SHOW IMAGE REPOSITORIES;
Die URL ist in der Spalte
repository_url
der Ausgabe enthalten. Siehe folgendes Beispiel:<orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
Der Hostname in der Repository-URL ist der Name des Registry-Hosts. Siehe folgendes Beispiel:
<orgname>-<acctname>.registry.snowflakecomputing.com
Image erstellen und in das Repository hochladen
Öffnen Sie ein Terminalfenster, und wechseln Sie in das Verzeichnis, das die entpackten Dateien enthält.
Um ein Docker-Image zu erstellen, führen Sie den folgenden Befehl
docker build
mithilfe der Docker-CLI aus. Beachten Sie, dass der Befehl das aktuelle Arbeitsverzeichnis (.) alsPATH
für die Dateien angibt, die für das Erstellen des Images verwendet werden sollen.docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
Für
image_name
verwenden Siemy_job_image:latest
:
Beispiel
docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
Laden Sie das Image in das Repository in Ihrem Snowflake-Konto hoch. Damit Docker ein Image in Ihrem Namen in Ihr Repository hochladen kann, müssen Sie zunächst Docker mit der Registry authentifizieren.
Um Docker bei der Snowflake-Registrierung zu authentifizieren, führen Sie den folgenden Befehl aus.
docker login <registry_hostname> -u <username>
Geben Sie dabei für
username
Ihren Snowflake-Benutzernamen an. Docker fordert Sie zur Eingabe Ihres Kennworts auf.
Führen Sie den folgenden Befehl aus, um das Image hochzuladen:
docker push <repository_url>/<image_name>
Beispiel
docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
3: Spezifikationsdatei im Stagingbereich bereitstellen¶
Um Ihre Dienstspezifikationsdatei (
my_job_spec.yaml
) in den Stagingbereich hochzuladen, verwenden Sie eine der folgenden Optionen:Die Snowsight-Weboberfläche: Eine Anleitung dazu finden Sie unter Auswahl eines internen Stagingbereichs für lokale Dateien.
Die SnowSQL-CLI: Führen Sie den folgenden PUT-Befehl aus:
PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Beispiel:
Linux oder macOS
PUT file:///tmp/my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Windows
PUT file://C:\temp\my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Sie können auch einen relativen Pfad angeben.
PUT file://./my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Der Befehl setzt OVERWRITE=TRUE, sodass Sie die Datei bei Bedarf erneut hochladen können (z. B. wenn Sie einen Fehler in Ihrer Spezifikationsdatei behoben haben). Wenn der PUT-Befehl erfolgreich ausgeführt wurde, werden Informationen zu der hochgeladenen Datei ausgegeben.
4: Jobdienst ausführen¶
Jetzt sind Sie bereit, einen Job zu erstellen.
Um einen Jobdienst zu starten, führen Sie den Befehl EXECUTE JOB SERVICE aus:
EXECUTE JOB SERVICE IN COMPUTE POOL tutorial_compute_pool NAME=tutorial_2_job_service FROM @tutorial_stage SPEC='my_job_spec.yaml';
Beachten Sie Folgendes:
FROM und SPEC geben den Namen des Stagingbereichs bzw. den Namen der Dienstspezifikationsdatei des Jobdienstes an. Wenn der Jobdienst ausgeführt wird, führt er die SQL-Anweisung aus und speichert das Ergebnis in einer Tabelle, wie in
my_job_spec.yaml
angegeben.Die SQL-Anweisung wird nicht innerhalb des Docker-Containers ausgeführt. Stattdessen stellt der ausgeführte Container eine Verbindung zu Snowflake her und führt die SQL-Anweisung in einem Snowflake-Warehouse aus.
COMPUTE_POOL stellt die Computeressourcen bereit, auf denen Snowflake den Jobdienst ausführt.
EXECUTE JOB SERVICE gibt eine Ausgabe zurück, die den Jobnamen enthält, wie in der folgenden Beispielausgabe gezeigt:
+------------------------------------------------------------------------------------+ | status | -------------------------------------------------------------------------------------+ | Job TUTORIAL_2_JOB_SERVICE completed successfully with status: DONE. | +------------------------------------------------------------------------------------+
Der Jobdienst führt eine einfache Abfrage aus und speichert das Ergebnis in der Ergebnistabelle. Sie können den erfolgreichen Abschluss des Jobdienstes überprüfen, indem Sie die Ergebnistabelle abfragen:
SELECT * FROM results;
Beispielausgabe:
+----------+-----------+ | TIME | TEXT | |----------+-----------| | 10:56:52 | hello | +----------+-----------+
Wenn Sie die Ausführung Ihres Jobdienstes debuggen möchten, verwenden Sie die Systemfunktionen. Verwenden Sie zum Beispiel SYSTEM$GET_SERVICE_STATUS, um festzustellen, ob der Jobdienst noch ausgeführt wird, ob er nicht gestartet werden konnte oder warum er fehlgeschlagen ist. Wenn Ihr Code nützliche Protokolleeinträge an die Standardausgabe oder den Standardfehler ausgibt, können Sie auch mit SYSTEM$GET_SERVICE_LOGS auf die Protokolle zugreifen.
Um den Status des Jobdienstes abzurufen, rufen Sie die Systemfunktion SYSTEM$GET_SERVICE_STATUS – Veraltet auf:
SELECT SYSTEM$GET_SERVICE_STATUS('tutorial_2_job_service');
Beispielausgabe:
[ { "status":"DONE", "message":"Completed successfully", "containerName":"main", "instanceId":"0", "serviceName":"TUTORIAL_2_JOB_SERVICE", "image":"orgname-acctname.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest", "restartCount":0, "startTime":"" } ]
Um die Potokollinformationen zum Jobdienst zu erhalten, verwenden Sie die Systemfunktion SYSTEM$GET_SERVICE_LOGS:
SELECT SYSTEM$GET_SERVICE_LOGS('tutorial_2_job_service', 0, 'main')
job-tutorial - INFO - Job started job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE" job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results] job-tutorial - INFO - Job finished
5: Bereinigen¶
Wenn Sie nicht vorhaben, mit Tutorial 3 fortzufahren, sollten Sie die von Ihnen erstellten abrechenbaren Ressourcen entfernen. Weitere Informationen dazu finden Sie in Schritt 5 von Tutorial 3.
6: Überprüfen des Jobdienstcodes¶
In diesem Abschnitt werden die folgenden Themen behandelt:
Untersuchen der bereitgestellten Dateien: Untersuchen Sie die verschiedenen Codedateien, mit denen der Jobdienst implementiert wird.
Lokales Erstellen und Testen eines Images. In diesem Abschnitt wird erläutert, wie Sie das Docker-Image lokal testen können, bevor Sie es in ein Repository in Ihrem Snowflake-Konto hochladen.
Untersuchen der bereitgestellten Dateien¶
Die ZIP-Datei, die Sie zu Beginn des Tutorials heruntergeladen haben, enthält die folgenden Dateien:
main.py
Dockerfile
my_job_spec.yaml
Dieser Abschnitt bietet eine Übersicht zum Code.
Datei „main.py“¶
#!/opt/conda/bin/python3
import argparse
import logging
import os
import sys
from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *
# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")
# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")
def get_arg_parser():
"""
Input argument list.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--query", required=True, help="query text to execute")
parser.add_argument(
"--result_table",
required=True,
help="name of the table to store result of query specified by flag --query")
return parser
def get_logger():
"""
Get a logger for local logging.
"""
logger = logging.getLogger("job-tutorial")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def get_login_token():
"""
Read the login token supplied automatically by Snowflake. These tokens
are short lived and should always be read right before creating any new connection.
"""
with open("/snowflake/session/token", "r") as f:
return f.read()
def get_connection_params():
"""
Construct Snowflake connection params from environment variables.
"""
if os.path.exists("/snowflake/session/token"):
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"authenticator": "oauth",
"token": get_login_token(),
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA
}
else:
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"user": SNOWFLAKE_USER,
"password": SNOWFLAKE_PASSWORD,
"role": SNOWFLAKE_ROLE,
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA
}
def run_job():
"""
Main body of this job.
"""
logger = get_logger()
logger.info("Job started")
# Parse input arguments
args = get_arg_parser().parse_args()
query = args.query
result_table = args.result_table
# Start a Snowflake session, run the query and write results to specified table
with Session.builder.configs(get_connection_params()).create() as session:
# Print out current session context information.
database = session.get_current_database()
schema = session.get_current_schema()
warehouse = session.get_current_warehouse()
role = session.get_current_role()
logger.info(
f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
)
# Execute query and persist results in a table.
logger.info(
f"Executing query [{query}] and writing result to table [{result_table}]"
)
res = session.sql(query)
# If the table already exists, the query result must match the table scheme.
# If the table does not exist, this will create a new table.
res.write.mode("append").save_as_table(result_table)
logger.info("Job finished")
if __name__ == "__main__":
run_job()
Erläuterungen zum Code:
Der Python-Code wird bei
main
ausgeführt, wodurch er dann die Funktionrun_job()
ausführt:if __name__ == "__main__": run_job()
Die Funktion
run_job()
liest die Umgebungsvariablen und verwendet sie, um Standardwerte für verschiedene Parameter festzulegen. Der Container verwendet diese Parameter für die Verbindung zu Snowflake. Beachten Sie Folgendes:Sie können die im Dienst verwendeten Parameterwerte überschreiben, indem Sie die Felder
containers.env
undcontainers.args
in der Dienstspezifikation verwenden. Weitere Informationen dazu finden Sie unter Referenz der Dienstspezifikation.Wenn das Image in Snowflake ausgeführt wird, spezifiziert Snowflake einige dieser Parameter (siehe Quellcode) automatisch. Wenn Sie das Image jedoch lokal testen, müssen Sie diese Parameter ausdrücklich bereitstellen (wie im nächsten Abschnitt Lokales Erstellen und Testen eines Images gezeigt).
Datei „Dockerfile“¶
Diese Datei enthält alle Befehle, um ein Image mit Docker zu erstellen.
ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
Datei my_job_spec.yaml (Dienstspezifikation)¶
Snowflake verwendet die von Ihnen in dieser Spezifikation angegebenen Informationen zur Konfiguration und Ausführung Ihres Jobdienstes.
spec:
containers:
- name: main
image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
env:
SNOWFLAKE_WAREHOUSE: tutorial_warehouse
args:
- "--query=select current_time() as time,'hello'"
- "--result_table=results"
Zusätzlich zu den Pflichtfeldern container.name
und container.image
(siehe Referenz der Dienstspezifikation) enthält die Spezifikation das optionale Feld container.args
zur Auflistung der Argumente:
--query
liefert die Abfrage, die bei der Ausführung des Jobdienstes ausgeführt werden soll.--result_table
identifiziert die Tabelle, in der die Abfrageergebnisse gespeichert werden sollen.
Lokales Erstellen und Testen eines Images¶
Sie können das Docker-Image lokal testen, bevor Sie es in ein Repository Ihres Snowflake-Kontos hochladen. Bei lokalen Tests wird Ihr Container eigenständig ausgeführt (er ist kein Jobdienst, der von Snowflake ausgeführt wird).
Führen Sie die folgenden Schritte aus, um das Docker-Image von Tutorial 2 zu testen:
Um ein Docker-Image zu erstellen, führen Sie über die Docker-CLI den Befehl
docker build
aus:docker build --rm -t my_service:local .
Um Ihren Code zu starten, führen Sie den Befehl
docker run
aus und geben dabei<orgname>-<acctname>
,<username>
und<password>
an:docker run --rm \ -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \ -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \ -e SNOWFLAKE_DATABASE=tutorial_db \ -e SNOWFLAKE_SCHEMA=data_schema \ -e SNOWFLAKE_ROLE=test_role \ -e SNOWFLAKE_USER=<username> \ -e SNOWFLAKE_PASSWORD=<password> \ -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \ my_job:local \ --query="select current_time() as time,'hello'" \ --result_table=tutorial_db.data_schema.results
Wenn Sie das Image lokal testen, beachten Sie, dass Sie zusätzlich zu den drei Argumenten (die Abfrage, das Warehouse, in dem die Abfrage ausgeführt wird, und die Tabelle, in der das Ergebnis gespeichert wird) auch die Verbindungsparameter für den lokal ausgeführten Container angeben müssen, um eine Verbindung zu Snowflake herzustellen.
Wenn Sie den Container als Dienst ausführen, stellt Snowflake dem Container diese Parameter als Umgebungsvariablen zur Verfügung. Weitere Informationen dazu finden Sie unter Snowflake-Client konfigurieren.
Der Jobdienst führt die Abfrage (
select current_time() as time,'hello'
) aus und schreibt das Ergebnis in die Tabelle (tutorial_db.data_schema.results
). Wenn die Tabelle nicht vorhanden ist, wird sie erstellt. Wenn die Tabelle vorhanden ist, fügt der Jobdienst eine Zeile hinzu.Beispielergebnis für das Abfragen der Ergebnistabelle:
+----------+----------+ | TIME | TEXT | |----------+----------| | 10:56:52 | hello | +----------+----------+
Nächste Schritte¶
Sie können nun mit Tutorial 3 fortfahren, in dem gezeigt wird, wie die Kommunikation von Dienst zu Dienst funktioniert.