Tutorial 2: Create a Snowpark Container Services Job

Important

The Snowpark Container Services job feature is currently in private preview and is subject to Preview Terms at https://snowflake.com/legal. Contact your Snowflake representative for more information.

Introduction

After completing the Tutorial Common Setup, you are ready to create a job. In this tutorial, you create a simple job that connects to Snowflake, executes a SQL SELECT query, and saves the result to a table.

There are two parts to this tutorial:

Part 1: Create and test a job. You download code provided for this tutorial and follow step-by-step instructions:

  1. Download the job code for this tutorial.

  2. Build a Docker image for Snowpark Container Services, and upload the image to a repository in your account.

  3. Stage the job specification file, which gives Snowflake the container configuration information. In addition to the name of the image to use to start a container, the specification file provides:

    • Two arguments: a SELECT query and the name of the table to save the result to.

    • The warehouse to execute the SELECT statement in (set through an environment variable).

  4. Execute the job. Using the EXECUTE SERVICE command, you execute the job by providing the specification file and the compute pool where Snowflake runs the container. Finally, you verify the job results.

Part 2: Understand the job code. This section provides an overview of the job code and highlights how different components collaborate.

1: Download the job code

Code (a Python application) is provided to create a job.

  1. Download the zip file into a directory.

  2. Unzip the content, which includes one directory for each tutorial. The Tutorial-2 directory has the following files:

    • main.py

    • Dockerfile

    • my_job_spec.yaml

2: Build and upload an image

Build an image for the linux/amd64 platform that Snowpark Container Services supports, and then upload the image to the image repository in your account (see Common Setup).

You will need information about the repository (the repository URL and the registry hostname) before you can build and upload the image. For more information, see Registry and Repositories.

Get information about the repository

  1. To get the repository URL, execute the SHOW IMAGE REPOSITORIES SQL command.

    SHOW IMAGE REPOSITORIES;
    
    • The repository_url column in the output provides the URL. An example is shown:

      <orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
      
    • The host name in the repository URL is the registry host name. An example is shown:

      <orgname>-<acctname>.registry.snowflakecomputing.com
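
    If you prefer to capture the repository URL with SQL, the following is a minimal sketch that filters the output of the SHOW command you just ran. It assumes the repository is named tutorial_repository, as in the Common Setup.

      -- Pull the repository URL out of the most recent SHOW IMAGE REPOSITORIES output.
      -- Column names from SHOW output are lowercase and must be double-quoted;
      -- the repository name is assumed to be stored as TUTORIAL_REPOSITORY.
      SELECT "repository_url"
        FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
        WHERE "name" = 'TUTORIAL_REPOSITORY';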
      

Build image and upload it to the repository

  1. Open a terminal window, and change to the directory containing the files you unzipped.

  2. To build a Docker image, execute the following docker build command using the Docker CLI. Note that the command specifies the current working directory (.) as the PATH for files to use for building the image.

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
    
    • For image_name, use my_job_image:latest.

    Example

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
    
  3. Upload the image to the repository in your Snowflake account. In order for Docker to upload an image on your behalf to your repository, you must first authenticate Docker with Snowflake.

    1. To authenticate Docker with the Snowflake registry, execute the following command.

      docker login <registry_hostname> -u <username>
      
      • For username, specify your Snowflake username. Docker will prompt you for your password.

    2. To upload the image, execute the following command:

      docker push <repository_url>/<image_name>
      

      Example

      docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
      
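    Optionally, confirm that the image is now in the repository. Assuming the SHOW IMAGES IN IMAGE REPOSITORY command is available in your account, a minimal check looks like this:

      -- List images stored in the tutorial repository; my_job_image should appear in the output.
      SHOW IMAGES IN IMAGE REPOSITORY tutorial_db.data_schema.tutorial_repository;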

3: Stage the specification file

  • To upload your job specification file (my_job_spec.yaml) to the stage, use one of the following options:

    • The Snowsight web interface: For instructions, see Choosing an internal stage for local files.

    • The SnowSQL CLI: Execute the following PUT command:

      PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      

      For example:

      • Linux or macOS

        PUT file:///tmp/my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
      • Windows

        PUT file://C:\temp\my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        

      You can also specify a relative path.

      PUT file://./my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      

      The command sets OVERWRITE=TRUE so that you can upload the file again, if needed (for example, if you fixed an error in your specification file). If the PUT command is executed successfully, information about the uploaded file is printed out.
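
      To confirm that the file is on the stage, you can list the stage contents:

        LIST @tutorial_stage;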

4: Execute the job

Now you are ready to create a job.

  1. To start a job, run the EXECUTE SERVICE command:

    EXECUTE SERVICE
      IN COMPUTE POOL tutorial_compute_pool
      FROM @tutorial_stage
      SPEC='my_job_spec.yaml';
    

    Note the following:

    • FROM and SPEC provide the stage name and the name of the job specification file. When the job is executed, it runs the SQL statement and saves the result to a table as specified in my_job_spec.yaml.

      The SQL statement in your job is not executed within the Docker container. Instead, the running container connects to Snowflake and runs the SQL statement in a Snowflake warehouse.

    • IN COMPUTE POOL specifies the compute pool that provides the compute resources in which Snowflake runs the job.

    • EXECUTE SERVICE returns output that includes the Snowflake-assigned UUID of the job, as shown in the following sample output:

      +------------------------------------------------------------------------------------+
      | status                                                                              |
      |------------------------------------------------------------------------------------|
      | Job 01af7ee6-0001-cb52-0020-c5870077223a completed successfully with status: DONE.  |
      +------------------------------------------------------------------------------------+
      
  2. Get the ID of the query you executed (EXECUTE SERVICE is a query).

    SET jobid = LAST_QUERY_ID();
    

    You use the ID in the following steps to retrieve the job status and the job log information.

    Note

    It is important that you call LAST_QUERY_ID immediately after you call EXECUTE SERVICE to ensure that the job ID the command returns is for the EXECUTE SERVICE command.

    LAST_QUERY_ID returns the query ID of a job only after the job completes. For ongoing, long-running jobs it is not suitable for obtaining real-time status information; instead, use the QUERY_HISTORY family of table functions to obtain the query ID of the job. For more information, see Working with Jobs.
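
    For example, here is a minimal sketch that looks up the query ID of the most recent EXECUTE SERVICE command from the query history (adjust the filter and result limit for your account):

      -- Find the most recent EXECUTE SERVICE query visible to this role.
      SELECT query_id, query_text, start_time
        FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
        WHERE query_text ILIKE 'EXECUTE SERVICE%'
        ORDER BY start_time DESC
        LIMIT 1;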

  3. The job runs a simple query and saves the result to the results table. You can verify that the job completed successfully by querying the results table:

    SELECT * FROM results;
    

    Sample output:

    +----------+-----------+
    | TIME     | TEXT      |
    |----------+-----------|
    | 10:56:52 | hello     |
    +----------+-----------+
    
  4. If you want to debug the execution of your job, use the system functions. For example, use SYSTEM$GET_JOB_STATUS to determine whether the job is still running, failed to start, or failed while running, and why. Also, assuming your code writes useful logs to standard output or standard error, you can access the logs by using SYSTEM$GET_JOB_LOGS.

    1. To get the job status, call the system function SYSTEM$GET_JOB_STATUS:

      SELECT SYSTEM$GET_JOB_STATUS($jobid);
      

      Sample output:

      [
        {
            "status":"DONE",
            "message":"Completed successfully",
            "containerName":"main",
            "instanceId":"0",
            "serviceName":"01af7ee6-0001-cb52-0020-c5870077223a",
            "image":"orgname-acctname.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
            "restartCount":0,
            "startTime":""
        }
      ]
      

      In the output, because the job does not have a name, the serviceName is the Snowflake-assigned UUID (query ID) of the job.

    2. To get the job log information, use the system function SYSTEM$GET_JOB_LOGS:

      SELECT SYSTEM$GET_JOB_LOGS($jobid, 'main');
      
      Sample output:

      job-tutorial - INFO - Job started
      job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE"
      job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results]
      job-tutorial - INFO - Job finished
      

5: Clean up

If you do not plan to continue with Tutorial 3, you should remove billable resources you created. For more information, see Step 5 in Tutorial 3.
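
If you are finished with the tutorials and want to remove everything now, the following is a minimal clean-up sketch. It assumes the object names from the Common Setup; adjust it, or skip statements, for anything you still need.

  -- Stop any services or jobs running on the compute pool, then drop the pool.
  ALTER COMPUTE POOL tutorial_compute_pool STOP ALL;
  DROP COMPUTE POOL tutorial_compute_pool;

  -- Dropping the database also removes the image repository and the stage it contains.
  DROP DATABASE tutorial_db;
  DROP WAREHOUSE tutorial_warehouse;
  DROP ROLE test_role;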

6: Review the job code

This section covers the following topics:

  • Examining the files provided

  • Building and testing an image locally

Examining the files provided

The zip file you downloaded at the beginning of the tutorial includes the following files:

  • main.py

  • Dockerfile

  • my_job_spec.yaml

This section provides an overview of how the code implements the job.

main.py file

#!/opt/conda/bin/python3

import argparse
import logging
import os
import sys

from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *

# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")

# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")


def get_arg_parser():
  """
  Input argument list.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument("--query", required=True, help="query text to execute")
  parser.add_argument(
    "--result_table",
    required=True,
    help="name of the table to store result of query specified by flag --query")

  return parser


def get_logger():
  """
  Get a logger for local logging.
  """
  logger = logging.getLogger("job-tutorial")
  logger.setLevel(logging.DEBUG)
  handler = logging.StreamHandler(sys.stdout)
  handler.setLevel(logging.DEBUG)
  formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
  handler.setFormatter(formatter)
  logger.addHandler(handler)
  return logger


def get_login_token():
  """
  Read the login token supplied automatically by Snowflake. These tokens
  are short lived and should always be read right before creating any new connection.
  """
  with open("/snowflake/session/token", "r") as f:
    return f.read()


def get_connection_params():
  """
  Construct Snowflake connection params from environment variables.
  """
  if os.path.exists("/snowflake/session/token"):
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "authenticator": "oauth",
      "token": get_login_token(),
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }
  else:
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "user": SNOWFLAKE_USER,
      "password": SNOWFLAKE_PASSWORD,
      "role": SNOWFLAKE_ROLE,
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }

def run_job():
  """
  Main body of this job.
  """
  logger = get_logger()
  logger.info("Job started")

  # Parse input arguments
  args = get_arg_parser().parse_args()
  query = args.query
  result_table = args.result_table

  # Start a Snowflake session, run the query and write results to specified table
  with Session.builder.configs(get_connection_params()).create() as session:
    # Print out current session context information.
    database = session.get_current_database()
    schema = session.get_current_schema()
    warehouse = session.get_current_warehouse()
    role = session.get_current_role()
    logger.info(
      f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
    )

    # Execute query and persist results in a table.
    logger.info(
      f"Executing query [{query}] and writing result to table [{result_table}]"
    )
    res = session.sql(query)
    # If the table already exists, the query result must match the table schema.
    # If the table does not exist, this will create a new table.
    res.write.mode("append").save_as_table(result_table)

  logger.info("Job finished")


if __name__ == "__main__":
  run_job()

In the code:

  • Python execution starts at main, which then calls the run_job() function:

    if __name__ == "__main__":
      run_job()
    
  • The run_job() function reads the environment variables and uses them to set default values for various parameters. The container uses these parameters to connect to Snowflake. Note that:

    • You can override these default parameter values. For more information, see Service specification reference.

    • When the image runs in Snowflake, Snowflake populates some of these parameters (see source code) automatically. However, when testing the image locally, you need to explicitly provide these parameters (as shown in the next section, Building and testing an image locally).

Dockerfile

This file contains all the commands to build an image using Docker.

ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
  conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]

my_job_spec.yaml file (job specification)

Snowflake uses information you provide in this specification to configure and run your job.

spec:
  container:
  - name: main
    image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
    env:
      SNOWFLAKE_WAREHOUSE: tutorial_warehouse
    args:
    - "--query=select current_time() as time,'hello'"
    - "--result_table=results"

In addition to the container.name and container.image required fields (see Service specification reference), the specification includes the optional container.args field, which lists the arguments, and the optional container.env field, which sets the SNOWFLAKE_WAREHOUSE environment variable to the warehouse in which the query runs:

  • --query provides the query to execute when the job runs.

  • --result_table identifies the table to save the query results.

Building and testing an image locally

You can test the Docker image locally before uploading it to a repository in your Snowflake account. In local testing, your container runs standalone (it is not a job that Snowflake executes).

Use the following steps to test the Tutorial 2 Docker image:

  1. To create a Docker image, in the Docker CLI, execute the docker build command:

    docker build --rm -t my_job:local .
    
  2. To launch your code, execute the docker run command, providing <orgname>-<acctname>, <username>, and <password>:

    docker run --rm \
      -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \
      -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \
      -e SNOWFLAKE_DATABASE=tutorial_db \
      -e SNOWFLAKE_SCHEMA=data_schema \
      -e SNOWFLAKE_ROLE=test_role \
      -e SNOWFLAKE_USER=<username> \
      -e SNOWFLAKE_PASSWORD=<password> \
      -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \
      my_job:local \
      --query="select current_time() as time,'hello'" \
      --result_table=tutorial_db.data_schema.results
    

    When testing the image locally, note that, in addition to the two arguments (a query and the table to save the result to) and the warehouse (supplied through the SNOWFLAKE_WAREHOUSE environment variable), you also provide the connection parameters that the locally running container uses to connect to Snowflake.

    When you run the container as a job, Snowflake provides these parameters to the container as environment variables. For more information, see Configure Snowflake Client.

    The job executes the query (select current_time() as time,'hello') and writes the result to the table (tutorial_db.data_schema.results). If the table does not exist, it is created. If the table exists, the job adds a row.
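
    To verify the result of the local run, you can query the table from a Snowflake session:

      SELECT * FROM tutorial_db.data_schema.results;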

    Sample result of querying the results table:

    +----------+----------+
    | TIME     | TEXT     |
    |----------+----------|
    | 10:56:52 | hello    |
    +----------+----------+
    

What’s next?

You can now try Tutorial 3, which shows how service-to-service communication works.