Snowpark Submit examples¶
This topic includes examples that use Snowpark Submit to submit production-ready Spark applications.
Deploy an application from a Snowflake stage¶
The following example shows how to deploy an application and its dependencies from a Snowflake stage.
Upload your application files to a stage by using the following commands:
PUT file:///<abs_path>/app.py @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
PUT file:///<abs_path>/dependencies.zip @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
To submit the job by using the files you uploaded to the stage, use the following command:
snowpark-submit \
  --py-files @my_stage/dependencies.zip \
  --snowflake-stage @my_stage \
  --snowflake-workload-name MY_JOB \
  --snowflake-connection-name MY_CONNECTION \
  --compute-pool MY_COMPUTE_POOL \
  @my_stage/app.py
Monitor with wait and logs¶
The following example shows how to submit a job, wait for its completion, and then retrieve logs.
Submit the job and wait for completion by using the following command:
snowpark-submit \
  --snowflake-workload-name MY_JOB \
  --wait-for-completion \
  --snowflake-connection-name MY_CONNECTION \
  --compute-pool MY_COMPUTE_POOL \
  app.py
If the job fails, check the detailed logs by using the following command:
snowpark-submit --snowflake-workload-name MY_JOB --workload-status --display-logs --snowflake-connection-name MY_CONNECTION
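If you automate these steps, you can combine them in a small shell wrapper that retrieves detailed logs only when the job does not succeed. The following is a minimal sketch that assumes snowpark-submit returns a nonzero exit code when a job submitted with --wait-for-completion fails; verify that behavior in your environment before relying on it:

# Hypothetical wrapper: submit the job, wait for completion, and show detailed logs only on failure.
if snowpark-submit \
    --snowflake-workload-name MY_JOB \
    --wait-for-completion \
    --snowflake-connection-name MY_CONNECTION \
    --compute-pool MY_COMPUTE_POOL \
    app.py; then
  echo "MY_JOB completed successfully."
else
  echo "MY_JOB failed; fetching logs..."
  snowpark-submit --snowflake-workload-name MY_JOB --workload-status --display-logs --snowflake-connection-name MY_CONNECTION
fi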
Use Snowpark Submit in an Apache Airflow DAG¶
You can submit a Spark job to Snowflake through Snowpark Connect for Spark from an Apache Airflow DAG. Use snowpark-submit in cluster mode so that the job runs on a compute pool.
When you use Apache Airflow in this way, ensure that the Docker service or Snowpark Container Services container that runs Apache Airflow has proper access to Snowflake and the required files in the Snowflake stage.
The code in the following example performs the following tasks:
- Creates a Python virtual environment at /tmp/myenv. In the create_venv task, the code uses pip to install the snowpark-submit package by using a .whl file.
- Generates a secure connections.toml file with Snowflake connection credentials and an OAuth token. In the create_connections_toml task, the code creates the /app/.snowflake directory, creates the .toml file, and then changes the file permissions to allow only the owner (user) to have read and write access.
- Runs a Spark job by using the snowpark-submit command. In the run_snowpark_script task, the code does the following things:
  - Activates the virtual environment.
  - Runs the Spark job by using the snowpark-submit command.
  - Deploys to Snowflake by using cluster mode.
  - Uses the Snowpark Connect for Spark remote URI sc://localhost:15002.
  - Specifies the Spark application class org.example.SnowparkConnectApp.
  - Pulls the script from the @AIRFLOW_APP_FILES stage.
  - Blocks deployment until the job finishes by using --wait-for-completion.
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

default_args = {
    'start_date': airflow.utils.dates.days_ago(1),
    'retries': 0,
}

with DAG(
    'run_sparkconnect_python_script',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:

    create_venv = BashOperator(
        task_id='create_venv',
        bash_command="""
        python3 -m venv /tmp/myenv &&
        source /tmp/myenv/bin/activate &&
        export PIP_USER=false &&
        pip install --upgrade pip &&
        pip install --no-cache-dir "grpcio-tools>=1.48.1" &&
        pip install /app/snowpark_submit-<version>.whl
        """
    )

    create_connections_toml = BashOperator(
        task_id='create_connections_toml',
        bash_command="""
        mkdir -p /app/.snowflake
        echo "${SNOWFLAKE_USER}"
        cat <<EOF > /app/.snowflake/connections.toml
[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
        chmod 600 /app/.snowflake/connections.toml
        """
    )

    run_script = BashOperator(
        task_id='run_snowpark_script',
        bash_command="""
        set -e
        echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"
        echo "Running Python script with Snowpark..."
        source /tmp/myenv/bin/activate &&
        snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
        """,
        env={
            'SNOWFLAKE_HOME': '/app/.snowflake'
        }
    )

    create_venv >> create_connections_toml >> run_script
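After you deploy this DAG file to your Apache Airflow environment, you can trigger a run from the Apache Airflow user interface or, for example, from the Airflow command line. The following command is shown as an illustration; adjust it for your Airflow deployment:

# Trigger a manual run of the DAG defined above.
airflow dags trigger run_sparkconnect_python_script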
You can monitor the DAG by using the Apache Airflow user interface’s Graph View or Tree View. Inspect the task logs for the following items:
- Environment setup
- Status of Snowpark Connect for Spark
- snowpark-submit job output
You can also monitor jobs that ran in Snowflake by using the logs stored in the Snowflake stage or by querying event tables.