Ausführen von Spark-Workloads von einem Drittanbieter-Client aus

Sie können Spark-Workloads interaktiv über Jupyter Notebooks, VS Code oder eine beliebige Python-basierte Weboberfläche ausführen, ohne dass ein Spark-Cluster verwaltet werden muss. Die Workloads werden auf der Snowflake-Infrastruktur ausgeführt.

Sie können zum Beispiel folgende Aufgaben ausführen:

  1. Stellen Sie sicher, dass Sie die Voraussetzungen haben.

  2. Richten Sie Ihre Umgebung für die Verbindung mit Snowpark Connect for Spark auf Snowflake ein.

  3. Installieren Sie Snowpark Connect for Spark.

  4. Führen Sie PySpark-Code von Ihrem Client aus in Snowflake aus.

Voraussetzungen

Prüfen Sie, ob Ihre Python- und Java-Installationen auf derselben Computerarchitektur basieren. Wenn Python beispielsweise auf arm64 basiert, muss Java ebenfalls darauf basieren (und nicht z. B. auf x86_64).

Einrichten Ihrer Umgebung

Sie können Ihre Entwicklungsumgebung einrichten, indem Sie dafür sorgen, dass Ihr Code mit Snowpark Connect for Spark auf Snowflake verbunden werden kann. Um eine Verbindung zu Snowflake herzustellen, verwendet der Clientcode eine .toml-Datei mit Verbindungsdetails.

Wenn Sie Snowflake CLI installiert haben, können Sie damit eine Verbindung definieren. Andernfalls können Sie die Verbindungsparameter manuell in eine config.toml-Datei schreiben.

Hinzufügen von Verbindungen mit Snowflake CLI

Sie können mit Snowflake CLI Verbindungseigenschaften hinzufügen, mit denen Snowpark Connect for Spark eine Verbindung zu Snowflake herstellen kann. Ihre Änderungen werden in einer config.toml-Datei gespeichert.

  1. Führen Sie den folgenden Befehl aus, um eine Verbindung über den snow connection-Befehl :add hinzuzufügen.

    snow connection add
    
    Copy
  2. Folgen Sie den Aufforderungen, um eine Verbindung zu definieren.

    Geben Sie spark-connect als Verbindungsnamen an.

    Dieser Befehl fügt eine Verbindung zu Ihrer:file:config.toml-Datei hinzu, wie im folgenden Beispiel:

    [connections.spark-connect]
    host = "example.snowflakecomputing.com"
    port = 443
    account = "example"
    user = "test_example"
    password = "password"
    protocol = "https"
    warehouse = "example_wh"
    database = "example_db"
    schema = "public"
    
    Copy
  3. Führen Sie den folgenden Befehl aus, um zu prüfen, ob die Verbindung funktioniert.

    Sie können die Verbindung auf diese Weise testen, nachdem Sie sie mit Snowflake CLI hinzugefügt haben.

    snow connection list
    snow connection test --connection spark-connect
    
    Copy

Hinzufügen einer Verbindung durch manuelles Schreiben einer Verbindungsdatei

Sie können eine connections.toml-Datei manuell schreiben oder aktualisieren, damit Ihr Code eine Verbindung zu Snowpark Connect for Spark auf Snowflake herstellen kann.

  1. Führen Sie den folgenden Befehl aus, damit Ihre connections.toml-Datei nur dem Eigentümer (Benutzer) Lese- und Schreibzugriff gewährt.

    chmod 0600 "~/.snowflake/connections.toml"
    
    Copy
  2. Bearbeiten Sie die connections.toml-Datei so, dass sie eine [spark-connect]-Verbindung mit den Verbindungseigenschaften im folgenden Beispiel enthält.

    Ersetzen Sie die Werte durch Ihre eigenen Verbindungsspezifikationen.

    [spark-connect]
    host="my_snowflake_account.snowflakecomputing.com"
    account="my_snowflake_account"
    user="my_user"
    password="&&&&&&&&"
    warehouse="my_wh"
    database="my_db"
    schema="public"
    
    Copy

Snowpark Connect for Spark installieren

Sie können Snowpark Connect for Spark als Python-Paket installieren.

  1. Erstellen Sie eine virtuelle Umgebung für Python.

    Sie können zum Beispiel Conda verwenden, wie im folgenden Beispiel.

    conda create -n xxxx pip python=3.12
    conda activate xxxx
    
    Copy
  2. Installieren Sie das Snowpark Connect for Spark-Paket.

    pip install --upgrade --force-reinstall snowpark-connect
    
    Copy
  3. Fügen Sie Python-Code hinzu, um einen|spconnect|-Server zu starten, und erstellen Sie eine Snowpark Connect for Spark-Sitzung.

    import os
    import snowflake.snowpark
    from snowflake import snowpark_connect
    # Import snowpark_connect before importing pyspark libraries
    from pyspark.sql.types import Row
    
    os.environ["SPARK_CONNECT_MODE_ENABLED"] = "1"
    snowpark_connect.start_session()  # Start the local Snowpark Connect for Spark session
    spark = snowpark_connect.get_session()
    
    Copy

Run Python code from your client

Once you have an authenticated connection in place, you can write code as you normally would.

You can run PySpark code that connects to Snowpark Connect for Spark by using the PySpark client library.

from pyspark.sql import Row

  df = spark.createDataFrame([
      Row(a=1, b=2.),
      Row(a=2, b=3.),
      Row(a=4, b=5.),])

  print(df.count())
Copy

Run Scala code from your client

You can run Scala applications that connect to Snowpark Connect for Spark by using the Spark Connect client library.

This guide walks you through setting up Snowpark Connect and connecting your Scala applications to the Snowpark Connect for Spark server.

Step 1: Set up your Snowpark Connect for Spark environment

Set up your environment by using steps described in the following topics:

  1. Create a Python virtual environment and install Snowpark Connect.

  2. Set up a connection.

Step 2: Create a Snowpark Connect for Spark server script and launch the server

  1. Create a Python script to launch the Snowpark Connect for Spark server.

    # launch-snowpark-connect.py
    
    from snowflake import snowpark_connect
    
    def main():
        snowpark_connect.start_session(is_daemon=False, remote_url="sc://localhost:15002")
        print("SAS started on port 15002")
    
    if __name__ == "__main__":
        main()
    
    Copy
  2. Launch the Snowpark Connect for Spark server.

    # Make sure you're in the correct Python environment
    pyenv activate your-snowpark-connect-env
    
    # Run the server script
    python launch-snowpark-connect.py
    
    Copy

Step 3: Set up your Scala application

  1. Add the Spark Connect client dependency to your build.sbt file.

    libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.3"
    
    // Add JVM options for Java 9+ module system compatibility
    javaOptions ++= Seq(
      "--add-opens=java.base/java.nio=ALL-UNNAMED"
    )
    
    Copy
  2. Execute Scala code to connect to the Snowpark Connect for Spark server.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    object SnowparkConnectExample {
      def main(args: Array[String]): Unit = {
        // Create Spark session with Snowpark Connect
        val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()
    
        // Register ClassFinder for UDF support (if needed)
        // val classFinder = new REPLClassDirMonitor("target/scala-2.12/classes")
        // spark.registerClassFinder(classFinder)
    
        try {
          // Simple DataFrame operations
          import spark.implicits._
    
          val data = Seq(
            (1, "Alice", 25),
            (2, "Bob", 30),
            (3, "Charlie", 35)
          )
    
          val df = spark.createDataFrame(data).toDF("id", "name", "age")
    
          println("Original DataFrame:")
          df.show()
    
          println("Filtered DataFrame (age > 28):")
          df.filter($"age" > 28).show()
    
          println("Aggregated result:")
          df.groupBy().avg("age").show()
    
        } finally {
          spark.stop()
        }
      }
    }
    
    Copy
  3. Compile and run your application.

    # Compile your Scala application
    sbt compile
    
    # Run the application
    sbt "runMain SnowparkConnectExample"
    
    Copy

Scala UDF support on Snowpark Connect for Spark

When using user-defined functions or custom code, do one of the following:

  • Register a class finder to monitor and upload class files.

    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    val classFinder = new REPLClassDirMonitor("/absolute/path/to/target/scala-2.12/classes")
    spark.registerClassFinder(classFinder)
    
    Copy
  • Upload JAR dependencies if needed.

    spark.addArtifact("/absolute/path/to/dependency.jar")
    
    Copy

Problembehandlung bei einer Snowpark Connect for Spark-Installation

With the following list of checks, you can troubleshoot Snowpark Connect for Spark installation and use.

  • Stellen Sie sicher, dass Java und Python auf der gleichen Architektur basieren.

  • Use the most recent Snowpark Connect for Spark package file, as described in Snowpark Connect for Spark installieren.

  • Prüfen Sie, ob der python-Befehl mit PySpark-Code korrekt für die lokale Ausführung funktioniert, d. h. ohne Snowflake-Konnektivität.

    Führen Sie beispielsweise einen Befehl wie den folgenden aus:

    python your_pyspark_file.py
    
    Copy