Spark-Workloads über VS Code, Jupyter Notebooks oder ein -Terminal ausführen

Sie können Spark-Workloads interaktiv über Jupyter Notebooks, VS Code oder eine beliebige Python-basierte Weboberfläche ausführen, ohne dass ein Spark-Cluster verwaltet werden muss. Die Workloads werden auf der Snowflake-Infrastruktur ausgeführt.

Sie können zum Beispiel folgende Aufgaben ausführen:

  1. Stellen Sie sicher, dass Sie die Voraussetzungen haben.

  2. Richten Sie Ihre Umgebung für die Verbindung mit Snowpark Connect for Spark auf Snowflake ein.

  3. Installieren Sie Snowpark Connect for Spark.

  4. Führen Sie PySpark-Code von Ihrem Client aus in Snowflake aus.

Voraussetzungen

Prüfen Sie, ob Ihre Python- und Java-Installationen auf derselben Computerarchitektur basieren. Wenn Python beispielsweise auf arm64 basiert, muss Java ebenfalls darauf basieren (und nicht z. B. auf x86_64).

Einrichten Ihrer Umgebung

Sie können Ihre Entwicklungsumgebung einrichten, indem Sie dafür sorgen, dass Ihr Code mit Snowpark Connect for Spark auf Snowflake verbunden werden kann. Um eine Verbindung zu Snowflake herzustellen, verwendet der Clientcode eine .toml-Datei mit Verbindungsdetails.

Wenn Sie Snowflake CLI installiert haben, können Sie damit eine Verbindung definieren. Andernfalls können Sie die Verbindungsparameter manuell in eine config.toml-Datei schreiben.

Hinzufügen von Verbindungen mit Snowflake CLI

Sie können mit Snowflake CLI Verbindungseigenschaften hinzufügen, mit denen Snowpark Connect for Spark eine Verbindung zu Snowflake herstellen kann. Ihre Änderungen werden in einer config.toml-Datei gespeichert.

  1. Führen Sie den folgenden Befehl aus, um eine Verbindung über den snow connection-Befehl :add hinzuzufügen.

    snow connection add
    
    Copy
  2. Folgen Sie den Aufforderungen, um eine Verbindung zu definieren.

    Geben Sie spark-connect als Verbindungsnamen an.

    Dieser Befehl fügt eine Verbindung zu Ihrer:file:config.toml-Datei hinzu, wie im folgenden Beispiel:

    [connections.spark-connect]
    host = "example.snowflakecomputing.com"
    port = 443
    account = "example"
    user = "test_example"
    password = "password"
    protocol = "https"
    warehouse = "example_wh"
    database = "example_db"
    schema = "public"
    
    Copy
  3. Führen Sie den folgenden Befehl aus, um zu prüfen, ob die Verbindung funktioniert.

    Sie können die Verbindung auf diese Weise testen, nachdem Sie sie mit Snowflake CLI hinzugefügt haben.

    snow connection list
    snow connection test --connection spark-connect
    
    Copy

Hinzufügen einer Verbindung durch manuelles Schreiben einer Verbindungsdatei

Sie können eine connections.toml-Datei manuell schreiben oder aktualisieren, damit Ihr Code eine Verbindung zu Snowpark Connect for Spark auf Snowflake herstellen kann.

  1. Führen Sie den folgenden Befehl aus, damit Ihre connections.toml-Datei nur dem Eigentümer (Benutzer) Lese- und Schreibzugriff gewährt.

    chmod 0600 "~/.snowflake/connections.toml"
    
    Copy
  2. Bearbeiten Sie die connections.toml-Datei so, dass sie eine [spark-connect]-Verbindung mit den Verbindungseigenschaften im folgenden Beispiel enthält.

    Ersetzen Sie die Werte durch Ihre eigenen Verbindungsspezifikationen.

    [spark-connect]
    host="my_snowflake_account.snowflakecomputing.com"
    account="my_snowflake_account"
    user="my_user"
    password="&&&&&&&&"
    warehouse="my_wh"
    database="my_db"
    schema="public"
    
    Copy

Snowpark Connect for Spark installieren

Sie können Snowpark Connect for Spark als Python-Paket installieren.

  1. Erstellen Sie eine virtuelle Umgebung für Python.

    Überprüfen Sie mit python3 --version Ihre Python-Version. Sie muss zwischen 3.10 und 3.13 liegen.

    python3 -m venv .venv
    source .venv/bin/activate
    
    Copy
  2. Installieren Sie das Snowpark Connect for Spark-Paket.

    pip install --upgrade --force-reinstall 'snowpark-connect[jdk]'
    
    Copy
  3. Fügen Sie Python-Code hinzu, um einen|spconnect|-Server zu starten, und erstellen Sie eine Snowpark Connect for Spark-Sitzung.

    from snowflake import snowflake.snowpark_connect
    
    # Import snowpark_connect *before* importing pyspark libraries
    from pyspark.sql.types import Row
    
    spark = snowflake.snowpark_connect.server.init_spark_session()
    
    Copy

Ausführen von Python-Code auf Ihrem Client

Sobald Sie eine authentifizierte Verbindung eingerichtet haben, können Sie wie gewohnt Code verfassen.

Sie können PySpark-Code ausführen, der eine Verbindung mit Snowpark Connect for Spark durch Verwendung der PySpark-Clientbibliothek herstellt.

from pyspark.sql import Row

  df = spark.createDataFrame([
      Row(a=1, b=2.),
      Row(a=2, b=3.),
      Row(a=4, b=5.),])

  print(df.count())
Copy

Ausführen von Scala-Code von Ihrem Client aus

Sie können Scala-Anwendungen ausführen, die mithilfe der Spark Connect-Clientbibliothek eine Verbindung zu Snowpark Connect for Spark herstellt.

Diese Anleitung führt Sie durch das Einrichten von Snowpark Connect und das Herstellen einer Verbindung Ihrer Scala-Anwendungen mit dem Snowpark Connect for Spark-Server.

Schritt 1: Richten Sie Ihre Snowpark Connect for Spark-Umgebung ein

Richten Sie Ihre Umgebung ein, indem Sie die unter folgenden Themen beschriebenen Schritte ausführen:

  1. Erstellen Sie eine virtuelle Python-Umgebung, und installieren Sie Snowpark Connect.

  2. Stellen Sie eine Verbindung her.

Schritt 2: Erstellen Sie ein Snowpark Connect for Spark-Serverskript, und starten Sie den Server

  1. Erstellen Sie ein Python-Skript zum Starten des Snowpark Connect for Spark-Servers.

    # launch-snowpark-connect.py
    
    from snowflake import snowpark_connect
    
    def main():
        snowpark_connect.start_session(is_daemon=False, remote_url="sc://localhost:15002")
        print("SAS started on port 15002")
    
    if __name__ == "__main__":
        main()
    
    Copy
  2. Starten Sie den Snowpark Connect for Spark-Server.

    # Make sure you're in the correct Python environment
    pyenv activate your-snowpark-connect-env
    
    # Run the server script
    python launch-snowpark-connect.py
    
    Copy

Schritt 3: Richten Sie Ihre Scala-Anwendung ein

  1. Fügen Sie die Spark Connect-Clientabhängigkeit Ihrer „build.sbt“-Datei hinzu.

    libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.6"
    
    // Add JVM options for Java 9+ module system compatibility
    javaOptions ++= Seq(
      "--add-opens=java.base/java.nio=ALL-UNNAMED"
    )
    
    Copy
  2. Führen Sie Scala-Code aus, um eine Verbindung mit dem Snowpark Connect for Spark-Server herzustellen.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    object SnowparkConnectExample {
      def main(args: Array[String]): Unit = {
        // Create Spark session with Snowpark Connect
        val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()
    
        // Register ClassFinder for UDF support (if needed)
        // val classFinder = new REPLClassDirMonitor("target/scala-2.12/classes")
        // spark.registerClassFinder(classFinder)
    
        try {
          // Simple DataFrame operations
          import spark.implicits._
    
          val data = Seq(
            (1, "Alice", 25),
            (2, "Bob", 30),
            (3, "Charlie", 35)
          )
    
          val df = spark.createDataFrame(data).toDF("id", "name", "age")
    
          println("Original DataFrame:")
          df.show()
    
          println("Filtered DataFrame (age > 28):")
          df.filter($"age" > 28).show()
    
          println("Aggregated result:")
          df.groupBy().avg("age").show()
    
        } finally {
          spark.stop()
        }
      }
    }
    
    Copy
  3. Kompilieren Sie Ihre Anwendung, und führen Sie sie aus.

    # Compile your Scala application
    sbt compile
    
    # Run the application
    sbt "runMain SnowparkConnectExample"
    
    Copy

Scala-UDF-Unterstützung auf Snowpark Connect for Spark

Wenn Sie benutzerdefinierte Funktionen oder kundenspezifischen Code verwenden, führen Sie eine der folgenden Aktionen aus:

  • Registrieren Sie einen Klassensucher zum Überwachen und Hochladen von Klassendateien.

    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    val classFinder = new REPLClassDirMonitor("/absolute/path/to/target/scala-2.12/classes")
    spark.registerClassFinder(classFinder)
    
    Copy
  • Laden Sie JAR-Abhängigkeiten hoch, falls erforderlich. Sie können das Workload-JAR selbst einschließen, wenn kein Klassensucher verwendet wird.

    spark.addArtifact("/absolute/path/to/dependency.jar")
    
    Copy
  • Verwenden Sie ein Staging-JAR.

    spark.conf.set("snowpark.connect.udf.java.imports", "[@mystage/dependency.jar, @db.schema.stage/other_dependency.jar]")
    
    Copy

Verwenden von Scala 2.13

Standardmäßig verwendet Snowpark Connect for Spark Scala 2.12. Workloads, die mit Scala 2.13 erstellt werden, müssen die Scala-Version mit der Konfigurationsoption „snowpark.connect.scala.version“ angeben.

// Directly in the session builder
val spark = SparkSession.builder()
  .remote("sc://localhost:15002")
  .config("snowpark.connect.scala.version", "2.13")
  .getOrCreate()

// Or via session configuration
spark.conf.set("snowpark.connect.scala.version", "2.13")
Copy

Problembehandlung bei einer Snowpark Connect for Spark-Installation

Mit der folgenden Liste von Prüfungen können Sie Fehler in der Installation von Snowpark Connect for Spark beheben und das Tool nutzen.

  • Stellen Sie sicher, dass Java und Python auf der gleichen Architektur basieren.

  • Verwenden Sie die aktuelle Snowpark Connect for Spark-Paketdatei, wie unter Snowpark Connect for Spark installieren beschrieben.

  • Prüfen Sie, ob der python-Befehl mit PySpark-Code korrekt für die lokale Ausführung funktioniert, d. h. ohne Snowflake-Konnektivität.

    Führen Sie beispielsweise einen Befehl wie den folgenden aus:

    python your_pyspark_file.py
    
    Copy

Open Source-Clients

Sie können Spark-Client-Pakete mit standardmäßiger Open-Source-Software (OSS) – wie PySpark und Spark-Clients für Java oder Scala – aus Ihren bevorzugten lokalen Umgebungen verwenden, einschließlich Jupyter Notebooks und VS-Code. Auf diese Weise können Sie die Installation von Snowflake spezifischen Paketen vermeiden.

Dies kann nützlich sein, wenn Sie Spark-Code lokal schreiben möchten und der Code Snowflake-Computeressourcen und Enterprise Governance nutzen soll. In diesem Szenario führen Sie die Authentifizierung und Autorisierung über programmgesteuerte Zugriffstoken (PATs) durch.

Die folgenden Abschnitte behandeln Installation, Konfiguration und Authentifizierung. Sie finden auch ein einfaches PySpark-Beispiel zur Validierung Ihrer Verbindung.

Schritt 1: Installieren der erforderlichen Pakete

  • Installieren Sie pyspark. Sie müssen keine Snowflake-Pakete installieren.

    pip install "pyspark[connect]>=3.5.0,<4"
    
    Copy

Schritt 2: Einrichtung und Authentifizierung

  1. Generieren Sie ein programmgesteuertes Zugriffstoken (PAT).

    Weitere Informationen dazu finden Sie unter folgenden Themen:

    Im folgenden Beispiel wird ein PAT mit dem Namen TEST_PAT für den Benutzer sysadmin hinzugefügt und eine Laufdauer von 30 Tagen festgelegt.

    ALTER USER add PAT TEST_PAT ROLE_RESTRICTION = sysadmin DAYS_TO_EXPIRY = 30;
    
    Copy
  2. Suchen Sie Ihre Snowflake Spark Connect-Host-URL.

    Führen Sie den folgenden SQL-Befehl in Snowflake aus, um den Hostnamen für Ihr Konto zu ermitteln:

    SELECT t.VALUE:type::VARCHAR as type,
           t.VALUE:host::VARCHAR as host,
           t.VALUE:port as port
      FROM TABLE(FLATTEN(input => PARSE_JSON(SYSTEM$ALLOWLIST()))) AS t where type = 'SNOWPARK_CONNECT';
    
    Copy

Schritt 3: Herstellen einer Verbindung zum Spark Connect-Server

  • Um eine Verbindung zum Spark Connect-Server herzustellen, verwenden Sie zum Beispiel folgenden Code:

    from pyspark.sql import SparkSession
    import urllib.parse
    
    # Replace with your actual PAT.
    pat = urllib.parse.quote("<pat>", safe="")
    
    # Replace with your Snowpark Connect host from the above SQL query.
    snowpark_connect_host = ""
    
    # Define database/schema/warehouse for executing your Spark session in Snowflake (recommended); otherwise, it will be resolved from your default_namespace and default_warehouse
    
    db_name = urllib.parse.quote("TESTDB", safe="")
    schema_name = urllib.parse.quote("TESTSCHEMA", safe="")
    warehouse_name = urllib.parse.quote("TESTWH", safe="")
    
    spark = SparkSession.builder.remote(f"sc://{snowpark_connect_host}/;token={pat};token_type=PAT;database={db_name};schema={schema_name};warehouse={warehouse_name}").getOrCreate()
    
    # Spark session is ready to use. You can write regular Spark DataFrame code, as in the following example:
    
    from pyspark.sql import Row
    
    df = spark.createDataFrame([
        Row(a=1, b=2.),
        Row(a=2, b=3.),
        Row(a=4, b=5.),])
    print(df.count())
    
    Copy