Exemples Snowpark Submit

Cette rubrique comprend des exemples utilisant Snowpark Submit pour soumettre des applications Spark prêtes pour la production.

Déployer une application à partir d’une zone de préparation Snowflake

L’exemple suivant illustre comment déployer une application et ses dépendances à partir d’une zone de préparation Snowflake.

  1. Importez vos fichiers d’application dans une zone de préparation à l’aide de la commande suivante :

    PUT file:///<abs_path>/app.py @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
    PUT file:///<abs_path>/dependencies.zip @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
    
    Copy
  2. Pour soumettre la tâche en utilisant les fichiers que vous avez importés dans la zone de préparation, utilisez la commande suivante :

    snowpark-submit \
      --py-files @my_stage/dependencies.zip \
      --snowflake-stage @my_stage \
      --snowflake-workload-name MY_JOB \
      --snowflake-connection-name MY_CONNECTION\
      --compute-pool MY_COMPUTE_POOL \
      @my_stage/app.py
    
    Copy

Surveillance avec attente et journaux

L’exemple suivant illustre comment soumettre une tâche, attendre son achèvement, puis récupérer les journaux.

  1. Soumettez la tâche et attendez qu’elle soit terminée à l’aide de la commande suivante :

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --wait-for-completion \
      --snowflake-connection-name MY_CONNECTION \
      --compute-pool MY_COMPUTE_POOL \
      app.py
    
    Copy
  2. Si la tâche échoue, consultez les journaux détaillés à l’aide de la commande suivante :

    snowpark-submit
      --snowflake-workload-name MY_JOB
      --workload-status
      --display-logs
      --snowflake-connection-name MY_CONNECTION
    
    Copy

Utiliser Snowpark Submit dans un DAG Apache Airflow

Vous pouvez soumettre une tâche Spark à Snowflake via Snowpark Connect for Spark. Vous pouvez utiliser la commande snowpark-submit en mode cluster pour exploiter un pool de calcul afin d’exécuter la tâche.

Lorsque vous utilisez Apache Airflow de cette manière, assurez-vous que le service Docker ou le conteneur Snowpark Container Services qui exécute Apache Airflow a un accès approprié à Snowflake et aux fichiers requis dans la zone de préparation Snowflake.

Le code de l’exemple suivant effectue les tâches suivantes :

  • Crée un environnement virtuel Python dans /tmp/myenv.

    Dans la tâche create_venv, le code utilise pip pour installer le paquet snowpark-submit à l’aide d’un fichier .whl.

  • Génère un fichier connections.toml sécurisé avec des identifiants de connexion Snowflake et un jeton OAuth.

    Dans la tâche create_connections_toml, le code crée le répertoire /app/.snowflake, crée le fichier .toml, puis modifie les autorisations du fichier pour n’autoriser que le propriétaire (utilisateur) à disposer d’un accès en lecture et en écriture.

  • Exécute une tâche Spark en utilisant la commande snowpark-submit.

    Dans la tâche run_snowpark_script, le code effectue les opérations suivantes :

    • Active l’environnement virtuel.

    • Exécute la tâche Spark en utilisant la commande snowpark-submit.

    • Déploie sur Snowflake en utilisant le mode cluster.

    • Utilise l’URI distante sc://localhost:15002 de Snowpark Connect for Spark.

    • Spécifie la classe d’application Spark org.example.SnowparkConnectApp.

    • Extrait le script de la zone de préparation @snowflake_stage.

    • Bloque le déploiement jusqu’à la fin de la tâche en utilisant --wait-for-completion.

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


default_args = {
  'start_date': airflow.utils.dates.days_ago(1),
  'retries': 0,
}


with DAG(
  'run_sparkconnect_python_script',
  default_args=default_args,
  schedule_interval=None,
  catchup=False,
) as dag:

  create_venv = BashOperator(
      task_id='create_venv',
      bash_command="""
      python3 -m venv /tmp/myenv &&
      source /tmp/myenv/bin/activate &&
      export PIP_USER=false &&
      pip install --upgrade pip &&
      pip install --no-cache-dir grpcio-tools>=1.48.1 &&
      pip install /app/snowpark_submit-<version>.whl
      """
  )

  create_connections_toml = BashOperator(
      task_id='create_connections_toml',
      bash_command="""
      mkdir -p /app/.snowflake
      echo "${SNOWFLAKE_USER}"
      cat <<EOF > /app/.snowflake/connections.toml

[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
  chmod 600 /app/.snowflake/connections.toml
  """
  )

  run_script = BashOperator(
      task_id='run_snowpark_script',
      bash_command="""
      set -e
      echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"

      echo "Running Python script with Snowpark..."
      source /tmp/myenv/bin/activate &&
      snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
      """,
      env={
          'SNOWFLAKE_HOME': '/app/.snowflake'
      }
  )

create_venv >> create_connections_toml >> run_script
Copy

Vous pouvez surveiller le DAG à l’aide de la vue graphique ou de la vue arborescente de l’interface utilisateur Apache Airflow. Inspectez les journaux des tâches pour les éléments suivants :

  • Configuration de l’environnement

  • Statut de Snowpark Connect for Spark

  • Sortie de tâche snowpark-submit

Vous pouvez également surveiller les tâches qui se sont exécutées dans Snowflake à partir des journaux stockés dans la zone de préparation Snowflake ou à partir des tables d’événements.