Tutorial 2: Snowpark Container Services-Job erstellen

Wichtig

Das Job-Feature von Snowpark Container Services befindet sich derzeit in der privaten Vorschau und unterliegt den Nutzungsbedingungen für Vorschau-Features unter https://snowflake.com/legal. Weitere Informationen dazu erhalten Sie bei Ihrem Snowflake-Ansprechpartner.

Einführung

Nachdem Sie die grundlegende Tutorial-Einrichtung abgeschlossen haben, können Sie nun einen Job erstellen. In diesem Tutorial erstellen Sie einen einfachen Job, der eine Verbindung zu Snowflake herstellt, eine SQL-SELECT-Abfrage ausführt und das Ergebnis in einer Tabelle speichert.

Dieses Tutorial besteht aus zwei Teilen:

Teil 1: Job erstellen und testen. Sie laden den für dieses Tutorial bereitgestellten Code herunter und befolgen die schrittweise Anleitung:

  1. Jobcode für dieses Tutorial herunterladen

  2. Docker-Image für Snowpark Container Services erstellen und Image in Repository im eigenen Konto hochladen

  3. Job-Spezifikationsdatei mit den Container-Konfigurationsinformationen für Snowflake im Stagingbereich bereitstellen. Neben dem Namen des Images, das zum Starten eines Containers verwendet werden soll, enthält die Spezifikationsdatei weitere Angaben:

    • Drei Argumente: die SELECT-Abfrage, das virtuelle Warehouse zum Ausführen der Abfrage und der Name der Tabelle, in der das Ergebnis gespeichert werden soll.

    • Das Warehouse, in dem die SELECT-Anweisung ausgeführt werden soll.

  4. Job ausführen. Mit dem Befehl EXECUTE SERVICE können Sie den Job ausführen, indem Sie die Spezifikationsdatei und den Computepool angeben, in dem Snowflake den Container ausführen kann. Und abschließend überprüfen Sie die Ergebnisse des Jobs.

Teil 2: Erläuterungen zum Jobcode. Dieser Abschnitt bietet eine Übersicht zum Jobcode und zeigt auf, wie die verschiedenen Komponenten zusammenarbeiten.

1: Dienstcode herunterladen

Zum Erstellen eines Jobs wird ein Code (eine Python-Anwendung) bereitgestellt.

  1. Laden Sie die ZIP-Datei in ein Verzeichnis herunter.

  2. Entpacken Sie den Inhalt, der ein Verzeichnis für jedes Tutorial enthält. Das Verzeichnis Tutorial-2 enthält die folgenden Dateien:

    • main.py

    • Dockerfile

    • my_job_spec.yaml

2: Image erstellen und hochladen

Erstellen Sie ein Image für die linux/amd64-Plattform, die von Snowpark Container Services unterstützt wird, und laden Sie das Image dann in das Image-Repository in Ihrem Konto hoch (siehe Grundlegende Einrichtung).

Sie benötigen Informationen zum Repository (die Repository-URL und den Hostnamen der Registry), bevor Sie das Image erstellen und hochladen können. Weitere Informationen dazu finden Sie unter Registry und Repositorys.

Informationen zum Repository abrufen

  1. Um die Repository-URL zu erhalten, führen Sie den SQL-Befehl SHOW IMAGE REPOSITORIES aus.

    SHOW IMAGE REPOSITORIES;
    
    Copy
    • Die URL ist in der Spalte repository_url der Ausgabe enthalten. Siehe folgendes Beispiel:

      <orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
      
    • Der Hostname in der Repository-URL ist der Name des Registry-Hosts. Siehe folgendes Beispiel:

      <orgname>-<acctname>.registry.snowflakecomputing.com
      

Image erstellen und in das Repository hochladen

  1. Öffnen Sie ein Terminalfenster, und wechseln Sie in das Verzeichnis, das die entpackten Dateien enthält.

  2. Um ein Docker-Image zu erstellen, führen Sie den folgenden Befehl docker build mithilfe der Docker-CLI aus. Beachten Sie, dass der Befehl das aktuelle Arbeitsverzeichnis (.) als PATH für die Dateien angibt, die für das Erstellen des Images verwendet werden sollen.

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
    
    Copy
    • Für image_name verwenden Sie my_job_image:latest:

    Beispiel

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
    
    Copy
  3. Laden Sie das Image in das Repository in Ihrem Snowflake-Konto hoch. Damit Docker ein Image in Ihrem Namen in Ihr Repository hochladen kann, müssen Sie Docker zunächst mit Snowflake authentifizieren.

    1. Um Docker bei der Snowflake-Registrierung zu authentifizieren, führen Sie den folgenden Befehl aus.

      docker login <registry_hostname> -u <username>
      
      Copy
      • Geben Sie dabei für username Ihren Snowflake-Benutzernamen an. Docker fordert Sie zur Eingabe Ihres Kennworts auf.

    2. Führen Sie den folgenden Befehl aus, um das Image hochzuladen:

      docker push <repository_url>/<image_name>
      
      Copy

      Beispiel

      docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
      
      Copy

3: Spezifikationsdatei im Stagingbereich bereitstellen

  • Um Ihre Jobspezifikationsdatei „my_job_spec.yaml“ in den Stagingbereich zu laden, verwenden Sie eine der folgenden Optionen:

    • Die Snowsight-Weboberfläche: Eine Anleitung dazu finden Sie unter Auswahl eines internen Stagingbereichs für lokale Dateien.

    • Die SnowSQL-CLI: Führen Sie den folgenden PUT-Befehl aus:

      PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      Beispiel:

      • Linux oder macOS

        PUT file:///tmp/my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy
      • Windows

        PUT file://C:\temp\my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy

      Sie können auch einen relativen Pfad angeben.

      PUT file://./my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      Der Befehl setzt OVERWRITE=TRUE, sodass Sie die Datei bei Bedarf erneut hochladen können (z. B. wenn Sie einen Fehler in Ihrer Spezifikationsdatei behoben haben). Wenn der PUT-Befehl erfolgreich ausgeführt wurde, werden Informationen zu der hochgeladenen Datei ausgegeben.

4: Job ausführen

Jetzt sind Sie bereit, einen Job zu erstellen.

  1. Um einen Job zu starten, führen Sie den Befehl EXECUTE SERVICE aus:

    EXECUTE SERVICE
      IN COMPUTE POOL tutorial_compute_pool
      FROM @tutorial_stage
      SPEC='my_job_spec.yaml';
    
    Copy

    Beachten Sie Folgendes:

    • FROM und SPEC geben den Namen des Stagingbereichs bzw. den Namen der Jobspezifikationsdatei an. Wenn der Job ausgeführt wird, führt er die SQL-Anweisung aus und speichert das Ergebnis in einer Tabelle, wie in my_job_spec.yaml angegeben.

      Die SQL-Anweisung in Ihrem Job wird nicht innerhalb des Docker-Containers ausgeführt. Stattdessen stellt der ausgeführte Container eine Verbindung zu Snowflake her und führt die SQL-Anweisung in einem Snowflake-Warehouse aus.

    • COMPUTE_POOL stellt die Computeressourcen bereit, auf denen Snowflake den Job ausführt.

    • EXECUTE SERVICE gibt eine Ausgabe zurück, die die von Snowflake zugewiesene UUID des Jobs enthält, wie in der folgenden Beispielausgabe gezeigt:

      +------------------------------------------------------------------------------------+
      |                      status                                                        |
      -------------------------------------------------------------------------------------+
      | Job 01af7ee6-0001-cb52-0020-c5870077223a completed successfully with status: DONE. |
      +------------------------------------------------------------------------------------+
      
  2. Rufen Sie die ID der von Ihnen ausgeführten Abfrage ab (EXECUTE SERVICE ist eine Abfrage).

    SET jobid = LAST_QUERY_ID();
    
    Copy

    Sie verwenden diese ID in den folgenden Schritten, um den Jobstatus und die Jobprotokollinformationen abzurufen.

    Bemerkung

    Es ist wichtig, dass Sie LAST_QUERY_ID unmittelbar nach dem Aufruf von EXECUTE SERVICE aufrufen, um sicherzustellen, dass die vom Befehl zurückgegebene Job-ID für den Befehl EXECUTE SERVICE bestimmt ist.

    LAST_QUERY_ID gibt die Abfrage-ID eines Jobs erst nach dessen Beendigung zurück; für aktive Jobs mit langer Ausführungszeit ist sie nicht geeignet, um Statusinformationen in Echtzeit zu erhalten. Stattdessen müssen Sie die QUERY HISTORY-Tabellenfunktionen verwenden, um die Abfrage-ID des Jobs zu erhalten. Weitere Informationen dazu finden Sie unter Verwenden von Jobs.

  3. Der Job führt eine einfache Abfrage aus und speichert das Ergebnis in der Ergebnistabelle. Sie können den erfolgreichen Abschluss des Jobs überprüfen, indem Sie die Ergebnistabelle abfragen:

    SELECT * FROM results;
    
    Copy

    Beispielausgabe:

    +----------+-----------+
    | TIME     | TEXT      |
    |----------+-----------|
    | 10:56:52 | hello     |
    +----------+-----------+
    
  4. Wenn Sie die Ausführung Ihres Jobs debuggen möchten, verwenden Sie die Systemfunktionen. Verwenden Sie zum Beispiel SYSTEM$GET_JOB_STATUS, um festzustellen, ob der Job noch ausgeführt wird, ob er nicht gestartet werden konnte oder warum er fehlgeschlagen ist. Wenn Ihr Code nützliche Protokolleeinträge an die Standardausgabe oder den Standardfehler ausgibt, können Sie auch mit SYSTEM$GET_JOB_LOGS auf die Protokolle zugreifen.

    1. Um den Jobstatus abzurufen, rufen Sie die Systemfunktion SYSTEM$GET_JOB_STATUS auf:

      SELECT SYSTEM$GET_JOB_STATUS($jobid);
      
      Copy

      Beispielausgabe:

      [
        {
            "status":"DONE",
            "message":"Completed successfully",
            "containerName":"main",
            "instanceId":"0",
            "serviceName":"01af7ee6-0001-cb52-0020-c5870077223a",
            "image":"orgname-acctname.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
            "restartCount":0,
            "startTime":""
        }
      ]
      
      Copy

      Da der Job keinen Namen hat, ist der Wert von serviceName in der Ausgabe die von Snowflake zugewiesene UUID (Abfrage-ID) des Jobs.

    2. Um die Jobprotokollinformationen zu erhalten, verwenden Sie die Systemfunktion SYSTEM$GET_JOB_LOGS:

      SELECT SYSTEM$GET_JOB_LOGS($jobid, 'main')
      
      Copy
      job-tutorial - INFO - Job started
      job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE"
      job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results]
      job-tutorial - INFO - Job finished
      

5: Bereinigen

Wenn Sie nicht vorhaben, mit Tutorial 3 fortzufahren, sollten Sie die von Ihnen erstellten abrechenbaren Ressourcen entfernen. Weitere Informationen dazu finden Sie in Schritt 5 von Tutorial 3.

6: Erläuterungen zum Jobcode

In diesem Abschnitt werden die folgenden Themen behandelt:

Untersuchen der bereitgestellten Dateien

Die ZIP-Datei, die Sie zu Beginn des Tutorials heruntergeladen haben, enthält die folgenden Dateien:

  • main.py

  • Dockerfile

  • my_job_spec.yaml

Dieser Abschnitt bietet eine Übersicht darüber, wie der Code den Job implementiert.

Datei „main.py“

#!/opt/conda/bin/python3

import argparse
import logging
import os
import sys

from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *

# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")

# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")


def get_arg_parser():
  """
  Input argument list.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument("--query", required=True, help="query text to execute")
  parser.add_argument(
    "--result_table",
    required=True,
    help="name of the table to store result of query specified by flag --query")

  return parser


def get_logger():
  """
  Get a logger for local logging.
  """
  logger = logging.getLogger("job-tutorial")
  logger.setLevel(logging.DEBUG)
  handler = logging.StreamHandler(sys.stdout)
  handler.setLevel(logging.DEBUG)
  formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
  handler.setFormatter(formatter)
  logger.addHandler(handler)
  return logger


def get_login_token():
  """
  Read the login token supplied automatically by Snowflake. These tokens
  are short lived and should always be read right before creating any new connection.
  """
  with open("/snowflake/session/token", "r") as f:
    return f.read()


def get_connection_params():
  """
  Construct Snowflake connection params from environment variables.
  """
  if os.path.exists("/snowflake/session/token"):
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "authenticator": "oauth",
      "token": get_login_token(),
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }
  else:
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "user": SNOWFLAKE_USER,
      "password": SNOWFLAKE_PASSWORD,
      "role": SNOWFLAKE_ROLE,
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }

def run_job():
  """
  Main body of this job.
  """
  logger = get_logger()
  logger.info("Job started")

  # Parse input arguments
  args = get_arg_parser().parse_args()
  query = args.query
  result_table = args.result_table

  # Start a Snowflake session, run the query and write results to specified table
  with Session.builder.configs(get_connection_params()).create() as session:
    # Print out current session context information.
    database = session.get_current_database()
    schema = session.get_current_schema()
    warehouse = session.get_current_warehouse()
    role = session.get_current_role()
    logger.info(
      f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
    )

    # Execute query and persist results in a table.
    logger.info(
      f"Executing query [{query}] and writing result to table [{result_table}]"
    )
    res = session.sql(query)
    # If the table already exists, the query result must match the table scheme.
    # If the table does not exist, this will create a new table.
    res.write.mode("append").save_as_table(result_table)

  logger.info("Job finished")


if __name__ == "__main__":
  run_job()
Copy

Erläuterungen zum Code:

  • Der Python-Code wird bei main ausgeführt, wodurch er dann die Funktion run_job() ausführt:

    if __name__ == "__main__":
      run_job()
    
    Copy
  • Die Funktion run_job() liest die Umgebungsvariablen und verwendet sie, um Standardwerte für verschiedene Parameter festzulegen. Der Container verwendet diese Parameter für die Verbindung zu Snowflake. Beachten Sie Folgendes:

    • Sie können diese Standardparameterwerte überschreiben. Weitere Informationen dazu finden Sie unter Referenz der Dienstspezifikation.

    • Wenn das Image in Snowflake ausgeführt wird, spezifiziert Snowflake einige dieser Parameter (siehe Quellcode) automatisch. Wenn Sie das Image jedoch lokal testen, müssen Sie diese Parameter ausdrücklich bereitstellen (wie im nächsten Abschnitt Lokales Erstellen und Testen eines Images gezeigt).

Datei „Dockerfile“

Diese Datei enthält alle Befehle, um ein Image mit Docker zu erstellen.

ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
  conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
Copy

Datei „my_job_spec.yaml“ (Jobspezifikation)

Snowflake verwendet die von Ihnen in dieser Spezifikation angegebenen Informationen zur Konfiguration und Ausführung Ihres Jobs.

spec:
container:
- name: main
   image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
   env:
      SNOWFLAKE_WAREHOUSE: tutorial_warehouse
   args:
   - "--query=select current_time() as time,'hello'"
   - "--result_table=results"
Copy

Zusätzlich zu den Pflichtfeldern container.name und container.image (siehe Referenz der Dienstspezifikation) enthält die Spezifikation das optionale Feld container.args zur Auflistung der Argumente:

  • --query liefert die Abfrage, die bei der Ausführung des Jobs ausgeführt werden soll.

  • --result_table identifiziert die Tabelle, in der die Abfrageergebnisse gespeichert werden sollen.

Lokales Erstellen und Testen eines Images

Sie können das Docker-Image lokal testen, bevor Sie es in ein Repository Ihres Snowflake-Kontos hochladen. Bei lokalen Tests wird Ihr Container eigenständig ausgeführt (er ist kein Job, der von Snowflake ausgeführt wird).

Führen Sie die folgenden Schritte aus, um das Docker-Image von Tutorial 2 zu testen:

  1. Um ein Docker-Image zu erstellen, führen Sie über die Docker-CLI den Befehl docker build aus:

    docker build --rm -t my_service:local .
    
    Copy
  2. Um Ihren Code zu starten, führen Sie den Befehl docker run aus und geben dabei <orgname>-<acctname>, <username> und <password>) an:

    docker run --rm \
      -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \
      -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \
      -e SNOWFLAKE_DATABASE=tutorial_db \
      -e SNOWFLAKE_SCHEMA=data_schema \
      -e SNOWFLAKE_ROLE=test_role \
      -e SNOWFLAKE_USER=<username> \
      -e SNOWFLAKE_PASSWORD=<password> \
      -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \
      my_job:local \
      --query="select current_time() as time,'hello'" \
      --result_table=tutorial_db.data_schema.results
    
    Copy

    Wenn Sie das Image lokal testen, beachten Sie, dass Sie zusätzlich zu den drei Argumenten (die Abfrage, das Warehouse, in dem die Abfrage ausgeführt wird, und die Tabelle, in der das Ergebnis gespeichert wird) auch die Verbindungsparameter für den lokal ausgeführten Container angeben müssen, um eine Verbindung zu Snowflake herzustellen.

    Wenn Sie den Container als Job ausführen, stellt Snowflake dem Container diese Parameter als Umgebungsvariablen zur Verfügung. Weitere Informationen dazu finden Sie unter Snowflake-Client konfigurieren.

    Der Job führt die Abfrage (select current_time() as time,'hello') aus und schreibt das Ergebnis in die Tabelle (tutorial_db.data_schema.results). Wenn die Tabelle nicht vorhanden ist, wird sie erstellt. Wenn die Tabelle vorhanden ist, fügt der Job eine Zeile hinzu.

    Beispielergebnis für das Abfragen der Ergebnistabelle:

    +----------+----------+
    | TIME     | TEXT     |
    |----------+----------|
    | 10:56:52 | hello    |
    +----------+----------+
    

Nächste Schritte

Sie können nun mit Tutorial 3 fortfahren, in dem gezeigt wird, wie die Kommunikation von Dienst zu Dienst funktioniert.