VS コード、Jupyter Notebooks、またはターミナルからSparkワークロードを実行する¶
Jupyterノートブック、 VS コード、またはPythonベースのインターフェースからSparkワークロードをインタラクティブに実行できます。Sparkクラスターを管理する必要はありません。ワークロードはSnowflakeインフラストラクチャ上で実行されます。
たとえば、次のタスクを実行できます。
前提条件があることを確認します。
Snowflake上の Snowpark Connect for Spark と接続するための環境を設定します。
Snowpark Connect for Spark をインストールします。
Snowflakeで実行するクライアントからの PySpark コードを実行します。
前提条件¶
PythonとJavaのインストールが同じコンピューターアーキテクチャに基づいていることを確認します。たとえば、 Pythonが arm64 に基づいている場合、Java も arm64 である必要があります(例:x86_64ではない)。
環境の設定¶
コードがSnowflake上の Snowpark Connect for Spark に接続できるようにすることで、開発環境をセットアップできます。Snowflakeクライアントコードに接続するには、接続の詳細を含む .toml ファイルを使用します。
Snowflake CLI がインストールされている場合、それを使用して接続を定義できます。それ以外の場合は、 config.toml ファイルに接続パラメーターを手動で書き込むことができます。
Snowflake CLI を使用して接続を追加します。¶
Snowpark Connect for Spark がSnowflakeに接続するために使える接続プロパティを追加するために、 Snowflake CLI を使用できます。変更は config.toml ファイルに保存されます。
snow connection コマンドを使用して接続を追加するには、次のコマンドを実行します:
追加コマンドsnow connection add
プロンプトに従って接続を定義します。
必ず接続名に
spark-connectを指定してください。このコマンドは次の例のように、
config.tomlファイルに接続を追加します。[connections.spark-connect] host = "example.snowflakecomputing.com" port = 443 account = "example" user = "test_example" password = "password" protocol = "https" warehouse = "example_wh" database = "example_db" schema = "public"
以下のコマンドを実行して、接続が機能することを確認します。
Snowflake CLI を使用して接続を追加した場合は、この方法でテストできます。
snow connection list snow connection test --connection spark-connect
接続ファイルを手動で書き込んで接続を追加する¶
Snowflake上でコードが Snowpark Connect for Spark に接続できるように、 connections.toml ファイルを手動で記述または更新できます。
次のコマンドを実行して、
connections.tomlファイルでは、所有者(ユーザー)のみが読み取りおよび書き込みアクセスできることを確認します。chmod 0600 "~/.snowflake/connections.toml"
次の例の接続プロパティを使用した
[spark-connect]接続が含まれるように、connections.tomlファイルを編集します。値を独自の接続仕様に置き換えてください。
[spark-connect] host="my_snowflake_account.snowflakecomputing.com" account="my_snowflake_account" user="my_user" password="&&&&&&&&" warehouse="my_wh" database="my_db" schema="public"
Snowpark Connect for Spark をインストールする¶
Pythonパッケージとして Snowpark Connect for Spark をインストールできます。
Pythonの仮想環境を作成します。
python3 --versionを実行して、Pythonバージョンが3.10以降で3.13以前であることを確認します。python3 -m venv .venv source .venv/bin/activate
Snowpark Connect for Spark パッケージをインストールします。
pip install --upgrade --force-reinstall 'snowpark-connect[jdk]'
Snowpark Connect for Spark サーバーを開始して Snowpark Connect for Spark セッションを作成するには、Pythonコードを追加します。
from snowflake import snowflake.snowpark_connect # Import snowpark_connect *before* importing pyspark libraries from pyspark.sql.types import Row spark = snowflake.snowpark_connect.server.init_spark_session()
クライアントからPythonコードを実行する¶
認証済みの接続が確立されると、通常と同様にコードを書き込みできます。
PySpark クライアントライブラリを使用して、Snowpark Connect for Spark に接続する PySpark コードを実行できます。
from pyspark.sql import Row
df = spark.createDataFrame([
Row(a=1, b=2.),
Row(a=2, b=3.),
Row(a=4, b=5.),])
print(df.count())
クライアントからScalaコードを実行する¶
Spark Connectクライアントライブラリを使用して、 |spconnect|に接続するScalaアプリケーションを実行できます。
このガイドでは、Snowpark Connectを設定し、Scalaアプリケーションを Snowpark Connect for Spark サーバーに接続する手順について説明します。
ステップ1: Snowpark Connect for Spark 環境を設定する¶
次のトピックで説明されているステップを使用して、環境を設定します。
ステップ2: Snowpark Connect for Spark サーバースクリプトを作成してサーバーを起動する¶
Pythonスクリプトを作成して Snowpark Connect for Spark サーバーを起動します。
# launch-snowpark-connect.py from snowflake import snowpark_connect def main(): snowpark_connect.start_session(is_daemon=False, remote_url="sc://localhost:15002") print("SAS started on port 15002") if __name__ == "__main__": main()
Snowpark Connect for Spark サーバーを起動します。
# Make sure you're in the correct Python environment pyenv activate your-snowpark-connect-env # Run the server script python launch-snowpark-connect.py
ステップ3:ステップ3: Scalaアプリケーションを設定する¶
Spark Connectクライアントの依存関係をbuild.sbtファイルに追加します。
libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.6" // Add JVM options for Java 9+ module system compatibility javaOptions ++= Seq( "--add-opens=java.base/java.nio=ALL-UNNAMED" )
Scalaコードを実行して Snowpark Connect for Spark サーバーに接続します。
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connect.client.REPLClassDirMonitor object SnowparkConnectExample { def main(args: Array[String]): Unit = { // Create Spark session with Snowpark Connect val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate() // Register ClassFinder for UDF support (if needed) // val classFinder = new REPLClassDirMonitor("target/scala-2.12/classes") // spark.registerClassFinder(classFinder) try { // Simple DataFrame operations import spark.implicits._ val data = Seq( (1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35) ) val df = spark.createDataFrame(data).toDF("id", "name", "age") println("Original DataFrame:") df.show() println("Filtered DataFrame (age > 28):") df.filter($"age" > 28).show() println("Aggregated result:") df.groupBy().avg("age").show() } finally { spark.stop() } } }
アプリケーションをコンパイルして実行します。
# Compile your Scala application sbt compile # Run the application sbt "runMain SnowparkConnectExample"
Snowpark Connect for Spark でのScala UDF サポート¶
ユーザー定義関数またはカスタムコードを使用する場合は、以下のいずれかを実行します。
クラスファイルを監視およびアップロードするためのクラス検索者を登録します。
import org.apache.spark.sql.connect.client.REPLClassDirMonitor val classFinder = new REPLClassDirMonitor("/absolute/path/to/target/scala-2.12/classes") spark.registerClassFinder(classFinder)
必要に応じて、JAR 依存関係をアップロードします。クラスファインダーが使用されていない場合は、ワークロード JAR 自体を含めることができます。
spark.addArtifact("/absolute/path/to/dependency.jar")
ステージングされた JAR を使用します。
spark.conf.set("snowpark.connect.udf.java.imports", "[@mystage/dependency.jar, @db.schema.stage/other_dependency.jar]")
Scala 2.13の使用¶
デフォルトでは Snowpark Connect for Spark はScala 2.12を使用します。Scala 2.13で構築されたワークロードは、「snowpark.connect.scala.version」設定オプションを使用してScalaバージョンを指定する必要があります。
// Directly in the session builder
val spark = SparkSession.builder()
.remote("sc://localhost:15002")
.config("snowpark.connect.scala.version", "2.13")
.getOrCreate()
// Or via session configuration
spark.conf.set("snowpark.connect.scala.version", "2.13")
トラブルシューティング Snowpark Connect for Spark インストール¶
次のチェックリストを使用して、Snowpark Connect for Spark のインストールと使用のトラブルシューティングを行うことができます。
JavaおよびPythonが :ref:` 同じアーキテクチャに基づく <label-snowpark_connect_jupyter_prereq>` ことを確認します。
Snowpark Connect for Spark をインストールする で説明されているように、最新の Snowpark Connect for Spark パッケージファイルを作成します。
PySpark コードを使用した
Pythonコマンドが、ローカル実行に対して正しく動作すること、つまり、Snowflakeへの接続なしで実行されることを確認します。たとえば、次のようなコマンドを実行します。
python your_pyspark_file.py
ソースクライアントを開く¶
標準のすぐに使えるオープンソースソフトウェア(OSS)Sparkクライアントパッケージ(PySpark、JavaまたはScala用のSparkクライアントなど)を、Jupyterノートブックや VS コードなど好みのローカル環境から使用できます。このようにして、Snowflakeに固有のパッケージのインストールを回避できます。
これは、Sparkコードをローカルで記述し、そのコードでSnowflakeのコンピューティングリソースとエンタープライズガバナンスを使用する場合に役立つ可能性があります。このシナリオでは、プログラムによるアクセストークン(PATs)を使用して認証と承認を実行します。
以下のセクションでは、インストール、構成、認証について説明します。接続を検証するための簡単な PySpark 例も見つかります。
ステップ1:必要なパッケージをインストールする¶
pysparkをインストールします。Snowflakeパッケージをインストールする必要はありません。pip install "pyspark[connect]>=3.5.0,<4"
ステップ2: 設定と認証¶
プログラムによるアクセストークン(PAT)を生成します。
詳細については、次のトピックをご参照ください。
次の例では、ユーザー
sysadminに対してTEST_PATという名前の PAT を追加し、有効期限を30日に設定します。ALTER USER add PAT TEST_PAT ROLE_RESTRICTION = sysadmin DAYS_TO_EXPIRY = 30;
SnowflakeのSpark Connectホスト URL を見つけます。
Snowflakeで次の SQL を実行し、アカウントのホスト名を検索します。
SELECT t.VALUE:type::VARCHAR as type, t.VALUE:host::VARCHAR as host, t.VALUE:port as port FROM TABLE(FLATTEN(input => PARSE_JSON(SYSTEM$ALLOWLIST()))) AS t where type = 'SNOWPARK_CONNECT';
ステップ3:ステップ3: Spark Connectサーバーに接続する¶
Spark Connectサーバーに接続するには、次のようなコードを使用します。
from pyspark.sql import SparkSession import urllib.parse # Replace with your actual PAT. pat = urllib.parse.quote("<pat>", safe="") # Replace with your Snowpark Connect host from the above SQL query. snowpark_connect_host = "" # Define database/schema/warehouse for executing your Spark session in Snowflake (recommended); otherwise, it will be resolved from your default_namespace and default_warehouse db_name = urllib.parse.quote("TESTDB", safe="") schema_name = urllib.parse.quote("TESTSCHEMA", safe="") warehouse_name = urllib.parse.quote("TESTWH", safe="") spark = SparkSession.builder.remote(f"sc://{snowpark_connect_host}/;token={pat};token_type=PAT;database={db_name};schema={schema_name};warehouse={warehouse_name}").getOrCreate() # Spark session is ready to use. You can write regular Spark DataFrame code, as in the following example: from pyspark.sql import Row df = spark.createDataFrame([ Row(a=1, b=2.), Row(a=2, b=3.), Row(a=4, b=5.),]) print(df.count())