Snowpark Submit 예

이 항목에는 Snowpark Submit 을 사용해 프로덕션용 Spark 애플리케이션을 제출하는 예제가 포함되어 있습니다.

간단한 Spark 애플리케이션 작성 및 제출

다음 예제에서는 종속성이 없는 간단한 Spark 애플리케이션을 작성하고 제출하는 방법을 보여줍니다.

  1. 로컬 IDE에서 다음 내용으로 이름이 ``app.py``인 새 Python 파일을 생성합니다.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lit, upper, concat
    
    # Create Spark session
    spark = SparkSession.builder.appName("SimpleSession").getOrCreate()
    
    # Create a DataFrame from inline data
    data = [
        (1, "alice", "engineering", 95000),
        (2, "bob", "marketing", 72000),
        (3, "carol", "engineering", 105000),
        (4, "david", "sales", 68000),
        (5, "eva", "engineering", 88000),
    ]
    df = spark.createDataFrame(data, ["id", "name", "department", "salary"])
    
    # Add a new column
    df_with_bonus = df.withColumn("bonus", col("salary") * 0.1)
    df_with_bonus.show()
    
    # Filter and transform
    engineers = df.filter(col("department") == "engineering") \
        .withColumn("name_upper", upper(col("name"))) \
        .withColumn("greeting", concat(lit("Hello, "), col("name")))
    engineers.show()
    
    # Aggregate
    df.groupBy("department").avg("salary").show()
    
    # Stop the Spark session
    spark.stop()
    
    Copy
  2. 애플리케이션을 제출하려면 다음 명령을 사용합니다.

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --snowflake-connection-name MY_CONNECTION \
      /path/to/app.py
    
    Copy

    --wait-for-completion 옵션을 사용하여 작업이 완료될 때까지 대기하고, --workload-status 옵션을 사용하여 작업 상태를 확인하고, --display-logs 옵션을 사용하여 작업 로그를 표시할 수 있습니다. 전체 옵션 목록은 Snowpark Submit 참조 섹션을 참조하세요.

Snowflake 스테이지에서 애플리케이션 배포

애플리케이션에 읽어야 하는 파일과 같은 종속성이 있는 경우 Snowflake 스테이지에서 배포할 수 있습니다. 다음 예제에서는 Snowflake 스테이지에서 애플리케이션과 해당 종속성을 배포하는 방법을 보여줍니다.

  1. 터미널에서 스테이지로 파일을 업로드하려면 Snowflake CLI를 사용하면 됩니다. SnowSQL은 레거시 CLI이며, 이미 사용 중인 경우 해당 CLI를 사용하여 스테이지에 파일을 업로드할 수도 있습니다. 아직 Snowflake CLI를 설치하지 않은 경우 :doc:`/developer-guide/snowflake-cli/installation/installation`의 지침에 따라 설치할 수 있습니다.

  2. 로컬 IDE에서 다음 내용으로 이름이 ``sample_employees.csv``인 새 CSV 파일을 생성합니다.

    employee_id,name,department,salary,years_employed
    1,Alice Johnson,Engineering,95000,5
    2,Bob Smith,Marketing,72000,3
    3,Carol Williams,Engineering,105000,8
    4,David Brown,Sales,68000,2
    5,Eva Martinez,Engineering,88000,4
    6,Frank Wilson,Marketing,75000,6
    7,Grace Lee,Sales,92000,7
    8,Henry Taylor,Engineering,110000,10
    9,Ivy Chen,Marketing,65000,1
    10,Jack Davis,Sales,78000,4
    11,Karen White,Engineering,98000,6
    12,Leo Harris,Marketing,71000,3
    13,Maria Garcia,Sales,85000,5
    14,Nathan Clark,Engineering,102000,9
    15,Olivia Moore,Marketing,69000,2
    
    Copy

    다음 명령을 사용하여 종속성 파일을 스테이지에 업로드합니다. 여기서 my_stage`는 계정에 있는 스테이지의 이름입니다. (생성된 스테이지가 없으면 [`snow stage create](/developer-guide/snowflake-cli/command-reference/stage-commands/create)를 사용할 수 있습니다.)

    snow stage copy sample_employees.csv @<database>.<schema>.<stage>/sample_employees.csv -c MY_CONNECTION
    
    Copy

    파일이 성공적으로 업로드되었는지 확인하려면 다음 명령을 사용하여 스테이지의 파일을 나열할 수 있습니다.

    snow sql -c MY_CONNECTION -q "ls @<database>.<schema>.<stage>"
    
    Copy

    sample_employees.csv 파일이 목록에 표시되어야 합니다.

  3. 로컬 IDE에서 다음 내용으로 이름이 ``app.py``인 새 Python 파일을 생성합니다.

    from pyspark.sql import SparkSession
    
    # Create Spark session
    spark = SparkSession.builder.appName("SimpleStageExample").getOrCreate()
    
    # Load data from stage (adjust stage name to match yours)
    df = spark.read.csv("/app/<YOUR_STAGE>/sample_employees.csv", header=True, inferSchema=True)
    df.show()
    
    # Filter: Engineering department only
    engineers = df.filter(df["department"] == "Engineering")
    engineers.show()
    
    # Filter: Salary > 80000 and years_employed > 3
    senior_high_earners = df.filter((df["salary"] > 80000) & (df["years_employed"] > 3))
    senior_high_earners.show()
    
    # Aggregate: Average salary by department
    df.groupBy("department").avg("salary").show()
    
    # Select specific columns
    result = senior_high_earners.select("name", "department", "salary")
    result.show()
    
    # Stop the Spark session
    spark.stop()
    
    Copy

    스테이지에 업로드한 파일을 사용하는 애플리케이션을 제출하려면 다음 명령을 사용합니다.

    snowpark-submit \
      --snowflake-connection-name MY_CONNECTION \
      --snowflake-workload-name MY_JOB \
      --snowflake-stage @<database>.<schema>.<stage> \
      /path/to/app.py
    
    Copy

    컴퓨팅 풀은 애플리케이션을 실행하는 데 필요하며, connections.toml 파일에 지정하거나 --compute-pool 옵션을 사용하여 명령줄에 지정해야 합니다. 자세한 내용은 Snowpark Submit 참조 섹션을 참조하십시오.

대기 및 로그를 사용하여 모니터링하기

다음 예제에서는 작업을 제출하고 완료될 때까지 기다렸다가 로그를 검색하는 방법을 보여줍니다.

  1. 다음 명령을 사용하여 작업을 제출하고 완료될 때까지 기다립니다.

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --wait-for-completion \
      --snowflake-connection-name MY_CONNECTION \
      /path/to/app.py
    
    Copy
  2. 작업에 실패하면 다음 명령을 사용하여 자세한 로그를 확인합니다.

    snowpark-submit
      --snowflake-workload-name MY_JOB \
      --workload-status \
      --display-logs \
      --snowflake-connection-name MY_CONNECTION
    
    Copy

Apache Airflow DAG에서 Snowpark Submit 사용하기

Snowpark Connect for Spark 를 통해 Spark 작업을 Snowflake에 제출할 수 있습니다. 클러스터 모드에서 snowpark-submit 명령을 사용하면 컴퓨팅 풀을 활용해 작업을 실행할 수 있습니다.

이 방식으로 Apache Airflow를 사용하는 경우 Apache Airflow를 실행하는 Docker 서비스 또는 Snowpark Container Services 컨테이너가 Snowflake 및 Snowflake 스테이지의 필수 파일에 적절한 액세스 권한을 보유하고 있는지 확인하세요.

다음 예제의 코드는 다음과 같은 작업을 수행합니다.

  • :file:`/tmp/myenv`에서 Python 가상 환경을 만듭니다.

    create_venv 작업 시 코드는 .whl 파일을 이용해 snowpark-submit 패키지를 설치하는 :code:`pip`를 사용할 수 있습니다.

  • Snowflake 연결 자격 증명과 OAuth 토큰으로 보안 connections.toml 파일을 생성합니다

    create_connections_toml 작업 시 코드는 /app/.snowflake 디렉터리와 .toml 파일을 차례대로 만든 다음 파일 권한을 변경하여 소유자(사용자)만 읽기/쓰기 액세스 권한을 보유하도록 허용합니다.

  • snowpark-submit 명령을 사용하여 Spark 작업을 실행합니다.

    run_snowpark_script 작업 시 코드는 다음과 같은 작업을 수행합니다.

    • 가상 환경을 활성화합니다.

    • snowpark-submit 명령을 사용하여 Spark 작업을 실행합니다.

    • 클러스터 모드를 사용하여 Snowflake에 배포합니다.

    • Snowpark Connect for Spark remote URI sc://localhost:15002를 사용합니다.

    • org.example.SnowparkConnectApp Spark 애플리케이션 클래스를 지정합니다.

    • @snowflake_stage 스테이지에서 스크립트를 가져옵니다.

    • :code:`–wait-for-completion`을 사용하여 작업이 완료될 때까지 배포를 차단합니다.

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


default_args = {
  'start_date': airflow.utils.dates.days_ago(1),
  'retries': 0,
}


with DAG(
  'run_sparkconnect_python_script',
  default_args=default_args,
  schedule_interval=None,
  catchup=False,
) as dag:

  create_venv = BashOperator(
      task_id='create_venv',
      bash_command="""
      python3 -m venv /tmp/myenv &&
      source /tmp/myenv/bin/activate &&
      export PIP_USER=false &&
      pip install --upgrade pip &&
      pip install --no-cache-dir grpcio-tools>=1.48.1 &&
      pip install /app/snowpark_submit-<version>.whl
      """
  )

  create_connections_toml = BashOperator(
      task_id='create_connections_toml',
      bash_command="""
      mkdir -p /app/.snowflake
      echo "${SNOWFLAKE_USER}"
      cat <<EOF > /app/.snowflake/connections.toml

[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
  chmod 600 /app/.snowflake/connections.toml
  """
  )

  run_script = BashOperator(
      task_id='run_snowpark_script',
      bash_command="""
      set -e
      echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"

      echo "Running Python script with Snowpark..."
      source /tmp/myenv/bin/activate &&
      snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
      """,
      env={
          'SNOWFLAKE_HOME': '/app/.snowflake'
      }
  )

create_venv >> create_connections_toml >> run_script
Copy

Apache Airflow 사용자 인터페이스의 그래프 뷰 또는 트리 뷰를 사용하여 DAG를 모니터링할 수 있습니다. 작업 로그에서 다음 항목을 검사합니다.

  • 환경 설정

  • Snowpark Connect for Spark 의 상태

  • snowpark-submit 작업 출력

Snowflake 스테이지에 저장된 로그 또는 이벤트 테이블에서 Snowflake에 실행된 작업을 모니터링할 수도 있습니다.