Snowpark Submit-Beispiele¶
Dieses Thema enthält Beispiele, die mit Snowpark Submit produktionsbereite Spark-Anwendungen übermitteln.
Bereitstellen einer Anwendung aus einem Snowflake-Stagingbereich heraus¶
Das folgende Beispiel zeigt, wie Sie eine Anwendung und ihre Abhängigkeiten aus einem Snowflake-Stagingbereich heraus bereitstellen.
Laden Sie Ihre Anwendungsdateien mit dem folgenden Befehl in einen Stagingbereich hoch:
PUT file:///<abs_path>/app.py @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE PUT file:///<abs_path>/dependencies.zip @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
Um den Job unter Verwendung der Dateien zu übermitteln, die Sie in den Stagingbereich hochgeladen haben, verwenden Sie den folgenden Befehl:
snowpark-submit \ --py-files @my_stage/dependencies.zip \ --snowflake-stage @my_stage \ --snowflake-workload-name MY_JOB \ --snowflake-connection-name MY_CONNECTION\ --compute-pool MY_COMPUTE_POOL \ @my_stage/app.py
Überwachen mit Warten und Protokollen¶
Das folgende Beispiel zeigt, wie Sie einen Job übermitteln, auf dessen Beendigung warten und dann Protokolle abrufen.
Übermitteln Sie den Job, und warten Sie auf die Fertigstellung, indem Sie den folgenden Befehl verwenden:
snowpark-submit \ --snowflake-workload-name MY_JOB \ --wait-for-completion \ --snowflake-connection-name MY_CONNECTION \ --compute-pool MY_COMPUTE_POOL \ app.py
Wenn der Job fehlschlägt, überprüfen Sie die detaillierten Protokolleinträge mit dem folgenden Befehl:
snowpark-submit --snowflake-workload-name MY_JOB --workload-status --display-logs --snowflake-connection-name MY_CONNECTION
Verwenden Sie Snowpark Submit in einem Apache Airflow DAG¶
Sie können einen Spark-Job über Snowpark Connect for Spark an Snowflake senden. Sie können snowpark-submit im Clustermodus verwenden, um einen Computepool zur Ausführung des Jobs zu nutzen.
Wenn Sie Apache Airflow auf diese Weise verwenden, muss der Docker-Dienst oder der Snowpark Container Services-Container, auf dem Apache Airflow ausgeführt wird, ordnungsgemäßen Zugriff auf Snowflake und die erforderlichen Dateien im Snowflake-Stagingbereich haben.
Mit dem Code im folgenden Beispiel wird die folgende Aufgabe ausgeführt:
Erstellt eine virtuelle Python-Umgebung unter
/tmp/myenv
.In der
create_venv
-Aufgabe verwendet der Codepip
zur Installation dessnowpark-submit
-Pakets mithilfe einer.whl
-Datei.Erzeugt eine sichere
connections.toml
-Datei mit Snowflake-Verbindungsdaten und einem OAuth-Token.In der
create_connections_toml
-Aufgabe erstellt der Code das Verzeichnis/app/.snowflake
, erstellt die.toml
-Datei und ändert dann die Dateiberechtigungen so, dass nur der Eigentümer (Benutzer) Lese- und Schreibzugriff hat.Führt einen Spark-Job mit dem Befehl snowpark-submit aus.
In der Aufgabe
run_snowpark_script
macht der Code Folgendes:Aktiviert die virtuelle Umgebung.
Führt den Spark-Job mit dem Befehl snowpark-submit aus.
Führt eine Bereitstellung in Snowflake im Clustermodus durch.
Es wird die Snowpark Connect for Spark-Remote-URI sc://localhost:15002 verwendet.
Gibt die Spark-Anwendungsklasse
org.example.SnowparkConnectApp
an.Ruft das Skript aus dem Stagingbereich @snowflake_stage ab.
Blockiert mit
--wait-for-completion
die Bereitstellung, bis der Job beendet ist.
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
default_args = {
'start_date': airflow.utils.dates.days_ago(1),
'retries': 0,
}
with DAG(
'run_sparkconnect_python_script',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
create_venv = BashOperator(
task_id='create_venv',
bash_command="""
python3 -m venv /tmp/myenv &&
source /tmp/myenv/bin/activate &&
export PIP_USER=false &&
pip install --upgrade pip &&
pip install --no-cache-dir grpcio-tools>=1.48.1 &&
pip install /app/snowpark_submit-<version>.whl
"""
)
create_connections_toml = BashOperator(
task_id='create_connections_toml',
bash_command="""
mkdir -p /app/.snowflake
echo "${SNOWFLAKE_USER}"
cat <<EOF > /app/.snowflake/connections.toml
[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
chmod 600 /app/.snowflake/connections.toml
"""
)
run_script = BashOperator(
task_id='run_snowpark_script',
bash_command="""
set -e
echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"
echo "Running Python script with Snowpark..."
source /tmp/myenv/bin/activate &&
snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
""",
env={
'SNOWFLAKE_HOME': '/app/.snowflake'
}
)
create_venv >> create_connections_toml >> run_script
Sie können DAG mithilfe der Task-Graph-Ansicht oder der Strukturbaum-Ansicht der Apache Airflow-Benutzeroberfläche überwachen. Überprüfen Sie die Aufgabenprotokolle auf die folgenden Elemente:
Einrichtung der Umgebung
Status von Snowpark Connect for Spark
Ausgabe des Jobs mit snowpark-submit
Sie können Jobs, die in Snowflake ausgeführt wurden, auch anhand der im Snowflake-Stagingbereich gespeicherten Protokolle oder anhand von Ereignistabellen überwachen.