Snowpark Submit examples

This topic includes examples that use Snowpark Submit to submit production-ready Spark applications.

Deploy an application from a Snowflake stage

The following example shows how to deploy an application and its dependencies from a Snowflake stage.

  1. Upload your application files to a stage by using the following commands. (For one way to run them from a shell, see the sketch after these steps.)

    PUT file:///<abs_path>/app.py @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
    PUT file:///<abs_path>/dependencies.zip @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
    
  2. To submit the job by using the files you uploaded to the stage, use the following command:

    snowpark-submit \
      --py-files @my_stage/dependencies.zip \
      --snowflake-stage @my_stage \
      --snowflake-workload-name MY_JOB \
      --snowflake-connection-name MY_CONNECTION \
      --compute-pool MY_COMPUTE_POOL \
      @my_stage/app.py
    
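The PUT commands run in a client that supports file uploads, such as SnowSQL. The following is a minimal sketch of running them from a shell; it assumes that SnowSQL is installed and that a connection named MY_CONNECTION is defined in its configuration.

    # Upload the application file and its dependencies to the stage through SnowSQL.
    # Assumes SnowSQL is installed and a MY_CONNECTION connection is configured.
    snowsql -c MY_CONNECTION -q "PUT file:///<abs_path>/app.py @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE"
    snowsql -c MY_CONNECTION -q "PUT file:///<abs_path>/dependencies.zip @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE"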

Monitor with wait and logs

The following example shows how to submit a job, wait for its completion, and then retrieve logs.

  1. Submit the job and wait for completion by using the following command:

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --wait-for-completion \
      --snowflake-connection-name MY_CONNECTION \
      --compute-pool MY_COMPUTE_POOL \
      app.py
    
  2. If the job fails, check the detailed logs by using the following command. (The sketch after these steps combines both commands into one script.)

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --workload-status \
      --display-logs \
      --snowflake-connection-name MY_CONNECTION
    
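The two steps can be combined in a small shell script that submits the job, blocks until it finishes, and retrieves logs only when something went wrong. The following is a sketch; it assumes that snowpark-submit exits with a nonzero status when a job run with --wait-for-completion fails, so verify that behavior for your version.

    # Submit the job and wait for it to finish; fetch detailed logs only on failure.
    # Assumes a nonzero exit status from snowpark-submit indicates a failed workload.
    if ! snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --wait-for-completion \
      --snowflake-connection-name MY_CONNECTION \
      --compute-pool MY_COMPUTE_POOL \
      app.py; then
      echo "MY_JOB failed; retrieving detailed logs..."
      snowpark-submit \
        --snowflake-workload-name MY_JOB \
        --workload-status \
        --display-logs \
        --snowflake-connection-name MY_CONNECTION
    fi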

Use Snowpark Submit in an Apache Airflow DAG

You can submit a Spark job to Snowflake through Snowpark Connect for Spark by running snowpark-submit in cluster mode, which runs the job on a compute pool.

When you use Apache Airflow in this way, ensure that the Docker service or Snowpark Container Services container that runs Apache Airflow has proper access to Snowflake and the required files in the Snowflake stage.

The code in the following example performs the following tasks:

  • Creates a Python virtual environment at /tmp/myenv.

    In the create_venv task, the code uses pip to install the snowpark-submit package by using a .whl file.

  • Generates a secure connections.toml file with Snowflake connection credentials and an OAuth token.

    In the create_connections_toml task, the code creates the /app/.snowflake directory, creates the .toml file, and then changes file permissions to allow only the owner (user) to have read and write access.

  • Runs a Spark job by using the snowpark-submit command.

    In the run_snowpark_script task, the code does the following things:

    • Activates the virtual environment.

    • Runs the Spark job by using the snowpark-submit command.

    • Deploys to Snowflake by using cluster mode.

    • Uses the Snowpark Connect for Spark remote URI sc://localhost:15002.

    • Specifies the Spark application class org.example.SnowparkConnectApp.

    • Pulls the script from the @snowflake_stage stage.

    • Blocks deployment until the job finishes by using --wait-for-completion.

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator


default_args = {
  'start_date': airflow.utils.dates.days_ago(1),
  'retries': 0,
}


with DAG(
  'run_sparkconnect_python_script',
  default_args=default_args,
  schedule_interval=None,
  catchup=False,
) as dag:

  create_venv = BashOperator(
      task_id='create_venv',
      bash_command="""
      python3 -m venv /tmp/myenv &&
      source /tmp/myenv/bin/activate &&
      export PIP_USER=false &&
      pip install --upgrade pip &&
      pip install --no-cache-dir "grpcio-tools>=1.48.1" &&
      pip install /app/snowpark_submit-<version>.whl
      """
  )

  create_connections_toml = BashOperator(
      task_id='create_connections_toml',
      bash_command="""
      mkdir -p /app/.snowflake
      echo "${SNOWFLAKE_USER}"
      cat <<EOF > /app/.snowflake/connections.toml

[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
      chmod 600 /app/.snowflake/connections.toml
      """
  )

  run_script = BashOperator(
      task_id='run_snowpark_script',
      bash_command="""
      set -e
      echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"

      echo "Running Python script with Snowpark..."
      source /tmp/myenv/bin/activate &&
      snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
      """,
      env={
          'SNOWFLAKE_HOME': '/app/.snowflake'
      }
  )

create_venv >> create_connections_toml >> run_script
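Because the DAG has no schedule (schedule_interval=None), it runs only when triggered. Assuming the Airflow CLI is available in the container that runs Apache Airflow, you can trigger the DAG manually:

    # Trigger the DAG manually from the Airflow CLI.
    airflow dags trigger run_sparkconnect_python_script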

You can monitor the DAG by using the Apache Airflow user interface’s Graph View or Tree View. Inspect the task logs for the following items:

  • Environment setup

  • Status of Snowpark Connect for Spark

  • snowpark-submit job output

You can also monitor jobs that ran in Snowflake by using the logs stored in the Snowflake stage or in event tables.
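For example, if your account has an active event table, a query along the following lines returns recent log records for the workload. This is a sketch only: the event table name (SNOWFLAKE.TELEMETRY.EVENTS) and the snow.service.name filter are assumptions, so adapt them to your account's event table and to the service name that snowpark-submit created for the workload.

    # Pull recent log records for the workload from the event table through SnowSQL.
    # The event table name and the snow.service.name filter are assumptions; adjust as needed.
    snowsql -c MY_CONNECTION -q "
      SELECT timestamp, value
      FROM SNOWFLAKE.TELEMETRY.EVENTS
      WHERE record_type = 'LOG'
        AND resource_attributes:\"snow.service.name\"::string ILIKE '%MY_JOB%'
      ORDER BY timestamp DESC
      LIMIT 100;"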