VSCode, Jupyter Notebooks 또는 터미널에서 Spark 워크로드 실행하기

Jupyter Notebooks, VS Code, Spark 클러스터를 관리할 필요가 없는 Python 기반 인터페이스에서 대화형으로 Spark 워크로드를 실행할 수 있습니다. 워크로드는 Snowflake 인프라에서 실행됩니다.

예를 들면, 다음 작업을 수행할 수 있습니다.

  1. 전제 조건을 갖추고 있는지 확인합니다.

  2. Snowflake에서 Snowpark Connect for Spark 와 연결할 수 있는 환경을 설정합니다.

  3. Snowpark Connect for Spark 를 설치합니다.

  4. 클라이언트에서 PySpark 코드를 실행하여 Snowflake에서 실행합니다.

전제 조건

Python 및 Java 설치가 동일한 컴퓨터 아키텍처에 기반을 두고 있는지 확인합니다. 예를 들어, Python이 arm64 기반인 경우 Java도 arm64여야 합니다(예: x86_64 아님).

환경 설정하기

코드로 Snowflake에서 Snowpark Connect for Spark 에 연결할 수 있도록 하여 개발 환경을 설정할 수 있습니다. Snowflake 클라이언트 코드에 연결하려면, 연결 세부 정보가 포함된 .toml 파일을 사용합니다.

Snowflake CLI 가 설치되어 있다면 이를 사용하여 연결을 정의할 수 있습니다. 그렇지 않으면 config.toml 파일에 연결 매개 변수를 수동으로 작성할 수 있습니다.

Snowflake CLI 를 사용하여 연결 추가하기

Snowflake CLI 를 사용하면 Snowpark Connect for Spark 에서 Snowflake에 연결할 수 있는 연결 속성을 추가할 수 있습니다. 변경 사항은 config.toml 파일에 저장됩니다.

  1. 다음 명령을 실행해 snow connection add 명령을 사용하는 연결을 추가합니다.

    snow connection add
    
    Copy
  2. 프롬프트에 따라 연결을 정의합니다.

    :code:`spark-connect`를 연결 이름으로 지정해야 합니다.

    다음 예제와 같이 이 명령은 config.toml 파일에 연결을 추가합니다

    [connections.spark-connect]
    host = "example.snowflakecomputing.com"
    port = 443
    account = "example"
    user = "test_example"
    password = "password"
    protocol = "https"
    warehouse = "example_wh"
    database = "example_db"
    schema = "public"
    
    Copy
  3. 다음 명령을 실행하여 연결이 작동하는지 확인합니다.

    Snowflake CLI 를 사용하여 추가한 경우 이 방법으로 연결을 테스트할 수 있습니다.

    snow connection list
    snow connection test --connection spark-connect
    
    Copy

연결 파일을 수동으로 작성하여 연결 추가하기

코드로 Snowflake에서 Snowpark Connect for Spark 에 연결할 수 있도록 connections.toml 파일을 수동으로 작성하거나 업데이트할 수 있습니다.

  1. 다음 명령을 실행하여 connections.toml 파일에서 소유자(사용자)만 읽기/쓰기 액세스 권한을 보유할 수 있도록 합니다.

    chmod 0600 "~/.snowflake/connections.toml"
    
    Copy
  2. 다음 예제의 연결 속성을 사용하여 [spark-connect] 연결을 포함하도록 connections.toml 파일을 편집합니다.

    값을 사용자 고유의 연결 세부 정보로 바꿔야 합니다.

    [spark-connect]
    host="my_snowflake_account.snowflakecomputing.com"
    account="my_snowflake_account"
    user="my_user"
    password="&&&&&&&&"
    warehouse="my_wh"
    database="my_db"
    schema="public"
    
    Copy

Snowpark Connect for Spark 설치하기

Python 패키지로 Snowpark Connect for Spark 를 설치할 수 있습니다.

  1. Python 가상 환경을 만듭니다.

    :code:`python3 –version`을 실행하여 Python 버전이 3.10 이상 및 3.13 이전인지 확인합니다.

    python3 -m venv .venv
    source .venv/bin/activate
    
    Copy
  2. Snowpark Connect for Spark 패키지를 설치합니다.

    pip install --upgrade --force-reinstall 'snowpark-connect[jdk]'
    
    Copy
  3. Python 코드를 추가하여 Snowpark Connect for Spark 서버를 시작하고 Snowpark Connect for Spark 세션을 만듭니다.

    from snowflake import snowflake.snowpark_connect
    
    # Import snowpark_connect *before* importing pyspark libraries
    from pyspark.sql.types import Row
    
    spark = snowflake.snowpark_connect.server.init_spark_session()
    
    Copy

클라이언트에서 Python 코드 실행

인증된 연결이 설정되면 평소와 같이 코드를 작성할 수 있습니다.

PySpark 클라이언트 라이브러리를 사용하여 Snowpark Connect for Spark 에 연결되는 PySpark 코드를 실행할 수 있습니다.

from pyspark.sql import Row

  df = spark.createDataFrame([
      Row(a=1, b=2.),
      Row(a=2, b=3.),
      Row(a=4, b=5.),])

  print(df.count())
Copy

클라이언트에서 Scala 코드 실행하기

Spark Connect 클라이언트 라이브러리를 사용하여 |spconnect|에 연결하는 Scala 애플리케이션을 실행할 수 있습니다.

이 가이드에서는 Snowpark Connect를 설정하고 Scala 애플리케이션을 Snowpark Connect for Spark 서버에 연결하는 방법을 안내합니다.

1단계: Snowpark Connect for Spark 환경 설정

다음 항목에 설명된 단계를 사용하여 환경을 설정합니다.

  1. Python 가상 환경을 생성하고 Snowpark Connect를 설치합니다.

  2. 연결을 설정합니다.

2단계: Snowpark Connect for Spark 서버 스크립트 생성 및 서버 실행

  1. Python 스크립트를 생성하고 Snowpark Connect for Spark 서버를 실행합니다.

    # launch-snowpark-connect.py
    
    from snowflake import snowpark_connect
    
    def main():
        snowpark_connect.start_session(is_daemon=False, remote_url="sc://localhost:15002")
        print("SAS started on port 15002")
    
    if __name__ == "__main__":
        main()
    
    Copy
  2. Snowpark Connect for Spark 서버를 실행합니다.

    # Make sure you're in the correct Python environment
    pyenv activate your-snowpark-connect-env
    
    # Run the server script
    python launch-snowpark-connect.py
    
    Copy

3단계: Scala 애플리케이션 설정하기

  1. build.sbt 파일에 Spark Connect 클라이언트 종속성을 추가합니다.

    libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.6"
    
    // Add JVM options for Java 9+ module system compatibility
    javaOptions ++= Seq(
      "--add-opens=java.base/java.nio=ALL-UNNAMED"
    )
    
    Copy
  2. Scala Code를 실행하여 Snowpark Connect for Spark 서버에 연결합니다.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    object SnowparkConnectExample {
      def main(args: Array[String]): Unit = {
        // Create Spark session with Snowpark Connect
        val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()
    
        // Register ClassFinder for UDF support (if needed)
        // val classFinder = new REPLClassDirMonitor("target/scala-2.12/classes")
        // spark.registerClassFinder(classFinder)
    
        try {
          // Simple DataFrame operations
          import spark.implicits._
    
          val data = Seq(
            (1, "Alice", 25),
            (2, "Bob", 30),
            (3, "Charlie", 35)
          )
    
          val df = spark.createDataFrame(data).toDF("id", "name", "age")
    
          println("Original DataFrame:")
          df.show()
    
          println("Filtered DataFrame (age > 28):")
          df.filter($"age" > 28).show()
    
          println("Aggregated result:")
          df.groupBy().avg("age").show()
    
        } finally {
          spark.stop()
        }
      }
    }
    
    Copy
  3. 애플리케이션을 컴파일하고 실행합니다.

    # Compile your Scala application
    sbt compile
    
    # Run the application
    sbt "runMain SnowparkConnectExample"
    
    Copy

|spconnect|에 대한 UDF 지원

사용자 정의 함수 또는 사용자 지정 코드를 사용할 때 다음 중 하나를 수행합니다.

  • 클래스 파인더를 등록하여 클래스 파일을 모니터링하고 업로드합니다.

    import org.apache.spark.sql.connect.client.REPLClassDirMonitor
    
    val classFinder = new REPLClassDirMonitor("/absolute/path/to/target/scala-2.12/classes")
    spark.registerClassFinder(classFinder)
    
    Copy
  • 필요한 경우 JAR 종속성을 업로드합니다. 클래스 찾기가 사용되지 않는 경우 워크로드 JAR 자체를 포함할 수 있습니다.

    spark.addArtifact("/absolute/path/to/dependency.jar")
    
    Copy
  • 스테이징된 JAR를 사용합니다.

    spark.conf.set("snowpark.connect.udf.java.imports", "[@mystage/dependency.jar, @db.schema.stage/other_dependency.jar]")
    
    Copy

Scala 2.13 사용하기

기본적으로, |spconnect|는 Scala 2.12를 사용합니다. Scala 2.13으로 빌드된 워크로드는 “snowpark.connect.scala.version” 구성 옵션을 사용하여 Scala 버전을 지정해야 합니다.

// Directly in the session builder
val spark = SparkSession.builder()
  .remote("sc://localhost:15002")
  .config("snowpark.connect.scala.version", "2.13")
  .getOrCreate()

// Or via session configuration
spark.conf.set("snowpark.connect.scala.version", "2.13")
Copy

Snowpark Connect for Spark 설치 문제 해결하기

다음 검사 목록을 통해 Snowpark Connect for Spark 설치 및 사용 문제를 해결할 수 있습니다.

  • Java와 Python이 :ref:`동일한 아키텍처에 기반<label-snowpark_connect_jupyter_prereq>`을 두고 있는지 확인합니다.

  • :ref:`label-snowpark_connect_jupyter_install_spconnect`에 설명된 대로 가장 최신의 Snowpark Connect for Spark 패키지 파일을 사용합니다.

  • PySpark 코드가 포함된 python 명령이 로컬 실행, 즉 Snowflake 연결 없이 올바르게 작동하는지 확인합니다.

    예를 들면, 다음과 같은 명령을 실행합니다.

    python your_pyspark_file.py
    
    Copy

오픈 소스 클라이언트

Jupyter Notebooks 및 VS 등 선호하는 로컬 환경에서 표준 기성 오픈 소스 소프트웨어(OSS) Spark 클라이언트 패키지(예: Java 또는 Scala용 PySpark 및 Spark 클라이언트)를 사용할 수 있습니다. 이 방식을 사용하면, Snowflake에 특정한 패키지를 설치하지 않아도 됩니다.

Spark 코드를 로컬에서 작성하고 코드에서 Snowflake 컴퓨팅 리소스 및 엔터프라이즈 거버넌스를 사용하도록 하려는 경우 이 방식이 유용할 수 있습니다. 이 시나리오에서는 프로그래밍 방식 액세스 토큰(PATs)을 통해 인증 및 권한 부여를 수행합니다.

다음 섹션에서는 설치, 구성 및 인증을 다룹니다. 또한 간단한 PySpark 예제를 확인하여 연결의 유효성을 검사합니다.

1단계: 필수 패키지 설치하기

  • pyspark 를 설치합니다. Snowflake 패키지를 설치할 필요가 없습니다.

    pip install "pyspark[connect]>=3.5.0,<4"
    
    Copy

2단계: 설정 및 인증

  1. 프로그래밍 방식 액세스 토큰(PAT)을 생성합니다.

    자세한 내용은 다음 항목을 참조하십시오.

    다음 예제에서는 TEST_PAT 사용자에 대해 :code:`sysadmin`이라는 PAT를 추가하고 만료일을 30일로 설정합니다.

    ALTER USER add PAT TEST_PAT ROLE_RESTRICTION = sysadmin DAYS_TO_EXPIRY = 30;
    
    Copy
  2. Snowflake Spark Connect 호스트 URL을 찾습니다.

    Snowflake에서 다음 SQL을 실행하여 계정의 호스트 이름을 찾습니다.

    SELECT t.VALUE:type::VARCHAR as type,
           t.VALUE:host::VARCHAR as host,
           t.VALUE:port as port
      FROM TABLE(FLATTEN(input => PARSE_JSON(SYSTEM$ALLOWLIST()))) AS t where type = 'SNOWPARK_CONNECT';
    
    Copy

3단계: Spark Connect 서버에 연결하기

  • Spark Connect 서버에 연결하려면 다음과 같은 코드를 사용합니다.

    from pyspark.sql import SparkSession
    import urllib.parse
    
    # Replace with your actual PAT.
    pat = urllib.parse.quote("<pat>", safe="")
    
    # Replace with your Snowpark Connect host from the above SQL query.
    snowpark_connect_host = ""
    
    # Define database/schema/warehouse for executing your Spark session in Snowflake (recommended); otherwise, it will be resolved from your default_namespace and default_warehouse
    
    db_name = urllib.parse.quote("TESTDB", safe="")
    schema_name = urllib.parse.quote("TESTSCHEMA", safe="")
    warehouse_name = urllib.parse.quote("TESTWH", safe="")
    
    spark = SparkSession.builder.remote(f"sc://{snowpark_connect_host}/;token={pat};token_type=PAT;database={db_name};schema={schema_name};warehouse={warehouse_name}").getOrCreate()
    
    # Spark session is ready to use. You can write regular Spark DataFrame code, as in the following example:
    
    from pyspark.sql import Row
    
    df = spark.createDataFrame([
        Row(a=1, b=2.),
        Row(a=2, b=3.),
        Row(a=4, b=5.),])
    print(df.count())
    
    Copy