VSCode, Jupyter Notebooks 또는 터미널에서 Spark 워크로드 실행하기¶
Jupyter Notebooks, VS Code, Spark 클러스터를 관리할 필요가 없는 Python 기반 인터페이스에서 대화형으로 Spark 워크로드를 실행할 수 있습니다. 워크로드는 Snowflake 인프라에서 실행됩니다.
예를 들면, 다음 작업을 수행할 수 있습니다.
전제 조건을 갖추고 있는지 확인합니다.
Snowflake에서 Snowpark Connect for Spark 와 연결할 수 있는 환경을 설정합니다.
Snowpark Connect for Spark 를 설치합니다.
클라이언트에서 PySpark 코드를 실행하여 Snowflake에서 실행합니다.
전제 조건¶
Python 및 Java 설치가 동일한 컴퓨터 아키텍처에 기반을 두고 있는지 확인합니다. 예를 들어, Python이 arm64 기반인 경우 Java도 arm64여야 합니다(예: x86_64 아님).
환경 설정하기¶
코드로 Snowflake에서 Snowpark Connect for Spark 에 연결할 수 있도록 하여 개발 환경을 설정할 수 있습니다. Snowflake 클라이언트 코드에 연결하려면, 연결 세부 정보가 포함된 .toml 파일을 사용합니다.
Snowflake CLI 가 설치되어 있다면 이를 사용하여 연결을 정의할 수 있습니다. 그렇지 않으면 config.toml 파일에 연결 매개 변수를 수동으로 작성할 수 있습니다.
Snowflake CLI 를 사용하여 연결 추가하기¶
Snowflake CLI 를 사용하면 Snowpark Connect for Spark 에서 Snowflake에 연결할 수 있는 연결 속성을 추가할 수 있습니다. 변경 사항은 config.toml 파일에 저장됩니다.
다음 명령을 실행해 snow connection add 명령을 사용하는 연결을 추가합니다.
snow connection add
프롬프트에 따라 연결을 정의합니다.
:code:`spark-connect`를 연결 이름으로 지정해야 합니다.
다음 예제와 같이 이 명령은
config.toml파일에 연결을 추가합니다[connections.spark-connect] host = "example.snowflakecomputing.com" port = 443 account = "example" user = "test_example" password = "password" protocol = "https" warehouse = "example_wh" database = "example_db" schema = "public"
다음 명령을 실행하여 연결이 작동하는지 확인합니다.
Snowflake CLI 를 사용하여 추가한 경우 이 방법으로 연결을 테스트할 수 있습니다.
snow connection list snow connection test --connection spark-connect
연결 파일을 수동으로 작성하여 연결 추가하기¶
코드로 Snowflake에서 Snowpark Connect for Spark 에 연결할 수 있도록 connections.toml 파일을 수동으로 작성하거나 업데이트할 수 있습니다.
다음 명령을 실행하여
connections.toml파일에서 소유자(사용자)만 읽기/쓰기 액세스 권한을 보유할 수 있도록 합니다.chmod 0600 "~/.snowflake/connections.toml"
다음 예제의 연결 속성을 사용하여
[spark-connect]연결을 포함하도록connections.toml파일을 편집합니다.값을 사용자 고유의 연결 세부 정보로 바꿔야 합니다.
[spark-connect] host="my_snowflake_account.snowflakecomputing.com" account="my_snowflake_account" user="my_user" password="&&&&&&&&" warehouse="my_wh" database="my_db" schema="public"
Snowpark Connect for Spark 설치하기¶
Python 패키지로 Snowpark Connect for Spark 를 설치할 수 있습니다.
Python 가상 환경을 만듭니다.
:code:`python3 –version`을 실행하여 Python 버전이 3.10 이상 및 3.13 이전인지 확인합니다.
python3 -m venv .venv source .venv/bin/activate
Snowpark Connect for Spark 패키지를 설치합니다.
pip install --upgrade --force-reinstall 'snowpark-connect[jdk]'
Python 코드를 추가하여 Snowpark Connect for Spark 서버를 시작하고 Snowpark Connect for Spark 세션을 만듭니다.
from snowflake import snowflake.snowpark_connect # Import snowpark_connect *before* importing pyspark libraries from pyspark.sql.types import Row spark = snowflake.snowpark_connect.server.init_spark_session()
클라이언트에서 Python 코드 실행¶
인증된 연결이 설정되면 평소와 같이 코드를 작성할 수 있습니다.
PySpark 클라이언트 라이브러리를 사용하여 Snowpark Connect for Spark 에 연결되는 PySpark 코드를 실행할 수 있습니다.
from pyspark.sql import Row
df = spark.createDataFrame([
Row(a=1, b=2.),
Row(a=2, b=3.),
Row(a=4, b=5.),])
print(df.count())
클라이언트에서 Scala 코드 실행하기¶
Spark Connect 클라이언트 라이브러리를 사용하여 |spconnect|에 연결하는 Scala 애플리케이션을 실행할 수 있습니다.
이 가이드에서는 Snowpark Connect를 설정하고 Scala 애플리케이션을 Snowpark Connect for Spark 서버에 연결하는 방법을 안내합니다.
1단계: Snowpark Connect for Spark 환경 설정¶
다음 항목에 설명된 단계를 사용하여 환경을 설정합니다.
2단계: Snowpark Connect for Spark 서버 스크립트 생성 및 서버 실행¶
Python 스크립트를 생성하고 Snowpark Connect for Spark 서버를 실행합니다.
# launch-snowpark-connect.py from snowflake import snowpark_connect def main(): snowpark_connect.start_session(is_daemon=False, remote_url="sc://localhost:15002") print("SAS started on port 15002") if __name__ == "__main__": main()
Snowpark Connect for Spark 서버를 실행합니다.
# Make sure you're in the correct Python environment pyenv activate your-snowpark-connect-env # Run the server script python launch-snowpark-connect.py
3단계: Scala 애플리케이션 설정하기¶
build.sbt 파일에 Spark Connect 클라이언트 종속성을 추가합니다.
libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.6" // Add JVM options for Java 9+ module system compatibility javaOptions ++= Seq( "--add-opens=java.base/java.nio=ALL-UNNAMED" )
Scala Code를 실행하여 Snowpark Connect for Spark 서버에 연결합니다.
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connect.client.REPLClassDirMonitor object SnowparkConnectExample { def main(args: Array[String]): Unit = { // Create Spark session with Snowpark Connect val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate() // Register ClassFinder for UDF support (if needed) // val classFinder = new REPLClassDirMonitor("target/scala-2.12/classes") // spark.registerClassFinder(classFinder) try { // Simple DataFrame operations import spark.implicits._ val data = Seq( (1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35) ) val df = spark.createDataFrame(data).toDF("id", "name", "age") println("Original DataFrame:") df.show() println("Filtered DataFrame (age > 28):") df.filter($"age" > 28).show() println("Aggregated result:") df.groupBy().avg("age").show() } finally { spark.stop() } } }
애플리케이션을 컴파일하고 실행합니다.
# Compile your Scala application sbt compile # Run the application sbt "runMain SnowparkConnectExample"
|spconnect|에 대한 UDF 지원¶
사용자 정의 함수 또는 사용자 지정 코드를 사용할 때 다음 중 하나를 수행합니다.
클래스 파인더를 등록하여 클래스 파일을 모니터링하고 업로드합니다.
import org.apache.spark.sql.connect.client.REPLClassDirMonitor val classFinder = new REPLClassDirMonitor("/absolute/path/to/target/scala-2.12/classes") spark.registerClassFinder(classFinder)
필요한 경우 JAR 종속성을 업로드합니다. 클래스 찾기가 사용되지 않는 경우 워크로드 JAR 자체를 포함할 수 있습니다.
spark.addArtifact("/absolute/path/to/dependency.jar")
스테이징된 JAR를 사용합니다.
spark.conf.set("snowpark.connect.udf.java.imports", "[@mystage/dependency.jar, @db.schema.stage/other_dependency.jar]")
Scala 2.13 사용하기¶
기본적으로, |spconnect|는 Scala 2.12를 사용합니다. Scala 2.13으로 빌드된 워크로드는 “snowpark.connect.scala.version” 구성 옵션을 사용하여 Scala 버전을 지정해야 합니다.
// Directly in the session builder
val spark = SparkSession.builder()
.remote("sc://localhost:15002")
.config("snowpark.connect.scala.version", "2.13")
.getOrCreate()
// Or via session configuration
spark.conf.set("snowpark.connect.scala.version", "2.13")
Snowpark Connect for Spark 설치 문제 해결하기¶
다음 검사 목록을 통해 Snowpark Connect for Spark 설치 및 사용 문제를 해결할 수 있습니다.
Java와 Python이 :ref:`동일한 아키텍처에 기반<label-snowpark_connect_jupyter_prereq>`을 두고 있는지 확인합니다.
:ref:`label-snowpark_connect_jupyter_install_spconnect`에 설명된 대로 가장 최신의 Snowpark Connect for Spark 패키지 파일을 사용합니다.
PySpark 코드가 포함된 python 명령이 로컬 실행, 즉 Snowflake 연결 없이 올바르게 작동하는지 확인합니다.
예를 들면, 다음과 같은 명령을 실행합니다.
python your_pyspark_file.py
오픈 소스 클라이언트¶
Jupyter Notebooks 및 VS 등 선호하는 로컬 환경에서 표준 기성 오픈 소스 소프트웨어(OSS) Spark 클라이언트 패키지(예: Java 또는 Scala용 PySpark 및 Spark 클라이언트)를 사용할 수 있습니다. 이 방식을 사용하면, Snowflake에 특정한 패키지를 설치하지 않아도 됩니다.
Spark 코드를 로컬에서 작성하고 코드에서 Snowflake 컴퓨팅 리소스 및 엔터프라이즈 거버넌스를 사용하도록 하려는 경우 이 방식이 유용할 수 있습니다. 이 시나리오에서는 프로그래밍 방식 액세스 토큰(PATs)을 통해 인증 및 권한 부여를 수행합니다.
다음 섹션에서는 설치, 구성 및 인증을 다룹니다. 또한 간단한 PySpark 예제를 확인하여 연결의 유효성을 검사합니다.
1단계: 필수 패키지 설치하기¶
pyspark를 설치합니다. Snowflake 패키지를 설치할 필요가 없습니다.pip install "pyspark[connect]>=3.5.0,<4"
2단계: 설정 및 인증¶
프로그래밍 방식 액세스 토큰(PAT)을 생성합니다.
자세한 내용은 다음 항목을 참조하십시오.
다음 예제에서는
TEST_PAT사용자에 대해 :code:`sysadmin`이라는 PAT를 추가하고 만료일을 30일로 설정합니다.ALTER USER add PAT TEST_PAT ROLE_RESTRICTION = sysadmin DAYS_TO_EXPIRY = 30;
Snowflake Spark Connect 호스트 URL을 찾습니다.
Snowflake에서 다음 SQL을 실행하여 계정의 호스트 이름을 찾습니다.
SELECT t.VALUE:type::VARCHAR as type, t.VALUE:host::VARCHAR as host, t.VALUE:port as port FROM TABLE(FLATTEN(input => PARSE_JSON(SYSTEM$ALLOWLIST()))) AS t where type = 'SNOWPARK_CONNECT';
3단계: Spark Connect 서버에 연결하기¶
Spark Connect 서버에 연결하려면 다음과 같은 코드를 사용합니다.
from pyspark.sql import SparkSession import urllib.parse # Replace with your actual PAT. pat = urllib.parse.quote("<pat>", safe="") # Replace with your Snowpark Connect host from the above SQL query. snowpark_connect_host = "" # Define database/schema/warehouse for executing your Spark session in Snowflake (recommended); otherwise, it will be resolved from your default_namespace and default_warehouse db_name = urllib.parse.quote("TESTDB", safe="") schema_name = urllib.parse.quote("TESTSCHEMA", safe="") warehouse_name = urllib.parse.quote("TESTWH", safe="") spark = SparkSession.builder.remote(f"sc://{snowpark_connect_host}/;token={pat};token_type=PAT;database={db_name};schema={schema_name};warehouse={warehouse_name}").getOrCreate() # Spark session is ready to use. You can write regular Spark DataFrame code, as in the following example: from pyspark.sql import Row df = spark.createDataFrame([ Row(a=1, b=2.), Row(a=2, b=3.), Row(a=4, b=5.),]) print(df.count())