Snowpark Submit 例

このトピックには、実稼働対応 Sparkアプリケーションを送信するために Snowpark Submit を使用する例が含まれています。

Snowflakeステージからアプリケーションをデプロイします。

次の例は、Snowflakeステージからアプリケーションとその依存関係をデプロイする方法を示しています。

  1. 以下のコマンドを使用して、アプリケーションファイルをステージにアップロードします。

    PUT file:///<abs_path>/app.py @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
    PUT file:///<abs_path>/dependencies.zip @my_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE
    
    Copy
  2. ステージにアップロードしたファイルを使用してジョブを送信するには、以下のコマンドを使用します。

    snowpark-submit \
      --py-files @my_stage/dependencies.zip \
      --snowflake-stage @my_stage \
      --snowflake-workload-name MY_JOB \
      --snowflake-connection-name MY_CONNECTION\
      --compute-pool MY_COMPUTE_POOL \
      @my_stage/app.py
    
    Copy

待機とログによるモニター

次の例では、ジョブを送信し、完了を待ち、ログを取得する方法を示します。

  1. 以下のコマンドを使用してジョブを送信し、完了を待ちます。

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --wait-for-completion \
      --snowflake-connection-name MY_CONNECTION \
      --compute-pool MY_COMPUTE_POOL \
      app.py
    
    Copy
  2. ジョブが失敗した場合は、以下のコマンドを使用して詳細ログを確認します。

    snowpark-submit
      --snowflake-workload-name MY_JOB
      --workload-status
      --display-logs
      --snowflake-connection-name MY_CONNECTION
    
    Copy

Apache Airflow DAG での Snowpark Submit を使用します。

Sparkジョブを Snowpark Connect for Spark 経由でSnowflakeに送信できます。クラスターモードの Snowpark送信 コマンドを使用して、ジョブを実行するためコンピューティングプールを活用できます。

この方法で Apache Airflow を使用する場合は、Apache Airflow を実行する Docker サービスまたはSnowpark Container Servicesコンテナが、Snowflakeステージで必要なファイルと Snowflake に適切にアクセスできることを確認してください。

次の例のコードは、以下のタスクを実行します。

  • /tmp/myenv でPythonの仮想環境を作成します。

    create_venv タスク内で、 .whl ファイルを使用して snowpark-submit パッケージをインストールするために、コードは pip を使用します。

  • Snowflake接続認証情報と OAuth トークンを含むセキュア connections.toml ファイルを生成します。

    create_connections_toml タスク内で、コードは /app/.snowflake ディレクトリ を作成し、 .toml ファイルを作成します。それから所有者(ユーザー)のみが読み取りと書き込みアクセスできるようにファイル権限を変更します。

  • Snowpark送信 コマンドを使用してSparkジョブを実行します。

    run_snowpark_script タスク内で、コードは以下のことを実行します。

    • 仮想環境をアクティブにします。

    • Snowpark送信 コマンドを使用してSparkジョブを実行します。

    • クラスターモードを使用してSnowflakeにデプロイします。

    • Snowpark Connect for Spark リモート URI sc://localhost:15002 を使用します。

    • Sparkアプリケーションクラス org.example.SnowparkConnectApp を指定します。

    • @snowflake_stage ステージからスクリプトをプルします。

    • --wait-for-completion を使用して、ジョブが終了するまで展開をブロックします。

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


default_args = {
  'start_date': airflow.utils.dates.days_ago(1),
  'retries': 0,
}


with DAG(
  'run_sparkconnect_python_script',
  default_args=default_args,
  schedule_interval=None,
  catchup=False,
) as dag:

  create_venv = BashOperator(
      task_id='create_venv',
      bash_command="""
      python3 -m venv /tmp/myenv &&
      source /tmp/myenv/bin/activate &&
      export PIP_USER=false &&
      pip install --upgrade pip &&
      pip install --no-cache-dir grpcio-tools>=1.48.1 &&
      pip install /app/snowpark_submit-<version>.whl
      """
  )

  create_connections_toml = BashOperator(
      task_id='create_connections_toml',
      bash_command="""
      mkdir -p /app/.snowflake
      echo "${SNOWFLAKE_USER}"
      cat <<EOF > /app/.snowflake/connections.toml

[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
  chmod 600 /app/.snowflake/connections.toml
  """
  )

  run_script = BashOperator(
      task_id='run_snowpark_script',
      bash_command="""
      set -e
      echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"

      echo "Running Python script with Snowpark..."
      source /tmp/myenv/bin/activate &&
      snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
      """,
      env={
          'SNOWFLAKE_HOME': '/app/.snowflake'
      }
  )

create_venv >> create_connections_toml >> run_script
Copy

Apache Airflowユーザーインターフェースのグラフビューまたはツリービューを使用して、 DAG をモニターできます。次の項目のタスクログを確認します。

  • 環境設定

  • Snowpark Connect for Spark のステータス

  • コマンド:Snowpark送信 ジョブ出力

Snowflakeステージに保存されたログまたはイベントテーブルから、Snowflakeで実行されたジョブをモニターすることもできます。