Tutorial 2: Snowpark Container Services-Job erstellen¶
Wichtig
Das Job-Feature von Snowpark Container Services befindet sich derzeit in der privaten Vorschau und unterliegt den Nutzungsbedingungen für Vorschau-Features unter https://snowflake.com/legal. Weitere Informationen dazu erhalten Sie bei Ihrem Snowflake-Ansprechpartner.
Einführung¶
Nachdem Sie die grundlegende Tutorial-Einrichtung abgeschlossen haben, können Sie nun einen Job erstellen. In diesem Tutorial erstellen Sie einen einfachen Job, der eine Verbindung zu Snowflake herstellt, eine SQL-SELECT-Abfrage ausführt und das Ergebnis in einer Tabelle speichert.
Dieses Tutorial besteht aus zwei Teilen:
Teil 1: Job erstellen und testen. Sie laden den für dieses Tutorial bereitgestellten Code herunter und befolgen die schrittweise Anleitung:
Jobcode für dieses Tutorial herunterladen
Docker-Image für Snowpark Container Services erstellen und Image in Repository im eigenen Konto hochladen
Job-Spezifikationsdatei mit den Container-Konfigurationsinformationen für Snowflake im Stagingbereich bereitstellen. Neben dem Namen des Images, das zum Starten eines Containers verwendet werden soll, enthält die Spezifikationsdatei weitere Angaben:
Drei Argumente: die SELECT-Abfrage, das virtuelle Warehouse zum Ausführen der Abfrage und der Name der Tabelle, in der das Ergebnis gespeichert werden soll.
Das Warehouse, in dem die SELECT-Anweisung ausgeführt werden soll.
Job ausführen. Mit dem Befehl EXECUTE SERVICE können Sie den Job ausführen, indem Sie die Spezifikationsdatei und den Computepool angeben, in dem Snowflake den Container ausführen kann. Und abschließend überprüfen Sie die Ergebnisse des Jobs.
Teil 2: Erläuterungen zum Jobcode. Dieser Abschnitt bietet eine Übersicht zum Jobcode und zeigt auf, wie die verschiedenen Komponenten zusammenarbeiten.
1: Dienstcode herunterladen¶
Zum Erstellen eines Jobs wird ein Code (eine Python-Anwendung) bereitgestellt.
Laden Sie die ZIP-Datei in ein Verzeichnis herunter.
Entpacken Sie den Inhalt, der ein Verzeichnis für jedes Tutorial enthält. Das Verzeichnis
Tutorial-2
enthält die folgenden Dateien:main.py
Dockerfile
my_job_spec.yaml
2: Image erstellen und hochladen¶
Erstellen Sie ein Image für die linux/amd64-Plattform, die von Snowpark Container Services unterstützt wird, und laden Sie das Image dann in das Image-Repository in Ihrem Konto hoch (siehe Grundlegende Einrichtung).
Sie benötigen Informationen zum Repository (die Repository-URL und den Hostnamen der Registry), bevor Sie das Image erstellen und hochladen können. Weitere Informationen dazu finden Sie unter Registry und Repositorys.
Informationen zum Repository abrufen
Um die Repository-URL zu erhalten, führen Sie den SQL-Befehl SHOW IMAGE REPOSITORIES aus.
SHOW IMAGE REPOSITORIES;
Die URL ist in der Spalte
repository_url
der Ausgabe enthalten. Siehe folgendes Beispiel:<orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
Der Hostname in der Repository-URL ist der Name des Registry-Hosts. Siehe folgendes Beispiel:
<orgname>-<acctname>.registry.snowflakecomputing.com
Image erstellen und in das Repository hochladen
Öffnen Sie ein Terminalfenster, und wechseln Sie in das Verzeichnis, das die entpackten Dateien enthält.
Um ein Docker-Image zu erstellen, führen Sie den folgenden Befehl
docker build
mithilfe der Docker-CLI aus. Beachten Sie, dass der Befehl das aktuelle Arbeitsverzeichnis (.) alsPATH
für die Dateien angibt, die für das Erstellen des Images verwendet werden sollen.docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
Für
image_name
verwenden Siemy_job_image:latest
:
Beispiel
docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
Laden Sie das Image in das Repository in Ihrem Snowflake-Konto hoch. Damit Docker ein Image in Ihrem Namen in Ihr Repository hochladen kann, müssen Sie Docker zunächst mit Snowflake authentifizieren.
Um Docker bei der Snowflake-Registrierung zu authentifizieren, führen Sie den folgenden Befehl aus.
docker login <registry_hostname> -u <username>
Geben Sie dabei für
username
Ihren Snowflake-Benutzernamen an. Docker fordert Sie zur Eingabe Ihres Kennworts auf.
Führen Sie den folgenden Befehl aus, um das Image hochzuladen:
docker push <repository_url>/<image_name>
Beispiel
docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
3: Spezifikationsdatei im Stagingbereich bereitstellen¶
Um Ihre Jobspezifikationsdatei „my_job_spec.yaml“ in den Stagingbereich zu laden, verwenden Sie eine der folgenden Optionen:
Die Snowsight-Weboberfläche: Eine Anleitung dazu finden Sie unter Auswahl eines internen Stagingbereichs für lokale Dateien.
Die SnowSQL-CLI: Führen Sie den folgenden PUT-Befehl aus:
PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Beispiel:
Linux oder macOS
PUT file:///tmp/my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Windows
PUT file://C:\temp\my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Sie können auch einen relativen Pfad angeben.
PUT file://./my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Der Befehl setzt OVERWRITE=TRUE, sodass Sie die Datei bei Bedarf erneut hochladen können (z. B. wenn Sie einen Fehler in Ihrer Spezifikationsdatei behoben haben). Wenn der PUT-Befehl erfolgreich ausgeführt wurde, werden Informationen zu der hochgeladenen Datei ausgegeben.
4: Job ausführen¶
Jetzt sind Sie bereit, einen Job zu erstellen.
Um einen Job zu starten, führen Sie den Befehl EXECUTE SERVICE aus:
EXECUTE SERVICE IN COMPUTE POOL tutorial_compute_pool FROM @tutorial_stage SPEC='my_job_spec.yaml';
Beachten Sie Folgendes:
FROM und SPEC geben den Namen des Stagingbereichs bzw. den Namen der Jobspezifikationsdatei an. Wenn der Job ausgeführt wird, führt er die SQL-Anweisung aus und speichert das Ergebnis in einer Tabelle, wie in
my_job_spec.yaml
angegeben.Die SQL-Anweisung in Ihrem Job wird nicht innerhalb des Docker-Containers ausgeführt. Stattdessen stellt der ausgeführte Container eine Verbindung zu Snowflake her und führt die SQL-Anweisung in einem Snowflake-Warehouse aus.
COMPUTE_POOL stellt die Computeressourcen bereit, auf denen Snowflake den Job ausführt.
EXECUTE SERVICE gibt eine Ausgabe zurück, die die von Snowflake zugewiesene UUID des Jobs enthält, wie in der folgenden Beispielausgabe gezeigt:
+------------------------------------------------------------------------------------+ | status | -------------------------------------------------------------------------------------+ | Job 01af7ee6-0001-cb52-0020-c5870077223a completed successfully with status: DONE. | +------------------------------------------------------------------------------------+
Rufen Sie die ID der von Ihnen ausgeführten Abfrage ab (EXECUTE SERVICE ist eine Abfrage).
SET jobid = LAST_QUERY_ID();
Sie verwenden diese ID in den folgenden Schritten, um den Jobstatus und die Jobprotokollinformationen abzurufen.
Bemerkung
Es ist wichtig, dass Sie LAST_QUERY_ID unmittelbar nach dem Aufruf von EXECUTE SERVICE aufrufen, um sicherzustellen, dass die vom Befehl zurückgegebene Job-ID für den Befehl EXECUTE SERVICE bestimmt ist.
LAST_QUERY_ID gibt die Abfrage-ID eines Jobs erst nach dessen Beendigung zurück; für aktive Jobs mit langer Ausführungszeit ist sie nicht geeignet, um Statusinformationen in Echtzeit zu erhalten. Stattdessen müssen Sie die QUERY HISTORY-Tabellenfunktionen verwenden, um die Abfrage-ID des Jobs zu erhalten. Weitere Informationen dazu finden Sie unter Verwenden von Jobs.
Der Job führt eine einfache Abfrage aus und speichert das Ergebnis in der Ergebnistabelle. Sie können den erfolgreichen Abschluss des Jobs überprüfen, indem Sie die Ergebnistabelle abfragen:
SELECT * FROM results;
Beispielausgabe:
+----------+-----------+ | TIME | TEXT | |----------+-----------| | 10:56:52 | hello | +----------+-----------+
Wenn Sie die Ausführung Ihres Jobs debuggen möchten, verwenden Sie die Systemfunktionen. Verwenden Sie zum Beispiel SYSTEM$GET_JOB_STATUS, um festzustellen, ob der Job noch ausgeführt wird, ob er nicht gestartet werden konnte oder warum er fehlgeschlagen ist. Wenn Ihr Code nützliche Protokolleeinträge an die Standardausgabe oder den Standardfehler ausgibt, können Sie auch mit SYSTEM$GET_JOB_LOGS auf die Protokolle zugreifen.
Um den Jobstatus abzurufen, rufen Sie die Systemfunktion SYSTEM$GET_JOB_STATUS auf:
SELECT SYSTEM$GET_JOB_STATUS($jobid);
Beispielausgabe:
[ { "status":"DONE", "message":"Completed successfully", "containerName":"main", "instanceId":"0", "serviceName":"01af7ee6-0001-cb52-0020-c5870077223a", "image":"orgname-acctname.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest", "restartCount":0, "startTime":"" } ]
Da der Job keinen Namen hat, ist der Wert von
serviceName
in der Ausgabe die von Snowflake zugewiesene UUID (Abfrage-ID) des Jobs.Um die Jobprotokollinformationen zu erhalten, verwenden Sie die Systemfunktion SYSTEM$GET_JOB_LOGS:
SELECT SYSTEM$GET_JOB_LOGS($jobid, 'main')
job-tutorial - INFO - Job started job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE" job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results] job-tutorial - INFO - Job finished
5: Bereinigen¶
Wenn Sie nicht vorhaben, mit Tutorial 3 fortzufahren, sollten Sie die von Ihnen erstellten abrechenbaren Ressourcen entfernen. Weitere Informationen dazu finden Sie in Schritt 5 von Tutorial 3.
6: Erläuterungen zum Jobcode¶
In diesem Abschnitt werden die folgenden Themen behandelt:
Untersuchen der bereitgestellten Dateien: Untersuchen Sie die verschiedenen Codedateien, mit denen der Job implementiert wird.
Lokales Erstellen und Testen eines Images. In diesem Abschnitt wird erläutert, wie Sie das Docker-Image lokal testen können, bevor Sie es in ein Repository in Ihrem Snowflake-Konto hochladen.
Untersuchen der bereitgestellten Dateien¶
Die ZIP-Datei, die Sie zu Beginn des Tutorials heruntergeladen haben, enthält die folgenden Dateien:
main.py
Dockerfile
my_job_spec.yaml
Dieser Abschnitt bietet eine Übersicht darüber, wie der Code den Job implementiert.
Datei „main.py“¶
#!/opt/conda/bin/python3
import argparse
import logging
import os
import sys
from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *
# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")
# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")
def get_arg_parser():
"""
Input argument list.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--query", required=True, help="query text to execute")
parser.add_argument(
"--result_table",
required=True,
help="name of the table to store result of query specified by flag --query")
return parser
def get_logger():
"""
Get a logger for local logging.
"""
logger = logging.getLogger("job-tutorial")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def get_login_token():
"""
Read the login token supplied automatically by Snowflake. These tokens
are short lived and should always be read right before creating any new connection.
"""
with open("/snowflake/session/token", "r") as f:
return f.read()
def get_connection_params():
"""
Construct Snowflake connection params from environment variables.
"""
if os.path.exists("/snowflake/session/token"):
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"authenticator": "oauth",
"token": get_login_token(),
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA
}
else:
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"user": SNOWFLAKE_USER,
"password": SNOWFLAKE_PASSWORD,
"role": SNOWFLAKE_ROLE,
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA
}
def run_job():
"""
Main body of this job.
"""
logger = get_logger()
logger.info("Job started")
# Parse input arguments
args = get_arg_parser().parse_args()
query = args.query
result_table = args.result_table
# Start a Snowflake session, run the query and write results to specified table
with Session.builder.configs(get_connection_params()).create() as session:
# Print out current session context information.
database = session.get_current_database()
schema = session.get_current_schema()
warehouse = session.get_current_warehouse()
role = session.get_current_role()
logger.info(
f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
)
# Execute query and persist results in a table.
logger.info(
f"Executing query [{query}] and writing result to table [{result_table}]"
)
res = session.sql(query)
# If the table already exists, the query result must match the table scheme.
# If the table does not exist, this will create a new table.
res.write.mode("append").save_as_table(result_table)
logger.info("Job finished")
if __name__ == "__main__":
run_job()
Erläuterungen zum Code:
Der Python-Code wird bei
main
ausgeführt, wodurch er dann die Funktionrun_job()
ausführt:if __name__ == "__main__": run_job()
Die Funktion
run_job()
liest die Umgebungsvariablen und verwendet sie, um Standardwerte für verschiedene Parameter festzulegen. Der Container verwendet diese Parameter für die Verbindung zu Snowflake. Beachten Sie Folgendes:Sie können diese Standardparameterwerte überschreiben. Weitere Informationen dazu finden Sie unter Referenz der Dienstspezifikation.
Wenn das Image in Snowflake ausgeführt wird, spezifiziert Snowflake einige dieser Parameter (siehe Quellcode) automatisch. Wenn Sie das Image jedoch lokal testen, müssen Sie diese Parameter ausdrücklich bereitstellen (wie im nächsten Abschnitt Lokales Erstellen und Testen eines Images gezeigt).
Datei „Dockerfile“¶
Diese Datei enthält alle Befehle, um ein Image mit Docker zu erstellen.
ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
Datei „my_job_spec.yaml“ (Jobspezifikation)¶
Snowflake verwendet die von Ihnen in dieser Spezifikation angegebenen Informationen zur Konfiguration und Ausführung Ihres Jobs.
spec:
container:
- name: main
image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
env:
SNOWFLAKE_WAREHOUSE: tutorial_warehouse
args:
- "--query=select current_time() as time,'hello'"
- "--result_table=results"
Zusätzlich zu den Pflichtfeldern container.name
und container.image
(siehe Referenz der Dienstspezifikation) enthält die Spezifikation das optionale Feld container.args
zur Auflistung der Argumente:
--query
liefert die Abfrage, die bei der Ausführung des Jobs ausgeführt werden soll.--result_table
identifiziert die Tabelle, in der die Abfrageergebnisse gespeichert werden sollen.
Lokales Erstellen und Testen eines Images¶
Sie können das Docker-Image lokal testen, bevor Sie es in ein Repository Ihres Snowflake-Kontos hochladen. Bei lokalen Tests wird Ihr Container eigenständig ausgeführt (er ist kein Job, der von Snowflake ausgeführt wird).
Führen Sie die folgenden Schritte aus, um das Docker-Image von Tutorial 2 zu testen:
Um ein Docker-Image zu erstellen, führen Sie über die Docker-CLI den Befehl
docker build
aus:docker build --rm -t my_service:local .
Um Ihren Code zu starten, führen Sie den Befehl
docker run
aus und geben dabei<orgname>-<acctname>
,<username>
und<password>
) an:docker run --rm \ -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \ -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \ -e SNOWFLAKE_DATABASE=tutorial_db \ -e SNOWFLAKE_SCHEMA=data_schema \ -e SNOWFLAKE_ROLE=test_role \ -e SNOWFLAKE_USER=<username> \ -e SNOWFLAKE_PASSWORD=<password> \ -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \ my_job:local \ --query="select current_time() as time,'hello'" \ --result_table=tutorial_db.data_schema.results
Wenn Sie das Image lokal testen, beachten Sie, dass Sie zusätzlich zu den drei Argumenten (die Abfrage, das Warehouse, in dem die Abfrage ausgeführt wird, und die Tabelle, in der das Ergebnis gespeichert wird) auch die Verbindungsparameter für den lokal ausgeführten Container angeben müssen, um eine Verbindung zu Snowflake herzustellen.
Wenn Sie den Container als Job ausführen, stellt Snowflake dem Container diese Parameter als Umgebungsvariablen zur Verfügung. Weitere Informationen dazu finden Sie unter Snowflake-Client konfigurieren.
Der Job führt die Abfrage (
select current_time() as time,'hello'
) aus und schreibt das Ergebnis in die Tabelle (tutorial_db.data_schema.results
). Wenn die Tabelle nicht vorhanden ist, wird sie erstellt. Wenn die Tabelle vorhanden ist, fügt der Job eine Zeile hinzu.Beispielergebnis für das Abfragen der Ergebnistabelle:
+----------+----------+ | TIME | TEXT | |----------+----------| | 10:56:52 | hello | +----------+----------+
Nächste Schritte¶
Sie können nun mit Tutorial 3 fortfahren, in dem gezeigt wird, wie die Kommunikation von Dienst zu Dienst funktioniert.