サードパーティクライアントからSparkワークロードを実行

Jupyterノートブック、 VS コード、またはPythonベースのインターフェースからSparkワークロードをインタラクティブに実行できます。Sparkクラスターを管理する必要はありません。ワークロードはSnowflakeインフラストラクチャ上で実行されます。

たとえば、次のタスクを実行できます。

  1. 前提条件があることを確認します。

  2. Snowflake上の Snowpark Connect for Spark と接続するための環境を設定します。

  3. Snowpark Connect for Spark をインストールします。

  4. Snowflakeで実行するクライアントからの PySpark コードを実行します。

前提条件

PythonとJavaのインストールが同じコンピューターアーキテクチャに基づいていることを確認します。たとえば、 Pythonが arm64 に基づいている場合、Java も arm64 である必要があります(例:x86_64ではない)。

環境の設定

コードがSnowflake上の Snowpark Connect for Spark に接続できるようにすることで、開発環境をセットアップできます。Snowflakeクライアントコードに接続するには、接続の詳細を含む .toml ファイルを使用します。

Snowflake CLI がインストールされている場合、それを使用して接続を定義できます。それ以外の場合は、 config.toml ファイルに接続パラメーターを手動で書き込むことができます。

Snowflake CLI を使用して接続を追加します。

Snowpark Connect for Spark がSnowflakeに接続するために使える接続プロパティを追加するために、 Snowflake CLI を使用できます。変更は config.toml ファイルに保存されます。

  1. snow connection コマンドを使用して接続を追加するには、次のコマンドを実行します:追加 コマンド

    snow connection add
    
    Copy
  2. プロンプトに従って接続を定義します。

    必ず接続名に spark-connect を指定してください。

    このコマンドは次の例のように、 config.toml ファイルに接続を追加します。

    [connections.spark-connect]
    host = "example.snowflakecomputing.com"
    port = 443
    account = "example"
    user = "test_example"
    password = "password"
    protocol = "https"
    warehouse = "example_wh"
    database = "example_db"
    schema = "public"
    
    Copy
  3. 以下のコマンドを実行して、接続が機能することを確認します。

    Snowflake CLI を使用して接続を追加した場合は、この方法でテストできます。

    snow connection list
    snow connection test --connection spark-connect
    
    Copy

接続ファイルを手動で書き込んで接続を追加する

Snowflake上でコードが Snowpark Connect for Spark に接続できるように、 connections.toml ファイルを手動で記述または更新できます。

  1. 次のコマンドを実行して、 connections.toml ファイルでは、所有者(ユーザー)のみが読み取りおよび書き込みアクセスできることを確認します。

    chmod 0600 "~/.snowflake/connections.toml"
    
    Copy
  2. 次の例の接続プロパティを使用した [spark-connect] 接続が含まれるように、connections.toml ファイルを編集します。

    値を独自の接続仕様に置き換えてください。

    [spark-connect]
    host="my_snowflake_account.snowflakecomputing.com"
    account="my_snowflake_account"
    user="my_user"
    password="&&&&&&&&"
    warehouse="my_wh"
    database="my_db"
    schema="public"
    
    Copy

Snowpark Connect for Spark をインストールする

Pythonパッケージとして Snowpark Connect for Spark をインストールできます。

  1. Pythonの仮想環境を作成します。

    たとえば、次の例のようにCondaを使用できます。

    conda create -n xxxx pip python=3.12
    conda activate xxxx
    
    Copy
  2. Snowpark Connect for Spark パッケージをインストールします。

    pip install --upgrade --force-reinstall snowpark-connect
    
    Copy
  3. Snowpark Connect for Spark サーバーを開始して Snowpark Connect for Spark セッションを作成するには、Pythonコードを追加します。

    import os
    import snowflake.snowpark
    from snowflake import snowpark_connect
    # Import snowpark_connect before importing pyspark libraries
    from pyspark.sql.types import Row
    
    os.environ["SPARK_CONNECT_MODE_ENABLED"] = "1"
    snowpark_connect.start_session()  # Start the local Snowpark Connect for Spark session
    spark = snowpark_connect.get_session()
    
    Copy

Run Python code from your client

Once you have an authenticated connection in place, you can write code as you normally would.

PySpark クライアントライブラリを使用して、Snowpark Connect for Spark に接続する PySpark コードを実行できます。

from pyspark.sql import Row

  df = spark.createDataFrame([
      Row(a=1, b=2.),
      Row(a=2, b=3.),
      Row(a=4, b=5.),])

  print(df.count())
Copy

Scalaアプリケーション用Snowparkコネクタをはじめるにあたり

このガイドでは、Snowpark Connectを設定し、ScalaアプリケーションをSnowpark Connectサーバーに接続する手順について説明します。

Step 1: Set Up Snowark Connect Environment

Python環境のセットアップとSnowpark Connectのインストールの手順については、公式のSnowpark Connectドキュメントに従ってください。

主要なステップには次が含まれます。

  1. Creating a Python virtual environment

  2. Installing Snowpark Connect for Spark

  3. 接続構成の設定

ステップ2:Snowpark Connectサーバースクリプトを作成する

Snowpark Connectサーバーを起動するPythonスクリプトを作成します。

  1. launch-snowpark-connect.py

    from snowflake import snowpark_connect
    
    def main():
        snowpark_connect.start_session(is_daemon=False, remote_url="sc://localhost:15002")
        print("SAS started on port 15002")
    
    if __name__ == "__main__":
        main()
    
    Copy
  2. Snowparkコネクタサーバーの起動

    # Make sure you're in the correct Python environment
    pyenv activate your-snowpark-connect-env
    
    # Run the server script
    python launch-snowpark-connect.py
    
    Copy

ステップ3:Scalaアプリケーションを設定する

  1. Spark Connectクライアントの依存関係をbuild.sbtファイルに追加します。

    libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.3"
    
    // Add JVM options for Java 9+ module system compatibility
    javaOptions ++= Seq(
      "--add-opens=java.base/java.nio=ALL-UNNAMED"
    )
    
    Copy
  2. Scalaコードを実行して Snowpark Connect for Spark サーバー に接続する

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    object SnowparkConnectExample {
      def main(args: Array[String]): Unit = {
        // Create Spark session with Snowpark Connect
        val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()
    
        // Register ClassFinder for UDF support (if needed)
        // val classFinder = new REPLClassDirMonitor("target/scala-2.12/classes")
        // spark.registerClassFinder(classFinder)
    
        try {
          // Simple DataFrame operations
          import spark.implicits._
    
          val data = Seq(
            (1, "Alice", 25),
            (2, "Bob", 30),
            (3, "Charlie", 35)
          )
    
          val df = spark.createDataFrame(data).toDF("id", "name", "age")
    
          println("Original DataFrame:")
          df.show()
    
          println("Filtered DataFrame (age > 28):")
          df.filter($"age" > 28).show()
    
          println("Aggregated result:")
          df.groupBy().avg("age").show()
    
        } finally {
          spark.stop()
        }
      }
    }
    
    Copy
  3. アプリケーションをコンパイルして実行します。

    # Compile your Scala application
    sbt compile
    
    # Run the application
    sbt "runMain SnowparkConnectExample"
    
    Copy

Snowpark Connect for Spark での UDF サポート

ユーザー定義関数またはカスタムコードを使用する場合は、以下のいずれかを実行します。

  • ClassFinder を登録してクラスファイルをモニターしてアップロードします。

    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    val classFinder = new REPLClassDirMonitor("/absolute/path/to/target/scala-2.12/classes")
    spark.registerClassFinder(classFinder)
    
    Copy
  • 必要に応じて、JAR 依存関係をアップロードします。

    spark.addArtifact("/absolute/path/to/dependency.jar")
    
    Copy

トラブルシューティング Snowpark Connect for Spark インストール

With the following list of checks, you can troubleshoot Snowpark Connect for Spark installation and use.

  • JavaおよびPythonが :ref:` 同じアーキテクチャに基づく <label-snowpark_connect_jupyter_prereq>` ことを確認します。

  • Use the most recent Snowpark Connect for Spark package file, as described in Snowpark Connect for Spark をインストールする.

  • PySpark コードを使用した Python コマンドが、ローカル実行に対して正しく動作すること、つまり、Snowflakeへの接続なしで実行されることを確認します。

    たとえば、次のようなコマンドを実行します。

    python your_pyspark_file.py
    
    Copy