Snowpark Submit 例

このトピックには、実稼働対応 Sparkアプリケーションを送信するために Snowpark Submit を使用する例が含まれています。

シンプルなSparkアプリケーションの作成と送信

次の例は、依存関係のないシンプルなSparkアプリケーションを作成して送信する方法を示しています。

  1. ローカルIDEで、以下の内容の``app.py``という新しいPythonファイルを作成します。

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lit, upper, concat
    
    # Create Spark session
    spark = SparkSession.builder.appName("SimpleSession").getOrCreate()
    
    # Create a DataFrame from inline data
    data = [
        (1, "alice", "engineering", 95000),
        (2, "bob", "marketing", 72000),
        (3, "carol", "engineering", 105000),
        (4, "david", "sales", 68000),
        (5, "eva", "engineering", 88000),
    ]
    df = spark.createDataFrame(data, ["id", "name", "department", "salary"])
    
    # Add a new column
    df_with_bonus = df.withColumn("bonus", col("salary") * 0.1)
    df_with_bonus.show()
    
    # Filter and transform
    engineers = df.filter(col("department") == "engineering") \
        .withColumn("name_upper", upper(col("name"))) \
        .withColumn("greeting", concat(lit("Hello, "), col("name")))
    engineers.show()
    
    # Aggregate
    df.groupBy("department").avg("salary").show()
    
    # Stop the Spark session
    spark.stop()
    
    Copy
  2. アプリケーションを送信するには、以下のコマンドを使用します。

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --snowflake-connection-name MY_CONNECTION \
      /path/to/app.py
    
    Copy

    ジョブの完了を待つ:code:--wait-for-completion`オプション、ジョブのステータスを確認する:code:--workload-status`オプション、ジョブのログを表示する:code:--display-logs`オプションを使用することができます。オプションの包括的なリストについては、:doc:/developer-guide/snowpark-connect/snowpark-submit-reference`をご参照ください。

Snowflakeステージからアプリケーションをデプロイします。

アプリケーションに、読み取りが必要なファイルなどの依存関係がある場合は、Snowflakeステージからそれらをデプロイすることができます。次の例は、Snowflakeステージからアプリケーションとその依存関係をデプロイする方法を示しています。

  1. ターミナルからステージにファイルをアップロードするには、Snowflake CLIを使用できます。SnowSQLはレガシーCLIであり、すでに使用している場合は、ステージにファイルをアップロードするためにも使用できます。Snowflake CLIをまだインストールしていない場合は、:doc:`/developer-guide/snowflake-cli/installation/installation`の指示に従ってインストールできます。

  2. ローカルIDEで、以下の内容の``sample_employees.csv``という名前の新しいCSVを作成します。

    employee_id,name,department,salary,years_employed
    1,Alice Johnson,Engineering,95000,5
    2,Bob Smith,Marketing,72000,3
    3,Carol Williams,Engineering,105000,8
    4,David Brown,Sales,68000,2
    5,Eva Martinez,Engineering,88000,4
    6,Frank Wilson,Marketing,75000,6
    7,Grace Lee,Sales,92000,7
    8,Henry Taylor,Engineering,110000,10
    9,Ivy Chen,Marketing,65000,1
    10,Jack Davis,Sales,78000,4
    11,Karen White,Engineering,98000,6
    12,Leo Harris,Marketing,71000,3
    13,Maria Garcia,Sales,85000,5
    14,Nathan Clark,Engineering,102000,9
    15,Olivia Moore,Marketing,69000,2
    
    Copy

    次のコマンドを使用して、依存関係ファイルをステージにアップロードします。my_stage`は、アカウント内のステージの名前です。(ステージが作成されていない場合は、[`snow stage create](/developer-guide/snowflake-cli/command-reference/stage-commands/create)を使用できます。)

    snow stage copy sample_employees.csv @<database>.<schema>.<stage>/sample_employees.csv -c MY_CONNECTION
    
    Copy

    ファイルが正常にアップロードされたことを確認するには、次のコマンドを使用してステージ内のファイルを一覧表示します。

    snow sql -c MY_CONNECTION -q "ls @<database>.<schema>.<stage>"
    
    Copy

    リスト内に``sample_employees.csv``ファイルが表示されるはずです。

  3. ローカルIDEで、以下の内容の``app.py``という新しいPythonファイルを作成します。

    from pyspark.sql import SparkSession
    
    # Create Spark session
    spark = SparkSession.builder.appName("SimpleStageExample").getOrCreate()
    
    # Load data from stage (adjust stage name to match yours)
    df = spark.read.csv("/app/<YOUR_STAGE>/sample_employees.csv", header=True, inferSchema=True)
    df.show()
    
    # Filter: Engineering department only
    engineers = df.filter(df["department"] == "Engineering")
    engineers.show()
    
    # Filter: Salary > 80000 and years_employed > 3
    senior_high_earners = df.filter((df["salary"] > 80000) & (df["years_employed"] > 3))
    senior_high_earners.show()
    
    # Aggregate: Average salary by department
    df.groupBy("department").avg("salary").show()
    
    # Select specific columns
    result = senior_high_earners.select("name", "department", "salary")
    result.show()
    
    # Stop the Spark session
    spark.stop()
    
    Copy

    ステージにアップロードしたファイルを使用するアプリケーションを送信するには、以下のコマンドを使用します。

    snowpark-submit \
      --snowflake-connection-name MY_CONNECTION \
      --snowflake-workload-name MY_JOB \
      --snowflake-stage @<database>.<schema>.<stage> \
      /path/to/app.py
    
    Copy

    アプリケーションを実行するにはコンピューティングプールが必要であり、connections.toml`ファイルまたはコマンドラインで:code:--compute-pool`オプションを使用して指定する必要があることに注意してください。詳細については、 Snowpark Submit リファレンス をご参照ください。

待機とログによるモニター

次の例では、ジョブを送信し、完了を待ち、ログを取得する方法を示します。

  1. 以下のコマンドを使用してジョブを送信し、完了を待ちます。

    snowpark-submit \
      --snowflake-workload-name MY_JOB \
      --wait-for-completion \
      --snowflake-connection-name MY_CONNECTION \
      /path/to/app.py
    
    Copy
  2. ジョブが失敗した場合は、以下のコマンドを使用して詳細ログを確認します。

    snowpark-submit
      --snowflake-workload-name MY_JOB \
      --workload-status \
      --display-logs \
      --snowflake-connection-name MY_CONNECTION
    
    Copy

Apache Airflow DAG での Snowpark Submit を使用します。

Sparkジョブを Snowpark Connect for Spark 経由でSnowflakeに送信できます。クラスターモードの Snowpark送信 コマンドを使用して、ジョブを実行するためコンピューティングプールを活用できます。

この方法で Apache Airflow を使用する場合は、Apache Airflow を実行する Docker サービスまたはSnowpark Container Servicesコンテナが、Snowflakeステージで必要なファイルと Snowflake に適切にアクセスできることを確認してください。

次の例のコードは、以下のタスクを実行します。

  • /tmp/myenv でPythonの仮想環境を作成します。

    create_venv タスク内で、 .whl ファイルを使用して snowpark-submit パッケージをインストールするために、コードは pip を使用します。

  • Snowflake接続認証情報と OAuth トークンを含むセキュア connections.toml ファイルを生成します。

    create_connections_toml タスク内で、コードは /app/.snowflake ディレクトリ を作成し、 .toml ファイルを作成します。それから所有者(ユーザー)のみが読み取りと書き込みアクセスできるようにファイル権限を変更します。

  • Snowpark送信 コマンドを使用してSparkジョブを実行します。

    run_snowpark_script タスク内で、コードは以下のことを実行します。

    • 仮想環境をアクティブにします。

    • Snowpark送信 コマンドを使用してSparkジョブを実行します。

    • クラスターモードを使用してSnowflakeにデプロイします。

    • Snowpark Connect for Spark リモート URI sc://localhost:15002 を使用します。

    • Sparkアプリケーションクラス org.example.SnowparkConnectApp を指定します。

    • @snowflake_stage ステージからスクリプトをプルします。

    • --wait-for-completion を使用して、ジョブが終了するまで展開をブロックします。

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


default_args = {
  'start_date': airflow.utils.dates.days_ago(1),
  'retries': 0,
}


with DAG(
  'run_sparkconnect_python_script',
  default_args=default_args,
  schedule_interval=None,
  catchup=False,
) as dag:

  create_venv = BashOperator(
      task_id='create_venv',
      bash_command="""
      python3 -m venv /tmp/myenv &&
      source /tmp/myenv/bin/activate &&
      export PIP_USER=false &&
      pip install --upgrade pip &&
      pip install --no-cache-dir grpcio-tools>=1.48.1 &&
      pip install /app/snowpark_submit-<version>.whl
      """
  )

  create_connections_toml = BashOperator(
      task_id='create_connections_toml',
      bash_command="""
      mkdir -p /app/.snowflake
      echo "${SNOWFLAKE_USER}"
      cat <<EOF > /app/.snowflake/connections.toml

[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
  chmod 600 /app/.snowflake/connections.toml
  """
  )

  run_script = BashOperator(
      task_id='run_snowpark_script',
      bash_command="""
      set -e
      echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"

      echo "Running Python script with Snowpark..."
      source /tmp/myenv/bin/activate &&
      snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
      """,
      env={
          'SNOWFLAKE_HOME': '/app/.snowflake'
      }
  )

create_venv >> create_connections_toml >> run_script
Copy

Apache Airflowユーザーインターフェースのグラフビューまたはツリービューを使用して、 DAG をモニターできます。次の項目のタスクログを確認します。

  • 環境設定

  • Snowpark Connect for Spark のステータス

  • コマンド:Snowpark送信 ジョブ出力

Snowflakeステージに保存されたログまたはイベントテーブルから、Snowflakeで実行されたジョブをモニターすることもできます。