Snowpark Submit 예

이 항목에는 |spsubmit|을 사용해 프로덕션용 Spark 애플리케이션을 제출하는 예제가 포함되어 있습니다.

Snowflake 스테이지에서 애플리케이션 배포

다음 예제는 Snowflake 스테이지에서 애플리케이션과 해당 종속성을 배포하는 방법을 보여줍니다.

  1. 다음 명령을 사용하여 애플리케이션 파일을 스테이지에 업로드합니다.

    PUT file:///<abs_path>/app.py @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
    PUT file:///<abs_path>/dependencies.zip @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
    
    Copy
  2. 스테이지에 업로드한 파일을 사용하여 작업을 제출하려면 다음 명령을 사용합니다.

    snowpark-submit \
      --py-files @my_stage/dependencies.zip \
      --snowflake-stage @my_stage \
      --snowflake-workload-name MY_JOB \
      --snowflake-connection-name MY_CONNECTION\
      --compute-pool MY_COMPUTE_POOL \
      @my_stage/app.py
    
    Copy

대기 및 로그를 사용하여 모니터링하기

다음 예제에서는 작업을 제출하고 완료될 때까지 기다렸다가 로그를 검색하는 방법을 보여줍니다.

  1. 다음 명령을 사용하여 작업을 제출하고 완료될 때까지 기다립니다.

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --wait-for-completion \
      --snowflake-connection-name MY_CONNECTION \
      --compute-pool MY_COMPUTE_POOL \
      app.py
    
    Copy
  2. 작업에 실패하면 다음 명령을 사용하여 자세한 로그를 확인합니다.

    snowpark-submit
      --snowflake-workload-name MY_JOB
      --workload-status
      --display-logs
      --snowflake-connection-name MY_CONNECTION
    
    Copy

Apache Airflow DAG에서 Snowpark Submit 사용하기

|spconnect|를 통해 Spark 작업을 Snowflake에 제출할 수 있습니다. 클러스터 모드에서 snowpark-submit 명령을 사용하면 컴퓨팅 풀을 활용해 작업을 실행할 수 있습니다.

이 방식으로 Apache Airflow를 사용하는 경우 Apache Airflow를 실행하는 Docker 서비스 또는 Snowpark Container Services 컨테이너가 Snowflake 및 Snowflake 스테이지의 필수 파일에 적절한 액세스 권한을 보유하고 있는지 확인하세요.

다음 예제의 코드는 다음과 같은 작업을 수행합니다.

  • :file:`/tmp/myenv`에서 Python 가상 환경을 만듭니다.

    create_venv 작업 시 코드는 .whl 파일을 이용해 snowpark-submit 패키지를 설치하는 :code:`pip`를 사용할 수 있습니다.

  • Snowflake 연결 자격 증명과 OAuth 토큰으로 보안 connections.toml 파일을 생성합니다

    create_connections_toml 작업 시 코드는 /app/.snowflake 디렉터리와 .toml 파일을 차례대로 만든 다음 파일 권한을 변경하여 소유자(사용자)만 읽기/쓰기 액세스 권한을 보유하도록 허용합니다.

  • snowpark-submit 명령을 사용하여 Spark 작업을 실행합니다.

    run_snowpark_script 작업 시 코드는 다음과 같은 작업을 수행합니다.

    • 가상 환경을 활성화합니다.

    • snowpark-submit 명령을 사용하여 Spark 작업을 실행합니다.

    • 클러스터 모드를 사용하여 Snowflake에 배포합니다.

    • Snowpark Connect for Spark remote URI sc://localhost:15002를 사용합니다.

    • org.example.SnowparkConnectApp Spark 애플리케이션 클래스를 지정합니다.

    • @snowflake_stage 스테이지에서 스크립트를 가져옵니다.

    • :code:`–wait-for-completion`을 사용하여 작업이 완료될 때까지 배포를 차단합니다.

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


default_args = {
  'start_date': airflow.utils.dates.days_ago(1),
  'retries': 0,
}


with DAG(
  'run_sparkconnect_python_script',
  default_args=default_args,
  schedule_interval=None,
  catchup=False,
) as dag:

  create_venv = BashOperator(
      task_id='create_venv',
      bash_command="""
      python3 -m venv /tmp/myenv &&
      source /tmp/myenv/bin/activate &&
      export PIP_USER=false &&
      pip install --upgrade pip &&
      pip install --no-cache-dir grpcio-tools>=1.48.1 &&
      pip install /app/snowpark_submit-<version>.whl
      """
  )

  create_connections_toml = BashOperator(
      task_id='create_connections_toml',
      bash_command="""
      mkdir -p /app/.snowflake
      echo "${SNOWFLAKE_USER}"
      cat <<EOF > /app/.snowflake/connections.toml

[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
  chmod 600 /app/.snowflake/connections.toml
  """
  )

  run_script = BashOperator(
      task_id='run_snowpark_script',
      bash_command="""
      set -e
      echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"

      echo "Running Python script with Snowpark..."
      source /tmp/myenv/bin/activate &&
      snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
      """,
      env={
          'SNOWFLAKE_HOME': '/app/.snowflake'
      }
  )

create_venv >> create_connections_toml >> run_script
Copy

Apache Airflow 사용자 인터페이스의 그래프 뷰 또는 트리 뷰를 사용하여 DAG를 모니터링할 수 있습니다. 작업 로그에서 다음 항목을 검사합니다.

  • 환경 설정

  • |spconnect|의 상태

  • snowpark-submit 작업 출력

Snowflake 스테이지에 저장된 로그 또는 이벤트 테이블에서 Snowflake에 실행된 작업을 모니터링할 수도 있습니다.