Snowpark Submit-Beispiele

Dieses Thema enthält Beispiele, die mit Snowpark Submit produktionsbereite Spark-Anwendungen übermitteln.

Eine einfache Spark-Anwendung schreiben und übermitteln

Das folgende Beispiel zeigt, wie Sie eine einfache Spark-Anwendung ohne Abhängigkeiten schreiben und übermitteln.

  1. Erstellen Sie in Ihrer lokalen IDE eine neue Python-Datei namens app.py mit folgendem Inhalt:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lit, upper, concat
    
    # Create Spark session
    spark = SparkSession.builder.appName("SimpleSession").getOrCreate()
    
    # Create a DataFrame from inline data
    data = [
        (1, "alice", "engineering", 95000),
        (2, "bob", "marketing", 72000),
        (3, "carol", "engineering", 105000),
        (4, "david", "sales", 68000),
        (5, "eva", "engineering", 88000),
    ]
    df = spark.createDataFrame(data, ["id", "name", "department", "salary"])
    
    # Add a new column
    df_with_bonus = df.withColumn("bonus", col("salary") * 0.1)
    df_with_bonus.show()
    
    # Filter and transform
    engineers = df.filter(col("department") == "engineering") \
        .withColumn("name_upper", upper(col("name"))) \
        .withColumn("greeting", concat(lit("Hello, "), col("name")))
    engineers.show()
    
    # Aggregate
    df.groupBy("department").avg("salary").show()
    
    # Stop the Spark session
    spark.stop()
    
    Copy
  2. Um die Anwendung zu übermitteln, verwenden Sie den folgenden Befehl:

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --snowflake-connection-name MY_CONNECTION \
      /path/to/app.py
    
    Copy

    Sie können die --wait-for-completion-Option verwenden, um auf die abschließende Ausführung des Jobs zu warten, die --workload-status-Option, um den Status des Jobs zu prüfen, und die --display-logs-Option, um die Protokolle des Jobs anzuzeigen. Eine vollständige Liste der Optionen finden Sie unter Snowpark Submit-Referenz.

Bereitstellen einer Anwendung aus einem Snowflake-Stagingbereich heraus

Wenn die Anwendung Abhängigkeiten hat, wie z. B. Dateien, die sie lesen muss, können Sie diese über einen Snowflake-Stagingbereich bereitstellen. Das folgende Beispiel zeigt, wie Sie eine Anwendung und ihre Abhängigkeiten aus einem Snowflake-Stagingbereich bereitstellen.

  1. Um Dateien vom Terminal in einen Stagingbereich hochzuladen, können Sie die Snowflake-CLI verwenden. Beachten Sie, dass SnowSQL die alte CLI ist, und wenn Sie sie bereits verwenden, können Sie diese auch verwenden, um Dateien in einen Stagingbereich hochzuladen. Wenn Sie die Snowflake-CLI noch nicht installiert haben, können Sie sie installieren, indem Sie die Anweisungen unter Installieren von Snowflake CLI befolgen.

  2. Erstellen Sie eine neue CSV-Datei in Ihrer lokalen IDE namens sample_employees.csv mit folgendem Inhalt:

    employee_id,name,department,salary,years_employed
    1,Alice Johnson,Engineering,95000,5
    2,Bob Smith,Marketing,72000,3
    3,Carol Williams,Engineering,105000,8
    4,David Brown,Sales,68000,2
    5,Eva Martinez,Engineering,88000,4
    6,Frank Wilson,Marketing,75000,6
    7,Grace Lee,Sales,92000,7
    8,Henry Taylor,Engineering,110000,10
    9,Ivy Chen,Marketing,65000,1
    10,Jack Davis,Sales,78000,4
    11,Karen White,Engineering,98000,6
    12,Leo Harris,Marketing,71000,3
    13,Maria Garcia,Sales,85000,5
    14,Nathan Clark,Engineering,102000,9
    15,Olivia Moore,Marketing,69000,2
    
    Copy

    Laden Sie Ihre Abhängigkeitsdateien in einen Stagingbereich hoch, indem Sie den folgenden Befehl verwenden, wobei my_stage der Name eines Stagingbereichs in Ihrem Konto ist. (Wenn Sie keinen Stagingbereich erstellt haben, können Sie [snow stage create](/developer-guide/snowflake-cli/command-reference/stage-commands/create).) verwenden.

    snow stage copy sample_employees.csv @<database>.<schema>.<stage>/sample_employees.csv -c MY_CONNECTION
    
    Copy

    Um zu überprüfen, ob die Datei erfolgreich hochgeladen wurde, können Sie den folgenden Befehl verwenden, um die im Stagingbereich befindlichen Dateien aufzulisten:

    snow sql -c MY_CONNECTION -q "ls @<database>.<schema>.<stage>"
    
    Copy

    Sie sollten die Datei sample_employees.csv in der Liste sehen.

  3. Erstellen Sie in Ihrer lokalen IDE eine neue Python-Datei namens app.py mit folgendem Inhalt:

    from pyspark.sql import SparkSession
    
    # Create Spark session
    spark = SparkSession.builder.appName("SimpleStageExample").getOrCreate()
    
    # Load data from stage (adjust stage name to match yours)
    df = spark.read.csv("/app/<YOUR_STAGE>/sample_employees.csv", header=True, inferSchema=True)
    df.show()
    
    # Filter: Engineering department only
    engineers = df.filter(df["department"] == "Engineering")
    engineers.show()
    
    # Filter: Salary > 80000 and years_employed > 3
    senior_high_earners = df.filter((df["salary"] > 80000) & (df["years_employed"] > 3))
    senior_high_earners.show()
    
    # Aggregate: Average salary by department
    df.groupBy("department").avg("salary").show()
    
    # Select specific columns
    result = senior_high_earners.select("name", "department", "salary")
    result.show()
    
    # Stop the Spark session
    spark.stop()
    
    Copy

    Um die Anwendung zu übermitteln, die die Dateien verwendet, die Sie in den Stagingbereich hochgeladen haben, verwenden Sie den folgenden Befehl:

    snowpark-submit \
      --snowflake-connection-name MY_CONNECTION \
      --snowflake-workload-name MY_JOB \
      --snowflake-stage @<database>.<schema>.<stage> \
      /path/to/app.py
    
    Copy

    Beachten Sie, dass ein Computepool erforderlich ist, um die Anwendung auszuführen. Dieser muss entweder in der Datei connections.toml oder in der Befehlszeile mit der Option --compute-pool angegeben sein. Weitere Informationen dazu finden Sie unter Snowpark Submit-Referenz.

Überwachen mit Warten und Protokollen

Das folgende Beispiel zeigt, wie Sie einen Job übermitteln, auf dessen Beendigung warten und dann Protokolle abrufen.

  1. Übermitteln Sie den Job, und warten Sie auf die Fertigstellung, indem Sie den folgenden Befehl verwenden:

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --wait-for-completion \
      --snowflake-connection-name MY_CONNECTION \
      /path/to/app.py
    
    Copy
  2. Wenn der Job fehlschlägt, überprüfen Sie die detaillierten Protokolleinträge mit dem folgenden Befehl:

    snowpark-submit
      --snowflake-workload-name MY_JOB \
      --workload-status \
      --display-logs \
      --snowflake-connection-name MY_CONNECTION
    
    Copy

Verwenden Sie Snowpark Submit in einem Apache Airflow DAG

Sie können einen Spark-Job über Snowpark Connect for Spark an Snowflake senden. Sie können snowpark-submit im Clustermodus verwenden, um einen Computepool zur Ausführung des Jobs zu nutzen.

Wenn Sie Apache Airflow auf diese Weise verwenden, muss der Docker-Dienst oder der Snowpark Container Services-Container, auf dem Apache Airflow ausgeführt wird, ordnungsgemäßen Zugriff auf Snowflake und die erforderlichen Dateien im Snowflake-Stagingbereich haben.

Mit dem Code im folgenden Beispiel wird die folgende Aufgabe ausgeführt:

  • Erstellt eine virtuelle Python-Umgebung unter /tmp/myenv.

    In der create_venv-Aufgabe verwendet der Code pip zur Installation des snowpark-submit-Pakets mithilfe einer .whl-Datei.

  • Erzeugt eine sichere connections.toml-Datei mit Snowflake-Verbindungsdaten und einem OAuth-Token.

    In der create_connections_toml-Aufgabe erstellt der Code das Verzeichnis /app/.snowflake, erstellt die .toml-Datei und ändert dann die Dateiberechtigungen so, dass nur der Eigentümer (Benutzer) Lese- und Schreibzugriff hat.

  • Führt einen Spark-Job mit dem Befehl snowpark-submit aus.

    In der Aufgabe run_snowpark_script macht der Code Folgendes:

    • Aktiviert die virtuelle Umgebung.

    • Führt den Spark-Job mit dem Befehl snowpark-submit aus.

    • Führt eine Bereitstellung in Snowflake im Clustermodus durch.

    • Es wird die Snowpark Connect for Spark-Remote-URI sc://localhost:15002 verwendet.

    • Gibt die Spark-Anwendungsklasse org.example.SnowparkConnectApp an.

    • Ruft das Skript aus dem Stagingbereich @snowflake_stage ab.

    • Blockiert mit --wait-for-completion die Bereitstellung, bis der Job beendet ist.

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


default_args = {
  'start_date': airflow.utils.dates.days_ago(1),
  'retries': 0,
}


with DAG(
  'run_sparkconnect_python_script',
  default_args=default_args,
  schedule_interval=None,
  catchup=False,
) as dag:

  create_venv = BashOperator(
      task_id='create_venv',
      bash_command="""
      python3 -m venv /tmp/myenv &&
      source /tmp/myenv/bin/activate &&
      export PIP_USER=false &&
      pip install --upgrade pip &&
      pip install --no-cache-dir grpcio-tools>=1.48.1 &&
      pip install /app/snowpark_submit-<version>.whl
      """
  )

  create_connections_toml = BashOperator(
      task_id='create_connections_toml',
      bash_command="""
      mkdir -p /app/.snowflake
      echo "${SNOWFLAKE_USER}"
      cat <<EOF > /app/.snowflake/connections.toml

[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
  chmod 600 /app/.snowflake/connections.toml
  """
  )

  run_script = BashOperator(
      task_id='run_snowpark_script',
      bash_command="""
      set -e
      echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"

      echo "Running Python script with Snowpark..."
      source /tmp/myenv/bin/activate &&
      snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
      """,
      env={
          'SNOWFLAKE_HOME': '/app/.snowflake'
      }
  )

create_venv >> create_connections_toml >> run_script
Copy

Sie können DAG mithilfe der Task-Graph-Ansicht oder der Strukturbaum-Ansicht der Apache Airflow-Benutzeroberfläche überwachen. Überprüfen Sie die Aufgabenprotokolle auf die folgenden Elemente:

  • Einrichtung der Umgebung

  • Status von Snowpark Connect for Spark

  • Ausgabe des Jobs mit snowpark-submit

Sie können Jobs, die in Snowflake ausgeführt wurden, auch anhand der im Snowflake-Stagingbereich gespeicherten Protokolle oder anhand von Ereignistabellen überwachen.