Snowpark Submit-Beispiele

Dieses Thema enthält Beispiele, die mit Snowpark Submit produktionsbereite Spark-Anwendungen übermitteln.

Bereitstellen einer Anwendung aus einem Snowflake-Stagingbereich heraus

Das folgende Beispiel zeigt, wie Sie eine Anwendung und ihre Abhängigkeiten aus einem Snowflake-Stagingbereich heraus bereitstellen.

  1. Laden Sie Ihre Anwendungsdateien mit dem folgenden Befehl in einen Stagingbereich hoch:

    PUT file:///<abs_path>/app.py @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
    PUT file:///<abs_path>/dependencies.zip @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
    
    Copy
  2. Um den Job unter Verwendung der Dateien zu übermitteln, die Sie in den Stagingbereich hochgeladen haben, verwenden Sie den folgenden Befehl:

    snowpark-submit \
      --py-files @my_stage/dependencies.zip \
      --snowflake-stage @my_stage \
      --snowflake-workload-name MY_JOB \
      --snowflake-connection-name MY_CONNECTION\
      --compute-pool MY_COMPUTE_POOL \
      @my_stage/app.py
    
    Copy

Überwachen mit Warten und Protokollen

Das folgende Beispiel zeigt, wie Sie einen Job übermitteln, auf dessen Beendigung warten und dann Protokolle abrufen.

  1. Übermitteln Sie den Job, und warten Sie auf die Fertigstellung, indem Sie den folgenden Befehl verwenden:

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --wait-for-completion \
      --snowflake-connection-name MY_CONNECTION \
      --compute-pool MY_COMPUTE_POOL \
      app.py
    
    Copy
  2. Wenn der Job fehlschlägt, überprüfen Sie die detaillierten Protokolleinträge mit dem folgenden Befehl:

    snowpark-submit
      --snowflake-workload-name MY_JOB
      --workload-status
      --display-logs
      --snowflake-connection-name MY_CONNECTION
    
    Copy

Verwenden Sie Snowpark Submit in einem Apache Airflow DAG

Sie können einen Spark-Job über Snowpark Connect for Spark an Snowflake senden. Sie können snowpark-submit im Clustermodus verwenden, um einen Computepool zur Ausführung des Jobs zu nutzen.

Wenn Sie Apache Airflow auf diese Weise verwenden, muss der Docker-Dienst oder der Snowpark Container Services-Container, auf dem Apache Airflow ausgeführt wird, ordnungsgemäßen Zugriff auf Snowflake und die erforderlichen Dateien im Snowflake-Stagingbereich haben.

Mit dem Code im folgenden Beispiel wird die folgende Aufgabe ausgeführt:

  • Erstellt eine virtuelle Python-Umgebung unter /tmp/myenv.

    In der create_venv-Aufgabe verwendet der Code pip zur Installation des snowpark-submit-Pakets mithilfe einer .whl-Datei.

  • Erzeugt eine sichere connections.toml-Datei mit Snowflake-Verbindungsdaten und einem OAuth-Token.

    In der create_connections_toml-Aufgabe erstellt der Code das Verzeichnis /app/.snowflake, erstellt die .toml-Datei und ändert dann die Dateiberechtigungen so, dass nur der Eigentümer (Benutzer) Lese- und Schreibzugriff hat.

  • Führt einen Spark-Job mit dem Befehl snowpark-submit aus.

    In der Aufgabe run_snowpark_script macht der Code Folgendes:

    • Aktiviert die virtuelle Umgebung.

    • Führt den Spark-Job mit dem Befehl snowpark-submit aus.

    • Führt eine Bereitstellung in Snowflake im Clustermodus durch.

    • Es wird die Snowpark Connect for Spark-Remote-URI sc://localhost:15002 verwendet.

    • Gibt die Spark-Anwendungsklasse org.example.SnowparkConnectApp an.

    • Ruft das Skript aus dem Stagingbereich @snowflake_stage ab.

    • Blockiert mit --wait-for-completion die Bereitstellung, bis der Job beendet ist.

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


default_args = {
  'start_date': airflow.utils.dates.days_ago(1),
  'retries': 0,
}


with DAG(
  'run_sparkconnect_python_script',
  default_args=default_args,
  schedule_interval=None,
  catchup=False,
) as dag:

  create_venv = BashOperator(
      task_id='create_venv',
      bash_command="""
      python3 -m venv /tmp/myenv &&
      source /tmp/myenv/bin/activate &&
      export PIP_USER=false &&
      pip install --upgrade pip &&
      pip install --no-cache-dir grpcio-tools>=1.48.1 &&
      pip install /app/snowpark_submit-<version>.whl
      """
  )

  create_connections_toml = BashOperator(
      task_id='create_connections_toml',
      bash_command="""
      mkdir -p /app/.snowflake
      echo "${SNOWFLAKE_USER}"
      cat <<EOF > /app/.snowflake/connections.toml

[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
  chmod 600 /app/.snowflake/connections.toml
  """
  )

  run_script = BashOperator(
      task_id='run_snowpark_script',
      bash_command="""
      set -e
      echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"

      echo "Running Python script with Snowpark..."
      source /tmp/myenv/bin/activate &&
      snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
      """,
      env={
          'SNOWFLAKE_HOME': '/app/.snowflake'
      }
  )

create_venv >> create_connections_toml >> run_script
Copy

Sie können DAG mithilfe der Task-Graph-Ansicht oder der Strukturbaum-Ansicht der Apache Airflow-Benutzeroberfläche überwachen. Überprüfen Sie die Aufgabenprotokolle auf die folgenden Elemente:

  • Einrichtung der Umgebung

  • Status von Snowpark Connect for Spark

  • Ausgabe des Jobs mit snowpark-submit

Sie können Jobs, die in Snowflake ausgeführt wurden, auch anhand der im Snowflake-Stagingbereich gespeicherten Protokolle oder anhand von Ereignistabellen überwachen.