Exemples Snowpark Submit¶
Cette rubrique comprend des exemples utilisant Snowpark Submit pour soumettre des applications Spark prêtes pour la production.
Écrire et soumettre une application Spark simple¶
L’exemple suivant montre comment écrire et soumettre une application Spark simple sans dépendances.
Dans votre IDE local, créez un nouveau fichier Python appelé
app.pyavec le contenu suivant :from pyspark.sql import SparkSession from pyspark.sql.functions import col, lit, upper, concat # Create Spark session spark = SparkSession.builder.appName("SimpleSession").getOrCreate() # Create a DataFrame from inline data data = [ (1, "alice", "engineering", 95000), (2, "bob", "marketing", 72000), (3, "carol", "engineering", 105000), (4, "david", "sales", 68000), (5, "eva", "engineering", 88000), ] df = spark.createDataFrame(data, ["id", "name", "department", "salary"]) # Add a new column df_with_bonus = df.withColumn("bonus", col("salary") * 0.1) df_with_bonus.show() # Filter and transform engineers = df.filter(col("department") == "engineering") \ .withColumn("name_upper", upper(col("name"))) \ .withColumn("greeting", concat(lit("Hello, "), col("name"))) engineers.show() # Aggregate df.groupBy("department").avg("salary").show() # Stop the Spark session spark.stop()
Pour soumettre l’application, utilisez la commande suivante :
snowpark-submit \ --snowflake-workload-name MY_JOB \ --snowflake-connection-name MY_CONNECTION \ /path/to/app.py
Vous pouvez utiliser l’option
--wait-for-completionpour attendre que la tâche soit terminée, l’option--workload-statuspour vérifier le statut de la tâche et l’option:code:--display-logspour afficher les journaux de la tâche. Pour obtenir la liste complète des options, voir Référence Snowpark Submit.
Déployer une application à partir d’une zone de préparation Snowflake¶
Si l’application a des dépendances, comme des fichiers qu’elle doit lire, vous pouvez les déployer à partir d’une zone de préparation Snowflake. L’exemple suivant montre comment déployer une application et ses dépendances à partir d’une zone de préparation Snowflake.
Pour importer des fichiers vers une zone de préparation à partir du terminal, vous pouvez utiliser le CLI Snowflake. Remarque : SnowSQL est l’ancien CLI et si vous l’utilisez déjà, vous pouvez l’utiliser également pour importer des fichiers vers une zone de préparation. Si vous n’avez pas encore installé le CLI Snowflake, suivez les instructions d’installation suivantes : Installation de Snowflake CLI.
Créez un nouveau fichier CSV dans votre IDE local appelé
sample_employees.csvavec le contenu suivant :employee_id,name,department,salary,years_employed 1,Alice Johnson,Engineering,95000,5 2,Bob Smith,Marketing,72000,3 3,Carol Williams,Engineering,105000,8 4,David Brown,Sales,68000,2 5,Eva Martinez,Engineering,88000,4 6,Frank Wilson,Marketing,75000,6 7,Grace Lee,Sales,92000,7 8,Henry Taylor,Engineering,110000,10 9,Ivy Chen,Marketing,65000,1 10,Jack Davis,Sales,78000,4 11,Karen White,Engineering,98000,6 12,Leo Harris,Marketing,71000,3 13,Maria Garcia,Sales,85000,5 14,Nathan Clark,Engineering,102000,9 15,Olivia Moore,Marketing,69000,2
Importez vos fichiers de dépendances vers une zone de préparation en utilisant la commande suivante, où
my_stageest le nom d’une zone de préparation de votre compte. (Si vous n’avez pas de zone de préparation créée, vous pouvez utiliser [snow stage create](/developer-guide/snowflake-cli/command-reference/stage-commands/create).)snow stage copy sample_employees.csv @<database>.<schema>.<stage>/sample_employees.csv -c MY_CONNECTION
Pour vérifier que le fichier a bien été importé, vous pouvez utiliser la commande suivante pour répertorier les fichiers de la zone de préparation :
snow sql -c MY_CONNECTION -q "ls @<database>.<schema>.<stage>"
Vous devriez voir le fichier
sample_employees.csvdans la liste.Dans votre IDE local, créez un nouveau fichier Python appelé
app.pyavec le contenu suivant :from pyspark.sql import SparkSession # Create Spark session spark = SparkSession.builder.appName("SimpleStageExample").getOrCreate() # Load data from stage (adjust stage name to match yours) df = spark.read.csv("/app/<YOUR_STAGE>/sample_employees.csv", header=True, inferSchema=True) df.show() # Filter: Engineering department only engineers = df.filter(df["department"] == "Engineering") engineers.show() # Filter: Salary > 80000 and years_employed > 3 senior_high_earners = df.filter((df["salary"] > 80000) & (df["years_employed"] > 3)) senior_high_earners.show() # Aggregate: Average salary by department df.groupBy("department").avg("salary").show() # Select specific columns result = senior_high_earners.select("name", "department", "salary") result.show() # Stop the Spark session spark.stop()
Pour soumettre l’application qui utilise les fichiers que vous avez importés dans la zone de préparation, utilisez la commande suivante :
snowpark-submit \ --snowflake-connection-name MY_CONNECTION \ --snowflake-workload-name MY_JOB \ --snowflake-stage @<database>.<schema>.<stage> \ /path/to/app.py
Notez qu’un pool de calcul est nécessaire pour exécuter l’application et doit être soit spécifié dans le fichier
connections.tomlou sur la ligne de commande en utilisant l’option--compute-pool. Pour plus d’informations, voir Référence Snowpark Submit.
Surveillance avec attente et journaux¶
L’exemple suivant illustre comment soumettre une tâche, attendre son achèvement, puis récupérer les journaux.
Soumettez la tâche et attendez qu’elle soit terminée à l’aide de la commande suivante :
snowpark-submit \ --snowflake-workload-name MY_JOB \ --wait-for-completion \ --snowflake-connection-name MY_CONNECTION \ /path/to/app.py
Si la tâche échoue, consultez les journaux détaillés à l’aide de la commande suivante :
snowpark-submit --snowflake-workload-name MY_JOB \ --workload-status \ --display-logs \ --snowflake-connection-name MY_CONNECTION
Utiliser Snowpark Submit dans un DAG Apache Airflow¶
Vous pouvez soumettre une tâche Spark à Snowflake via Snowpark Connect for Spark. Vous pouvez utiliser la commande snowpark-submit en mode cluster pour exploiter un pool de calcul afin d’exécuter la tâche.
Lorsque vous utilisez Apache Airflow de cette manière, assurez-vous que le service Docker ou le conteneur Snowpark Container Services qui exécute Apache Airflow a un accès approprié à Snowflake et aux fichiers requis dans la zone de préparation Snowflake.
Le code de l’exemple suivant effectue les tâches suivantes :
Crée un environnement virtuel Python dans
/tmp/myenv.Dans la tâche
create_venv, le code utilisepippour installer le paquetsnowpark-submità l’aide d’un fichier.whl.Génère un fichier
connections.tomlsécurisé avec des identifiants de connexion Snowflake et un jeton OAuth.Dans la tâche
create_connections_toml, le code crée le répertoire/app/.snowflake, crée le fichier.toml, puis modifie les autorisations du fichier pour n’autoriser que le propriétaire (utilisateur) à disposer d’un accès en lecture et en écriture.Exécute une tâche Spark en utilisant la commande snowpark-submit.
Dans la tâche
run_snowpark_script, le code effectue les opérations suivantes :Active l’environnement virtuel.
Exécute la tâche Spark en utilisant la commande snowpark-submit.
Déploie sur Snowflake en utilisant le mode cluster.
Utilise l’URI distante sc://localhost:15002 de Snowpark Connect for Spark.
Spécifie la classe d’application Spark
org.example.SnowparkConnectApp.Extrait le script de la zone de préparation @snowflake_stage.
Bloque le déploiement jusqu’à la fin de la tâche en utilisant
--wait-for-completion.
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
default_args = {
'start_date': airflow.utils.dates.days_ago(1),
'retries': 0,
}
with DAG(
'run_sparkconnect_python_script',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
create_venv = BashOperator(
task_id='create_venv',
bash_command="""
python3 -m venv /tmp/myenv &&
source /tmp/myenv/bin/activate &&
export PIP_USER=false &&
pip install --upgrade pip &&
pip install --no-cache-dir grpcio-tools>=1.48.1 &&
pip install /app/snowpark_submit-<version>.whl
"""
)
create_connections_toml = BashOperator(
task_id='create_connections_toml',
bash_command="""
mkdir -p /app/.snowflake
echo "${SNOWFLAKE_USER}"
cat <<EOF > /app/.snowflake/connections.toml
[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
chmod 600 /app/.snowflake/connections.toml
"""
)
run_script = BashOperator(
task_id='run_snowpark_script',
bash_command="""
set -e
echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"
echo "Running Python script with Snowpark..."
source /tmp/myenv/bin/activate &&
snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
""",
env={
'SNOWFLAKE_HOME': '/app/.snowflake'
}
)
create_venv >> create_connections_toml >> run_script
Vous pouvez surveiller le DAG à l’aide de la vue graphique ou de la vue arborescente de l’interface utilisateur Apache Airflow. Inspectez les journaux des tâches pour les éléments suivants :
Configuration de l’environnement
Statut de Snowpark Connect for Spark
Sortie de tâche snowpark-submit
Vous pouvez également surveiller les tâches qui se sont exécutées dans Snowflake à partir des journaux stockés dans la zone de préparation Snowflake ou à partir des tables d’événements.