Tutorial 2: Snowpark Container Services-Jobdienst erstellen

Einführung

Nachdem Sie die grundlegende Tutorial-Einrichtung abgeschlossen haben, können Sie nun einen Jobdienst erstellen. In diesem Tutorial erstellen Sie einen einfachen Jobdienst, der eine Verbindung zu Snowflake herstellt, eine SQL-SELECT-Abfrage ausführt und das Ergebnis in einer Tabelle speichert.

Dieses Tutorial besteht aus zwei Teilen:

Teil 1: Jobdienst erstellen und testen. Sie laden den für dieses Tutorial bereitgestellten Code herunter und befolgen die schrittweise Anleitung:

  1. Jobdienstcode für dieses Tutorial herunterladen

  2. Docker-Image für Snowpark Container Services erstellen und Image in Repository im eigenen Konto hochladen

  3. Dienstspezifikationsdatei mit den Container-Konfigurationsinformationen für Snowflake im Stagingbereich bereitstellen. Zusätzlich zum Namen des Images, das zum Starten des Containers verwendet wird, gibt die Spezifikationsdatei drei Argumente an: die SELECT-Abfrage, das virtuelle Warehouse zum Ausführen der Abfrage und den Namen der Tabelle, in der das Ergebnis gespeichert werden soll.

  4. Jobdienst ausführen. Mit dem Befehl EXECUTE JOB SERVICE können Sie den Jobdienst ausführen, indem Sie die Spezifikationsdatei und den Computepool angeben, in dem Snowflake den Container ausführen kann. Ergebnisse des Dienstes überprüfen.

Teil 2: Erläuterungen zum Jobdienstcode. Dieser Abschnitt bietet eine Übersicht zum Jobdienstcode und zeigt auf, wie die verschiedenen Komponenten zusammenarbeiten.

1: Jobdienstcode herunterladen

Zum Implementieren eines Jobdienstes wird ein Code (eine Python-Anwendung) bereitgestellt.

  1. Laden Sie SnowparkContainerServices-Tutorials.zip herunter.

  2. Entpacken Sie den Inhalt, der ein Verzeichnis für jedes Tutorial enthält. Das Verzeichnis Tutorial-2 enthält die folgenden Dateien:

    • main.py

    • Dockerfile

    • my_job_spec.yaml

2: Image erstellen und hochladen

Erstellen Sie ein Image für die linux/amd64-Plattform, die von Snowpark Container Services unterstützt wird, und laden Sie das Image dann in das Image-Repository in Ihrem Konto hoch (siehe Grundlegende Einrichtung).

Sie benötigen Informationen zum Repository (die Repository-URL und den Hostnamen der Registry), bevor Sie das Image erstellen und hochladen können. Weitere Informationen dazu finden Sie unter Registry und Repositorys.

Informationen zum Repository abrufen

  1. Um die Repository-URL zu erhalten, führen Sie den SQL-Befehl SHOW IMAGE REPOSITORIES aus.

    SHOW IMAGE REPOSITORIES;
    
    Copy
    • Die URL ist in der Spalte repository_url der Ausgabe enthalten. Siehe folgendes Beispiel:

      <orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
      
    • Der Hostname in der Repository-URL ist der Name des Registry-Hosts. Siehe folgendes Beispiel:

      <orgname>-<acctname>.registry.snowflakecomputing.com
      

Image erstellen und in das Repository hochladen

  1. Öffnen Sie ein Terminalfenster, und wechseln Sie in das Verzeichnis, das die entpackten Dateien enthält.

  2. Um ein Docker-Image zu erstellen, führen Sie den folgenden Befehl docker build mithilfe der Docker-CLI aus. Beachten Sie, dass der Befehl das aktuelle Arbeitsverzeichnis (.) als PATH für die Dateien angibt, die für das Erstellen des Images verwendet werden sollen.

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
    
    Copy
    • Für image_name verwenden Sie my_job_image:latest:

    Beispiel

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
    
    Copy
  3. Laden Sie das Image in das Repository in Ihrem Snowflake-Konto hoch. Damit Docker ein Image in Ihrem Namen in Ihr Repository hochladen kann, müssen Sie zunächst Docker mit der Registry authentifizieren.

    1. Um Docker bei der Snowflake-Registrierung zu authentifizieren, führen Sie den folgenden Befehl aus.

      docker login <registry_hostname> -u <username>
      
      Copy
      • Geben Sie dabei für username Ihren Snowflake-Benutzernamen an. Docker fordert Sie zur Eingabe Ihres Kennworts auf.

    2. Führen Sie den folgenden Befehl aus, um das Image hochzuladen:

      docker push <repository_url>/<image_name>
      
      Copy

      Beispiel

      docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
      
      Copy

3: Spezifikationsdatei im Stagingbereich bereitstellen

  • Um Ihre Dienstspezifikationsdatei (my_job_spec.yaml) in den Stagingbereich hochzuladen, verwenden Sie eine der folgenden Optionen:

    • Die Snowsight-Weboberfläche: Eine Anleitung dazu finden Sie unter Auswahl eines internen Stagingbereichs für lokale Dateien.

    • Die SnowSQL-CLI: Führen Sie den folgenden PUT-Befehl aus:

      PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      Beispiel:

      • Linux oder macOS

        PUT file:///tmp/my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy
      • Windows

        PUT file://C:\temp\my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy

      Sie können auch einen relativen Pfad angeben.

      PUT file://./my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      Der Befehl setzt OVERWRITE=TRUE, sodass Sie die Datei bei Bedarf erneut hochladen können (z. B. wenn Sie einen Fehler in Ihrer Spezifikationsdatei behoben haben). Wenn der PUT-Befehl erfolgreich ausgeführt wurde, werden Informationen zu der hochgeladenen Datei ausgegeben.

4: Jobdienst ausführen

Jetzt sind Sie bereit, einen Job zu erstellen.

  1. Um einen Jobdienst zu starten, führen Sie den Befehl EXECUTE JOB SERVICE aus:

    EXECUTE JOB SERVICE
      IN COMPUTE POOL tutorial_compute_pool
      NAME=tutorial_2_job_service
      FROM @tutorial_stage
      SPEC='my_job_spec.yaml';
    
    Copy

    Beachten Sie Folgendes:

    • FROM und SPEC geben den Namen des Stagingbereichs bzw. den Namen der Dienstspezifikationsdatei des Jobdienstes an. Wenn der Jobdienst ausgeführt wird, führt er die SQL-Anweisung aus und speichert das Ergebnis in einer Tabelle, wie in my_job_spec.yaml angegeben.

      Die SQL-Anweisung wird nicht innerhalb des Docker-Containers ausgeführt. Stattdessen stellt der ausgeführte Container eine Verbindung zu Snowflake her und führt die SQL-Anweisung in einem Snowflake-Warehouse aus.

    • COMPUTE_POOL stellt die Computeressourcen bereit, auf denen Snowflake den Jobdienst ausführt.

    • EXECUTE JOB SERVICE gibt eine Ausgabe zurück, die den Jobnamen enthält, wie in der folgenden Beispielausgabe gezeigt:

      +------------------------------------------------------------------------------------+
      |                      status                                                        |
      -------------------------------------------------------------------------------------+
      | Job TUTORIAL_2_JOB_SERVICE completed successfully with status: DONE.               |
      +------------------------------------------------------------------------------------+
      
  2. Der Jobdienst führt eine einfache Abfrage aus und speichert das Ergebnis in der Ergebnistabelle. Sie können den erfolgreichen Abschluss des Jobdienstes überprüfen, indem Sie die Ergebnistabelle abfragen:

    SELECT * FROM results;
    
    Copy

    Beispielausgabe:

    +----------+-----------+
    | TIME     | TEXT      |
    |----------+-----------|
    | 10:56:52 | hello     |
    +----------+-----------+
    
  3. Wenn Sie die Ausführung Ihres Jobdienstes debuggen möchten, verwenden Sie die Systemfunktionen. Verwenden Sie zum Beispiel SYSTEM$GET_SERVICE_STATUS, um festzustellen, ob der Jobdienst noch ausgeführt wird, ob er nicht gestartet werden konnte oder warum er fehlgeschlagen ist. Wenn Ihr Code nützliche Protokolleeinträge an die Standardausgabe oder den Standardfehler ausgibt, können Sie auch mit SYSTEM$GET_SERVICE_LOGS auf die Protokolle zugreifen.

    1. Um den Status des Jobdienstes abzurufen, rufen Sie die Systemfunktion SYSTEM$GET_SERVICE_STATUS – Veraltet auf:

      SELECT SYSTEM$GET_SERVICE_STATUS('tutorial_2_job_service');
      
      Copy

      Beispielausgabe:

      [
        {
            "status":"DONE",
            "message":"Completed successfully",
            "containerName":"main",
            "instanceId":"0",
            "serviceName":"TUTORIAL_2_JOB_SERVICE",
            "image":"orgname-acctname.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
            "restartCount":0,
            "startTime":""
        }
      ]
      
      Copy
    2. Um die Potokollinformationen zum Jobdienst zu erhalten, verwenden Sie die Systemfunktion SYSTEM$GET_SERVICE_LOGS:

      SELECT SYSTEM$GET_SERVICE_LOGS('tutorial_2_job_service', 0, 'main')
      
      Copy
      job-tutorial - INFO - Job started
      job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE"
      job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results]
      job-tutorial - INFO - Job finished
      

5: Bereinigen

Wenn Sie nicht vorhaben, mit Tutorial 3 fortzufahren, sollten Sie die von Ihnen erstellten abrechenbaren Ressourcen entfernen. Weitere Informationen dazu finden Sie in Schritt 5 von Tutorial 3.

6: Überprüfen des Jobdienstcodes

In diesem Abschnitt werden die folgenden Themen behandelt:

Untersuchen der bereitgestellten Dateien

Die ZIP-Datei, die Sie zu Beginn des Tutorials heruntergeladen haben, enthält die folgenden Dateien:

  • main.py

  • Dockerfile

  • my_job_spec.yaml

Dieser Abschnitt bietet eine Übersicht zum Code.

Datei „main.py“

#!/opt/conda/bin/python3

import argparse
import logging
import os
import sys

from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *

# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")

# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")


def get_arg_parser():
  """
  Input argument list.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument("--query", required=True, help="query text to execute")
  parser.add_argument(
    "--result_table",
    required=True,
    help="name of the table to store result of query specified by flag --query")

  return parser


def get_logger():
  """
  Get a logger for local logging.
  """
  logger = logging.getLogger("job-tutorial")
  logger.setLevel(logging.DEBUG)
  handler = logging.StreamHandler(sys.stdout)
  handler.setLevel(logging.DEBUG)
  formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
  handler.setFormatter(formatter)
  logger.addHandler(handler)
  return logger


def get_login_token():
  """
  Read the login token supplied automatically by Snowflake. These tokens
  are short lived and should always be read right before creating any new connection.
  """
  with open("/snowflake/session/token", "r") as f:
    return f.read()


def get_connection_params():
  """
  Construct Snowflake connection params from environment variables.
  """
  if os.path.exists("/snowflake/session/token"):
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "authenticator": "oauth",
      "token": get_login_token(),
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }
  else:
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "user": SNOWFLAKE_USER,
      "password": SNOWFLAKE_PASSWORD,
      "role": SNOWFLAKE_ROLE,
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }

def run_job():
  """
  Main body of this job.
  """
  logger = get_logger()
  logger.info("Job started")

  # Parse input arguments
  args = get_arg_parser().parse_args()
  query = args.query
  result_table = args.result_table

  # Start a Snowflake session, run the query and write results to specified table
  with Session.builder.configs(get_connection_params()).create() as session:
    # Print out current session context information.
    database = session.get_current_database()
    schema = session.get_current_schema()
    warehouse = session.get_current_warehouse()
    role = session.get_current_role()
    logger.info(
      f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
    )

    # Execute query and persist results in a table.
    logger.info(
      f"Executing query [{query}] and writing result to table [{result_table}]"
    )
    res = session.sql(query)
    # If the table already exists, the query result must match the table scheme.
    # If the table does not exist, this will create a new table.
    res.write.mode("append").save_as_table(result_table)

  logger.info("Job finished")


if __name__ == "__main__":
  run_job()
Copy

Erläuterungen zum Code:

  • Der Python-Code wird bei main ausgeführt, wodurch er dann die Funktion run_job() ausführt:

    if __name__ == "__main__":
      run_job()
    
    Copy
  • Die Funktion run_job() liest die Umgebungsvariablen und verwendet sie, um Standardwerte für verschiedene Parameter festzulegen. Der Container verwendet diese Parameter für die Verbindung zu Snowflake. Beachten Sie Folgendes:

    • Sie können die im Dienst verwendeten Parameterwerte überschreiben, indem Sie die Felder containers.env und containers.args in der Dienstspezifikation verwenden. Weitere Informationen dazu finden Sie unter Referenz der Dienstspezifikation.

    • Wenn das Image in Snowflake ausgeführt wird, spezifiziert Snowflake einige dieser Parameter (siehe Quellcode) automatisch. Wenn Sie das Image jedoch lokal testen, müssen Sie diese Parameter ausdrücklich bereitstellen (wie im nächsten Abschnitt Lokales Erstellen und Testen eines Images gezeigt).

Datei „Dockerfile“

Diese Datei enthält alle Befehle, um ein Image mit Docker zu erstellen.

ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
  conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
Copy

Datei my_job_spec.yaml (Dienstspezifikation)

Snowflake verwendet die von Ihnen in dieser Spezifikation angegebenen Informationen zur Konfiguration und Ausführung Ihres Jobdienstes.

spec:
  containers:
  - name: main
    image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
    env:
      SNOWFLAKE_WAREHOUSE: tutorial_warehouse
    args:
    - "--query=select current_time() as time,'hello'"
    - "--result_table=results"
Copy

Zusätzlich zu den Pflichtfeldern container.name und container.image (siehe Referenz der Dienstspezifikation) enthält die Spezifikation das optionale Feld container.args zur Auflistung der Argumente:

  • --query liefert die Abfrage, die bei der Ausführung des Jobdienstes ausgeführt werden soll.

  • --result_table identifiziert die Tabelle, in der die Abfrageergebnisse gespeichert werden sollen.

Lokales Erstellen und Testen eines Images

Sie können das Docker-Image lokal testen, bevor Sie es in ein Repository Ihres Snowflake-Kontos hochladen. Bei lokalen Tests wird Ihr Container eigenständig ausgeführt (er ist kein Jobdienst, der von Snowflake ausgeführt wird).

Führen Sie die folgenden Schritte aus, um das Docker-Image von Tutorial 2 zu testen:

  1. Um ein Docker-Image zu erstellen, führen Sie über die Docker-CLI den Befehl docker build aus:

    docker build --rm -t my_service:local .
    
    Copy
  2. Um Ihren Code zu starten, führen Sie den Befehl docker run aus und geben dabei <orgname>-<acctname>, <username> und <password> an:

    docker run --rm \
      -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \
      -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \
      -e SNOWFLAKE_DATABASE=tutorial_db \
      -e SNOWFLAKE_SCHEMA=data_schema \
      -e SNOWFLAKE_ROLE=test_role \
      -e SNOWFLAKE_USER=<username> \
      -e SNOWFLAKE_PASSWORD=<password> \
      -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \
      my_job:local \
      --query="select current_time() as time,'hello'" \
      --result_table=tutorial_db.data_schema.results
    
    Copy

    Wenn Sie das Image lokal testen, beachten Sie, dass Sie zusätzlich zu den drei Argumenten (die Abfrage, das Warehouse, in dem die Abfrage ausgeführt wird, und die Tabelle, in der das Ergebnis gespeichert wird) auch die Verbindungsparameter für den lokal ausgeführten Container angeben müssen, um eine Verbindung zu Snowflake herzustellen.

    Wenn Sie den Container als Dienst ausführen, stellt Snowflake dem Container diese Parameter als Umgebungsvariablen zur Verfügung. Weitere Informationen dazu finden Sie unter Snowflake-Client konfigurieren.

    Der Jobdienst führt die Abfrage (select current_time() as time,'hello') aus und schreibt das Ergebnis in die Tabelle (tutorial_db.data_schema.results). Wenn die Tabelle nicht vorhanden ist, wird sie erstellt. Wenn die Tabelle vorhanden ist, fügt der Jobdienst eine Zeile hinzu.

    Beispielergebnis für das Abfragen der Ergebnistabelle:

    +----------+----------+
    | TIME     | TEXT     |
    |----------+----------|
    | 10:56:52 | hello    |
    +----------+----------+
    

Nächste Schritte

Sie können nun mit Tutorial 3 fortfahren, in dem gezeigt wird, wie die Kommunikation von Dienst zu Dienst funktioniert.