VS コード、Jupyter Notebooks、またはターミナルからSparkワークロードを実行する

Jupyterノートブック、 VS コード、またはPythonベースのインターフェースからSparkワークロードをインタラクティブに実行できます。Sparkクラスターを管理する必要はありません。ワークロードはSnowflakeインフラストラクチャ上で実行されます。

たとえば、次のタスクを実行できます。

  1. 前提条件があることを確認します。

  2. Snowflake上の Snowpark Connect for Spark と接続するための環境を設定します。

  3. Snowpark Connect for Spark をインストールします。

  4. Snowflakeで実行するクライアントからの PySpark コードを実行します。

前提条件

PythonとJavaのインストールが同じコンピューターアーキテクチャに基づいていることを確認します。たとえば、 Pythonが arm64 に基づいている場合、Java も arm64 である必要があります(例:x86_64ではない)。

環境の設定

コードがSnowflake上の Snowpark Connect for Spark に接続できるようにすることで、開発環境をセットアップできます。Snowflakeクライアントコードに接続するには、接続の詳細を含む .toml ファイルを使用します。

Snowflake CLI がインストールされている場合、それを使用して接続を定義できます。それ以外の場合は、 config.toml ファイルに接続パラメーターを手動で書き込むことができます。

Snowflake CLI を使用して接続を追加します。

Snowpark Connect for Spark がSnowflakeに接続するために使える接続プロパティを追加するために、 Snowflake CLI を使用できます。変更は config.toml ファイルに保存されます。

  1. snow connection コマンドを使用して接続を追加するには、次のコマンドを実行します:追加 コマンド

    snow connection add
    
    Copy
  2. プロンプトに従って接続を定義します。

    必ず接続名に spark-connect を指定してください。

    このコマンドは次の例のように、 config.toml ファイルに接続を追加します。

    [connections.spark-connect]
    host = "example.snowflakecomputing.com"
    port = 443
    account = "example"
    user = "test_example"
    password = "password"
    protocol = "https"
    warehouse = "example_wh"
    database = "example_db"
    schema = "public"
    
    Copy
  3. 以下のコマンドを実行して、接続が機能することを確認します。

    Snowflake CLI を使用して接続を追加した場合は、この方法でテストできます。

    snow connection list
    snow connection test --connection spark-connect
    
    Copy

接続ファイルを手動で書き込んで接続を追加する

Snowflake上でコードが Snowpark Connect for Spark に接続できるように、 connections.toml ファイルを手動で記述または更新できます。

  1. 次のコマンドを実行して、 connections.toml ファイルでは、所有者(ユーザー)のみが読み取りおよび書き込みアクセスできることを確認します。

    chmod 0600 "~/.snowflake/connections.toml"
    
    Copy
  2. 次の例の接続プロパティを使用した [spark-connect] 接続が含まれるように、connections.toml ファイルを編集します。

    値を独自の接続仕様に置き換えてください。

    [spark-connect]
    host="my_snowflake_account.snowflakecomputing.com"
    account="my_snowflake_account"
    user="my_user"
    password="&&&&&&&&"
    warehouse="my_wh"
    database="my_db"
    schema="public"
    
    Copy

Snowpark Connect for Spark をインストールする

Pythonパッケージとして Snowpark Connect for Spark をインストールできます。

  1. Pythonの仮想環境を作成します。

    python3 --version を実行して、Pythonバージョンが3.10以降で3.13以前であることを確認します。

    python3 -m venv .venv
    source .venv/bin/activate
    
    Copy
  2. Snowpark Connect for Spark パッケージをインストールします。

    pip install --upgrade --force-reinstall 'snowpark-connect[jdk]'
    
    Copy
  3. Snowpark Connect for Spark サーバーを開始して Snowpark Connect for Spark セッションを作成するには、Pythonコードを追加します。

    from snowflake import snowflake.snowpark_connect
    
    # Import snowpark_connect *before* importing pyspark libraries
    from pyspark.sql.types import Row
    
    spark = snowflake.snowpark_connect.server.init_spark_session()
    
    Copy

クライアントからPythonコードを実行する

認証済みの接続が確立されると、通常と同様にコードを書き込みできます。

PySpark クライアントライブラリを使用して、Snowpark Connect for Spark に接続する PySpark コードを実行できます。

from pyspark.sql import Row

  df = spark.createDataFrame([
      Row(a=1, b=2.),
      Row(a=2, b=3.),
      Row(a=4, b=5.),])

  print(df.count())
Copy

クライアントからScalaコードを実行する

Spark Connectクライアントライブラリを使用して、 |spconnect|に接続するScalaアプリケーションを実行できます。

このガイドでは、Snowpark Connectを設定し、Scalaアプリケーションを Snowpark Connect for Spark サーバーに接続する手順について説明します。

ステップ1: Snowpark Connect for Spark 環境を設定する

次のトピックで説明されているステップを使用して、環境を設定します。

  1. Python仮想環境を作成し、Snowpark Connectをインストールする

  2. 接続を設定する

ステップ2: Snowpark Connect for Spark サーバースクリプトを作成してサーバーを起動する

  1. Pythonスクリプトを作成して Snowpark Connect for Spark サーバーを起動します。

    # launch-snowpark-connect.py
    
    from snowflake import snowpark_connect
    
    def main():
        snowpark_connect.start_session(is_daemon=False, remote_url="sc://localhost:15002")
        print("SAS started on port 15002")
    
    if __name__ == "__main__":
        main()
    
    Copy
  2. Snowpark Connect for Spark サーバーを起動します。

    # Make sure you're in the correct Python environment
    pyenv activate your-snowpark-connect-env
    
    # Run the server script
    python launch-snowpark-connect.py
    
    Copy

ステップ3:ステップ3: Scalaアプリケーションを設定する

  1. Spark Connectクライアントの依存関係をbuild.sbtファイルに追加します。

    libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.6"
    
    // Add JVM options for Java 9+ module system compatibility
    javaOptions ++= Seq(
      "--add-opens=java.base/java.nio=ALL-UNNAMED"
    )
    
    Copy
  2. Scalaコードを実行して Snowpark Connect for Spark サーバーに接続します。

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    object SnowparkConnectExample {
      def main(args: Array[String]): Unit = {
        // Create Spark session with Snowpark Connect
        val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()
    
        // Register ClassFinder for UDF support (if needed)
        // val classFinder = new REPLClassDirMonitor("target/scala-2.12/classes")
        // spark.registerClassFinder(classFinder)
    
        try {
          // Simple DataFrame operations
          import spark.implicits._
    
          val data = Seq(
            (1, "Alice", 25),
            (2, "Bob", 30),
            (3, "Charlie", 35)
          )
    
          val df = spark.createDataFrame(data).toDF("id", "name", "age")
    
          println("Original DataFrame:")
          df.show()
    
          println("Filtered DataFrame (age > 28):")
          df.filter($"age" > 28).show()
    
          println("Aggregated result:")
          df.groupBy().avg("age").show()
    
        } finally {
          spark.stop()
        }
      }
    }
    
    Copy
  3. アプリケーションをコンパイルして実行します。

    # Compile your Scala application
    sbt compile
    
    # Run the application
    sbt "runMain SnowparkConnectExample"
    
    Copy

Snowpark Connect for Spark でのScala UDF サポート

ユーザー定義関数またはカスタムコードを使用する場合は、以下のいずれかを実行します。

  • クラスファイルを監視およびアップロードするためのクラス検索者を登録します。

    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    val classFinder = new REPLClassDirMonitor("/absolute/path/to/target/scala-2.12/classes")
    spark.registerClassFinder(classFinder)
    
    Copy
  • 必要に応じて、JAR 依存関係をアップロードします。クラスファインダーが使用されていない場合は、ワークロード JAR 自体を含めることができます。

    spark.addArtifact("/absolute/path/to/dependency.jar")
    
    Copy
  • ステージングされた JAR を使用します。

    spark.conf.set("snowpark.connect.udf.java.imports", "[@mystage/dependency.jar, @db.schema.stage/other_dependency.jar]")
    
    Copy

Scala 2.13の使用

デフォルトでは Snowpark Connect for Spark はScala 2.12を使用します。Scala 2.13で構築されたワークロードは、「snowpark.connect.scala.version」設定オプションを使用してScalaバージョンを指定する必要があります。

// Directly in the session builder
val spark = SparkSession.builder()
  .remote("sc://localhost:15002")
  .config("snowpark.connect.scala.version", "2.13")
  .getOrCreate()

// Or via session configuration
spark.conf.set("snowpark.connect.scala.version", "2.13")
Copy

トラブルシューティング Snowpark Connect for Spark インストール

次のチェックリストを使用して、Snowpark Connect for Spark のインストールと使用のトラブルシューティングを行うことができます。

  • JavaおよびPythonが :ref:` 同じアーキテクチャに基づく <label-snowpark_connect_jupyter_prereq>` ことを確認します。

  • Snowpark Connect for Spark をインストールする で説明されているように、最新の Snowpark Connect for Spark パッケージファイルを作成します。

  • PySpark コードを使用した Python コマンドが、ローカル実行に対して正しく動作すること、つまり、Snowflakeへの接続なしで実行されることを確認します。

    たとえば、次のようなコマンドを実行します。

    python your_pyspark_file.py
    
    Copy

ソースクライアントを開く

標準のすぐに使えるオープンソースソフトウェア(OSS)Sparkクライアントパッケージ(PySpark、JavaまたはScala用のSparkクライアントなど)を、Jupyterノートブックや VS コードなど好みのローカル環境から使用できます。このようにして、Snowflakeに固有のパッケージのインストールを回避できます。

これは、Sparkコードをローカルで記述し、そのコードでSnowflakeのコンピューティングリソースとエンタープライズガバナンスを使用する場合に役立つ可能性があります。このシナリオでは、プログラムによるアクセストークン(PATs)を使用して認証と承認を実行します。

以下のセクションでは、インストール、構成、認証について説明します。接続を検証するための簡単な PySpark 例も見つかります。

ステップ1:必要なパッケージをインストールする

  • pyspark をインストールします。Snowflakeパッケージをインストールする必要はありません。

    pip install "pyspark[connect]>=3.5.0,<4"
    
    Copy

ステップ2: 設定と認証

  1. プログラムによるアクセストークン(PAT)を生成します。

    詳細については、次のトピックをご参照ください。

    次の例では、ユーザー sysadmin に対して TEST_PAT という名前の PAT を追加し、有効期限を30日に設定します。

    ALTER USER add PAT TEST_PAT ROLE_RESTRICTION = sysadmin DAYS_TO_EXPIRY = 30;
    
    Copy
  2. SnowflakeのSpark Connectホスト URL を見つけます。

    Snowflakeで次の SQL を実行し、アカウントのホスト名を検索します。

    SELECT t.VALUE:type::VARCHAR as type,
           t.VALUE:host::VARCHAR as host,
           t.VALUE:port as port
      FROM TABLE(FLATTEN(input => PARSE_JSON(SYSTEM$ALLOWLIST()))) AS t where type = 'SNOWPARK_CONNECT';
    
    Copy

ステップ3:ステップ3: Spark Connectサーバーに接続する

  • Spark Connectサーバーに接続するには、次のようなコードを使用します。

    from pyspark.sql import SparkSession
    import urllib.parse
    
    # Replace with your actual PAT.
    pat = urllib.parse.quote("<pat>", safe="")
    
    # Replace with your Snowpark Connect host from the above SQL query.
    snowpark_connect_host = ""
    
    # Define database/schema/warehouse for executing your Spark session in Snowflake (recommended); otherwise, it will be resolved from your default_namespace and default_warehouse
    
    db_name = urllib.parse.quote("TESTDB", safe="")
    schema_name = urllib.parse.quote("TESTSCHEMA", safe="")
    warehouse_name = urllib.parse.quote("TESTWH", safe="")
    
    spark = SparkSession.builder.remote(f"sc://{snowpark_connect_host}/;token={pat};token_type=PAT;database={db_name};schema={schema_name};warehouse={warehouse_name}").getOrCreate()
    
    # Spark session is ready to use. You can write regular Spark DataFrame code, as in the following example:
    
    from pyspark.sql import Row
    
    df = spark.createDataFrame([
        Row(a=1, b=2.),
        Row(a=2, b=3.),
        Row(a=4, b=5.),])
    print(df.count())
    
    Copy