Snowpark Submit-Beispiele¶
Dieses Thema enthält Beispiele, die mit Snowpark Submit produktionsbereite Spark-Anwendungen übermitteln.
Eine einfache Spark-Anwendung schreiben und übermitteln¶
Das folgende Beispiel zeigt, wie Sie eine einfache Spark-Anwendung ohne Abhängigkeiten schreiben und übermitteln.
Erstellen Sie in Ihrer lokalen IDE eine neue Python-Datei namens
app.pymit folgendem Inhalt:from pyspark.sql import SparkSession from pyspark.sql.functions import col, lit, upper, concat # Create Spark session spark = SparkSession.builder.appName("SimpleSession").getOrCreate() # Create a DataFrame from inline data data = [ (1, "alice", "engineering", 95000), (2, "bob", "marketing", 72000), (3, "carol", "engineering", 105000), (4, "david", "sales", 68000), (5, "eva", "engineering", 88000), ] df = spark.createDataFrame(data, ["id", "name", "department", "salary"]) # Add a new column df_with_bonus = df.withColumn("bonus", col("salary") * 0.1) df_with_bonus.show() # Filter and transform engineers = df.filter(col("department") == "engineering") \ .withColumn("name_upper", upper(col("name"))) \ .withColumn("greeting", concat(lit("Hello, "), col("name"))) engineers.show() # Aggregate df.groupBy("department").avg("salary").show() # Stop the Spark session spark.stop()
Um die Anwendung zu übermitteln, verwenden Sie den folgenden Befehl:
snowpark-submit \ --snowflake-workload-name MY_JOB \ --snowflake-connection-name MY_CONNECTION \ /path/to/app.py
Sie können die
--wait-for-completion-Option verwenden, um auf die abschließende Ausführung des Jobs zu warten, die--workload-status-Option, um den Status des Jobs zu prüfen, und die--display-logs-Option, um die Protokolle des Jobs anzuzeigen. Eine vollständige Liste der Optionen finden Sie unter Snowpark Submit-Referenz.
Bereitstellen einer Anwendung aus einem Snowflake-Stagingbereich heraus¶
Wenn die Anwendung Abhängigkeiten hat, wie z. B. Dateien, die sie lesen muss, können Sie diese über einen Snowflake-Stagingbereich bereitstellen. Das folgende Beispiel zeigt, wie Sie eine Anwendung und ihre Abhängigkeiten aus einem Snowflake-Stagingbereich bereitstellen.
Um Dateien vom Terminal in einen Stagingbereich hochzuladen, können Sie die Snowflake-CLI verwenden. Beachten Sie, dass SnowSQL die alte CLI ist, und wenn Sie sie bereits verwenden, können Sie diese auch verwenden, um Dateien in einen Stagingbereich hochzuladen. Wenn Sie die Snowflake-CLI noch nicht installiert haben, können Sie sie installieren, indem Sie die Anweisungen unter Installieren von Snowflake CLI befolgen.
Erstellen Sie eine neue CSV-Datei in Ihrer lokalen IDE namens
sample_employees.csvmit folgendem Inhalt:employee_id,name,department,salary,years_employed 1,Alice Johnson,Engineering,95000,5 2,Bob Smith,Marketing,72000,3 3,Carol Williams,Engineering,105000,8 4,David Brown,Sales,68000,2 5,Eva Martinez,Engineering,88000,4 6,Frank Wilson,Marketing,75000,6 7,Grace Lee,Sales,92000,7 8,Henry Taylor,Engineering,110000,10 9,Ivy Chen,Marketing,65000,1 10,Jack Davis,Sales,78000,4 11,Karen White,Engineering,98000,6 12,Leo Harris,Marketing,71000,3 13,Maria Garcia,Sales,85000,5 14,Nathan Clark,Engineering,102000,9 15,Olivia Moore,Marketing,69000,2
Laden Sie Ihre Abhängigkeitsdateien in einen Stagingbereich hoch, indem Sie den folgenden Befehl verwenden, wobei
my_stageder Name eines Stagingbereichs in Ihrem Konto ist. (Wenn Sie keinen Stagingbereich erstellt haben, können Sie [snow stage create](/developer-guide/snowflake-cli/command-reference/stage-commands/create).) verwenden.snow stage copy sample_employees.csv @<database>.<schema>.<stage>/sample_employees.csv -c MY_CONNECTION
Um zu überprüfen, ob die Datei erfolgreich hochgeladen wurde, können Sie den folgenden Befehl verwenden, um die im Stagingbereich befindlichen Dateien aufzulisten:
snow sql -c MY_CONNECTION -q "ls @<database>.<schema>.<stage>"
Sie sollten die Datei
sample_employees.csvin der Liste sehen.Erstellen Sie in Ihrer lokalen IDE eine neue Python-Datei namens
app.pymit folgendem Inhalt:from pyspark.sql import SparkSession # Create Spark session spark = SparkSession.builder.appName("SimpleStageExample").getOrCreate() # Load data from stage (adjust stage name to match yours) df = spark.read.csv("/app/<YOUR_STAGE>/sample_employees.csv", header=True, inferSchema=True) df.show() # Filter: Engineering department only engineers = df.filter(df["department"] == "Engineering") engineers.show() # Filter: Salary > 80000 and years_employed > 3 senior_high_earners = df.filter((df["salary"] > 80000) & (df["years_employed"] > 3)) senior_high_earners.show() # Aggregate: Average salary by department df.groupBy("department").avg("salary").show() # Select specific columns result = senior_high_earners.select("name", "department", "salary") result.show() # Stop the Spark session spark.stop()
Um die Anwendung zu übermitteln, die die Dateien verwendet, die Sie in den Stagingbereich hochgeladen haben, verwenden Sie den folgenden Befehl:
snowpark-submit \ --snowflake-connection-name MY_CONNECTION \ --snowflake-workload-name MY_JOB \ --snowflake-stage @<database>.<schema>.<stage> \ /path/to/app.py
Beachten Sie, dass ein Computepool erforderlich ist, um die Anwendung auszuführen. Dieser muss entweder in der Datei
connections.tomloder in der Befehlszeile mit der Option--compute-poolangegeben sein. Weitere Informationen dazu finden Sie unter Snowpark Submit-Referenz.
Überwachen mit Warten und Protokollen¶
Das folgende Beispiel zeigt, wie Sie einen Job übermitteln, auf dessen Beendigung warten und dann Protokolle abrufen.
Übermitteln Sie den Job, und warten Sie auf die Fertigstellung, indem Sie den folgenden Befehl verwenden:
snowpark-submit \ --snowflake-workload-name MY_JOB \ --wait-for-completion \ --snowflake-connection-name MY_CONNECTION \ /path/to/app.py
Wenn der Job fehlschlägt, überprüfen Sie die detaillierten Protokolleinträge mit dem folgenden Befehl:
snowpark-submit --snowflake-workload-name MY_JOB \ --workload-status \ --display-logs \ --snowflake-connection-name MY_CONNECTION
Verwenden Sie Snowpark Submit in einem Apache Airflow DAG¶
Sie können einen Spark-Job über Snowpark Connect for Spark an Snowflake senden. Sie können snowpark-submit im Clustermodus verwenden, um einen Computepool zur Ausführung des Jobs zu nutzen.
Wenn Sie Apache Airflow auf diese Weise verwenden, muss der Docker-Dienst oder der Snowpark Container Services-Container, auf dem Apache Airflow ausgeführt wird, ordnungsgemäßen Zugriff auf Snowflake und die erforderlichen Dateien im Snowflake-Stagingbereich haben.
Mit dem Code im folgenden Beispiel wird die folgende Aufgabe ausgeführt:
Erstellt eine virtuelle Python-Umgebung unter
/tmp/myenv.In der
create_venv-Aufgabe verwendet der Codepipzur Installation dessnowpark-submit-Pakets mithilfe einer.whl-Datei.Erzeugt eine sichere
connections.toml-Datei mit Snowflake-Verbindungsdaten und einem OAuth-Token.In der
create_connections_toml-Aufgabe erstellt der Code das Verzeichnis/app/.snowflake, erstellt die.toml-Datei und ändert dann die Dateiberechtigungen so, dass nur der Eigentümer (Benutzer) Lese- und Schreibzugriff hat.Führt einen Spark-Job mit dem Befehl snowpark-submit aus.
In der Aufgabe
run_snowpark_scriptmacht der Code Folgendes:Aktiviert die virtuelle Umgebung.
Führt den Spark-Job mit dem Befehl snowpark-submit aus.
Führt eine Bereitstellung in Snowflake im Clustermodus durch.
Es wird die Snowpark Connect for Spark-Remote-URI sc://localhost:15002 verwendet.
Gibt die Spark-Anwendungsklasse
org.example.SnowparkConnectAppan.Ruft das Skript aus dem Stagingbereich @snowflake_stage ab.
Blockiert mit
--wait-for-completiondie Bereitstellung, bis der Job beendet ist.
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
default_args = {
'start_date': airflow.utils.dates.days_ago(1),
'retries': 0,
}
with DAG(
'run_sparkconnect_python_script',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
create_venv = BashOperator(
task_id='create_venv',
bash_command="""
python3 -m venv /tmp/myenv &&
source /tmp/myenv/bin/activate &&
export PIP_USER=false &&
pip install --upgrade pip &&
pip install --no-cache-dir grpcio-tools>=1.48.1 &&
pip install /app/snowpark_submit-<version>.whl
"""
)
create_connections_toml = BashOperator(
task_id='create_connections_toml',
bash_command="""
mkdir -p /app/.snowflake
echo "${SNOWFLAKE_USER}"
cat <<EOF > /app/.snowflake/connections.toml
[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
chmod 600 /app/.snowflake/connections.toml
"""
)
run_script = BashOperator(
task_id='run_snowpark_script',
bash_command="""
set -e
echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"
echo "Running Python script with Snowpark..."
source /tmp/myenv/bin/activate &&
snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
""",
env={
'SNOWFLAKE_HOME': '/app/.snowflake'
}
)
create_venv >> create_connections_toml >> run_script
Sie können DAG mithilfe der Task-Graph-Ansicht oder der Strukturbaum-Ansicht der Apache Airflow-Benutzeroberfläche überwachen. Überprüfen Sie die Aufgabenprotokolle auf die folgenden Elemente:
Einrichtung der Umgebung
Status von Snowpark Connect for Spark
Ausgabe des Jobs mit snowpark-submit
Sie können Jobs, die in Snowflake ausgeführt wurden, auch anhand der im Snowflake-Stagingbereich gespeicherten Protokolle oder anhand von Ereignistabellen überwachen.