Snowpark Submit 例¶
このトピックには、実稼働対応 Sparkアプリケーションを送信するために Snowpark Submit を使用する例が含まれています。
シンプルなSparkアプリケーションの作成と送信¶
次の例は、依存関係のないシンプルなSparkアプリケーションを作成して送信する方法を示しています。
ローカルIDEで、以下の内容の``app.py``という新しいPythonファイルを作成します。
from pyspark.sql import SparkSession from pyspark.sql.functions import col, lit, upper, concat # Create Spark session spark = SparkSession.builder.appName("SimpleSession").getOrCreate() # Create a DataFrame from inline data data = [ (1, "alice", "engineering", 95000), (2, "bob", "marketing", 72000), (3, "carol", "engineering", 105000), (4, "david", "sales", 68000), (5, "eva", "engineering", 88000), ] df = spark.createDataFrame(data, ["id", "name", "department", "salary"]) # Add a new column df_with_bonus = df.withColumn("bonus", col("salary") * 0.1) df_with_bonus.show() # Filter and transform engineers = df.filter(col("department") == "engineering") \ .withColumn("name_upper", upper(col("name"))) \ .withColumn("greeting", concat(lit("Hello, "), col("name"))) engineers.show() # Aggregate df.groupBy("department").avg("salary").show() # Stop the Spark session spark.stop()
アプリケーションを送信するには、以下のコマンドを使用します。
snowpark-submit \ --snowflake-workload-name MY_JOB \ --snowflake-connection-name MY_CONNECTION \ /path/to/app.py
ジョブの完了を待つ:code:
--wait-for-completion`オプション、ジョブのステータスを確認する:code:--workload-status`オプション、ジョブのログを表示する:code:--display-logs`オプションを使用することができます。オプションの包括的なリストについては、:doc:/developer-guide/snowpark-connect/snowpark-submit-reference`をご参照ください。
Snowflakeステージからアプリケーションをデプロイします。¶
アプリケーションに、読み取りが必要なファイルなどの依存関係がある場合は、Snowflakeステージからそれらをデプロイすることができます。次の例は、Snowflakeステージからアプリケーションとその依存関係をデプロイする方法を示しています。
ターミナルからステージにファイルをアップロードするには、Snowflake CLIを使用できます。SnowSQLはレガシーCLIであり、すでに使用している場合は、ステージにファイルをアップロードするためにも使用できます。Snowflake CLIをまだインストールしていない場合は、:doc:`/developer-guide/snowflake-cli/installation/installation`の指示に従ってインストールできます。
ローカルIDEで、以下の内容の``sample_employees.csv``という名前の新しいCSVを作成します。
employee_id,name,department,salary,years_employed 1,Alice Johnson,Engineering,95000,5 2,Bob Smith,Marketing,72000,3 3,Carol Williams,Engineering,105000,8 4,David Brown,Sales,68000,2 5,Eva Martinez,Engineering,88000,4 6,Frank Wilson,Marketing,75000,6 7,Grace Lee,Sales,92000,7 8,Henry Taylor,Engineering,110000,10 9,Ivy Chen,Marketing,65000,1 10,Jack Davis,Sales,78000,4 11,Karen White,Engineering,98000,6 12,Leo Harris,Marketing,71000,3 13,Maria Garcia,Sales,85000,5 14,Nathan Clark,Engineering,102000,9 15,Olivia Moore,Marketing,69000,2
次のコマンドを使用して、依存関係ファイルをステージにアップロードします。
my_stage`は、アカウント内のステージの名前です。(ステージが作成されていない場合は、[`snow stage create](/developer-guide/snowflake-cli/command-reference/stage-commands/create)を使用できます。)snow stage copy sample_employees.csv @<database>.<schema>.<stage>/sample_employees.csv -c MY_CONNECTION
ファイルが正常にアップロードされたことを確認するには、次のコマンドを使用してステージ内のファイルを一覧表示します。
snow sql -c MY_CONNECTION -q "ls @<database>.<schema>.<stage>"
リスト内に``sample_employees.csv``ファイルが表示されるはずです。
ローカルIDEで、以下の内容の``app.py``という新しいPythonファイルを作成します。
from pyspark.sql import SparkSession # Create Spark session spark = SparkSession.builder.appName("SimpleStageExample").getOrCreate() # Load data from stage (adjust stage name to match yours) df = spark.read.csv("/app/<YOUR_STAGE>/sample_employees.csv", header=True, inferSchema=True) df.show() # Filter: Engineering department only engineers = df.filter(df["department"] == "Engineering") engineers.show() # Filter: Salary > 80000 and years_employed > 3 senior_high_earners = df.filter((df["salary"] > 80000) & (df["years_employed"] > 3)) senior_high_earners.show() # Aggregate: Average salary by department df.groupBy("department").avg("salary").show() # Select specific columns result = senior_high_earners.select("name", "department", "salary") result.show() # Stop the Spark session spark.stop()
ステージにアップロードしたファイルを使用するアプリケーションを送信するには、以下のコマンドを使用します。
snowpark-submit \ --snowflake-connection-name MY_CONNECTION \ --snowflake-workload-name MY_JOB \ --snowflake-stage @<database>.<schema>.<stage> \ /path/to/app.py
アプリケーションを実行するにはコンピューティングプールが必要であり、
connections.toml`ファイルまたはコマンドラインで:code:--compute-pool`オプションを使用して指定する必要があることに注意してください。詳細については、 Snowpark Submit リファレンス をご参照ください。
待機とログによるモニター¶
次の例では、ジョブを送信し、完了を待ち、ログを取得する方法を示します。
以下のコマンドを使用してジョブを送信し、完了を待ちます。
snowpark-submit \ --snowflake-workload-name MY_JOB \ --wait-for-completion \ --snowflake-connection-name MY_CONNECTION \ /path/to/app.py
ジョブが失敗した場合は、以下のコマンドを使用して詳細ログを確認します。
snowpark-submit --snowflake-workload-name MY_JOB \ --workload-status \ --display-logs \ --snowflake-connection-name MY_CONNECTION
Apache Airflow DAG での Snowpark Submit を使用します。¶
Sparkジョブを Snowpark Connect for Spark 経由でSnowflakeに送信できます。クラスターモードの Snowpark送信 コマンドを使用して、ジョブを実行するためコンピューティングプールを活用できます。
この方法で Apache Airflow を使用する場合は、Apache Airflow を実行する Docker サービスまたはSnowpark Container Servicesコンテナが、Snowflakeステージで必要なファイルと Snowflake に適切にアクセスできることを確認してください。
次の例のコードは、以下のタスクを実行します。
/tmp/myenvでPythonの仮想環境を作成します。create_venvタスク内で、.whlファイルを使用してsnowpark-submitパッケージをインストールするために、コードはpipを使用します。Snowflake接続認証情報と OAuth トークンを含むセキュア
connections.tomlファイルを生成します。create_connections_tomlタスク内で、コードは/app/.snowflakeディレクトリ を作成し、.tomlファイルを作成します。それから所有者(ユーザー)のみが読み取りと書き込みアクセスできるようにファイル権限を変更します。Snowpark送信コマンドを使用してSparkジョブを実行します。run_snowpark_scriptタスク内で、コードは以下のことを実行します。仮想環境をアクティブにします。
Snowpark送信コマンドを使用してSparkジョブを実行します。クラスターモードを使用してSnowflakeにデプロイします。
Snowpark Connect for Spark リモート URI sc://localhost:15002 を使用します。
Sparkアプリケーションクラス
org.example.SnowparkConnectAppを指定します。@snowflake_stage ステージからスクリプトをプルします。
--wait-for-completionを使用して、ジョブが終了するまで展開をブロックします。
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
default_args = {
'start_date': airflow.utils.dates.days_ago(1),
'retries': 0,
}
with DAG(
'run_sparkconnect_python_script',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
create_venv = BashOperator(
task_id='create_venv',
bash_command="""
python3 -m venv /tmp/myenv &&
source /tmp/myenv/bin/activate &&
export PIP_USER=false &&
pip install --upgrade pip &&
pip install --no-cache-dir grpcio-tools>=1.48.1 &&
pip install /app/snowpark_submit-<version>.whl
"""
)
create_connections_toml = BashOperator(
task_id='create_connections_toml',
bash_command="""
mkdir -p /app/.snowflake
echo "${SNOWFLAKE_USER}"
cat <<EOF > /app/.snowflake/connections.toml
[snowpark-submit]
host = "${SNOWFLAKE_HOST}"
port = "${SNOWFLAKE_PORT}"
protocol = "https"
account = "${SNOWFLAKE_ACCOUNT}"
authenticator = "oauth"
token = "$(cat /snowflake/session/token)"
warehouse = "airflow_wh"
database = "${SNOWFLAKE_DATABASE}"
schema = "${SNOWFLAKE_SCHEMA}"
client_session_keep_alive = true
EOF
chmod 600 /app/.snowflake/connections.toml
"""
)
run_script = BashOperator(
task_id='run_snowpark_script',
bash_command="""
set -e
echo "Using SNOWFLAKE_HOME: $SNOWFLAKE_HOME"
echo "Running Python script with Snowpark..."
source /tmp/myenv/bin/activate &&
snowpark-submit --deploy-mode cluster --class org.example.SnowparkConnectApp --compute-pool="snowparksubmit" --snowflake-workload-name="spcstest" --snowflake-stage="@AIRFLOW_APP_FILES" --wait-for-completion "@AIRFLOW_APP_FILES/transformation.py" --snowflake-connection-name snowpark-submit
""",
env={
'SNOWFLAKE_HOME': '/app/.snowflake'
}
)
create_venv >> create_connections_toml >> run_script
Apache Airflowユーザーインターフェースのグラフビューまたはツリービューを使用して、 DAG をモニターできます。次の項目のタスクログを確認します。
環境設定
Snowpark Connect for Spark のステータス
コマンド:
Snowpark送信ジョブ出力
Snowflakeステージに保存されたログまたはイベントテーブルから、Snowflakeで実行されたジョブをモニターすることもできます。