Snowpark Submit 例¶
このトピックには、実稼働対応 Sparkアプリケーションを送信するために Snowpark Submit を使用する例が含まれています。
シンプルなSparkアプリケーションの作成と送信¶
次の例は、依存関係のないシンプルなSparkアプリケーションを作成して送信する方法を示しています。
ローカルIDEで、以下の内容の``app.py``という新しいPythonファイルを作成します。
アプリケーションを送信するには、以下のコマンドを使用します。
ジョブの完了を待つ:code:
--wait-for-completion`オプション、ジョブのステータスを確認する:code:--workload-status`オプション、ジョブのログを表示する:code:--display-logs`オプションを使用することができます。オプションの包括的なリストについては、:doc:/developer-guide/snowpark-connect/snowpark-submit-reference`をご参照ください。
Snowflakeステージからアプリケーションをデプロイします。¶
アプリケーションに、読み取りが必要なファイルなどの依存関係がある場合は、Snowflakeステージからそれらをデプロイすることができます。次の例は、Snowflakeステージからアプリケーションとその依存関係をデプロイする方法を示しています。
ターミナルからステージにファイルをアップロードするには、Snowflake CLIを使用できます。SnowSQLはレガシーCLIであり、すでに使用している場合は、ステージにファイルをアップロードするためにも使用できます。Snowflake CLIをまだインストールしていない場合は、:doc:`/developer-guide/snowflake-cli/installation/installation`の指示に従ってインストールできます。
ローカルIDEで、以下の内容の``sample_employees.csv``という名前の新しいCSVを作成します。
次のコマンドを使用して、依存関係ファイルをステージにアップロードします。
my_stage`は、アカウント内のステージの名前です。(ステージが作成されていない場合は、[`snow stage create](/developer-guide/snowflake-cli/command-reference/stage-commands/create)を使用できます。)ファイルが正常にアップロードされたことを確認するには、次のコマンドを使用してステージ内のファイルを一覧表示します。
リスト内に``sample_employees.csv``ファイルが表示されるはずです。
ローカルIDEで、以下の内容の``app.py``という新しいPythonファイルを作成します。
ステージにアップロードしたファイルを使用するアプリケーションを送信するには、以下のコマンドを使用します。
アプリケーションを実行するにはコンピューティングプールが必要であり、
connections.toml`ファイルまたはコマンドラインで:code:--compute-pool`オプションを使用して指定する必要があることに注意してください。詳細については、 Snowpark Submit リファレンス をご参照ください。
待機とログによるモニター¶
次の例では、ジョブを送信し、完了を待ち、ログを取得する方法を示します。
以下のコマンドを使用してジョブを送信し、完了を待ちます。
ジョブが失敗した場合は、以下のコマンドを使用して詳細ログを確認します。
Apache Airflow DAG での Snowpark Submit を使用します。¶
Sparkジョブを Snowpark Connect for Spark 経由でSnowflakeに送信できます。クラスターモードの Snowpark送信 コマンドを使用して、ジョブを実行するためコンピューティングプールを活用できます。
この方法で Apache Airflow を使用する場合は、Apache Airflow を実行する Docker サービスまたはSnowpark Container Servicesコンテナが、Snowflakeステージで必要なファイルと Snowflake に適切にアクセスできることを確認してください。
次の例のコードは、以下のタスクを実行します。
/tmp/myenvでPythonの仮想環境を作成します。create_venvタスク内で、.whlファイルを使用してsnowpark-submitパッケージをインストールするために、コードはpipを使用します。Snowflake接続認証情報と OAuth トークンを含むセキュア
connections.tomlファイルを生成します。create_connections_tomlタスク内で、コードは/app/.snowflakeディレクトリ を作成し、.tomlファイルを作成します。それから所有者(ユーザー)のみが読み取りと書き込みアクセスできるようにファイル権限を変更します。Snowpark送信コマンドを使用してSparkジョブを実行します。run_snowpark_scriptタスク内で、コードは以下のことを実行します。仮想環境をアクティブにします。
Snowpark送信コマンドを使用してSparkジョブを実行します。クラスターモードを使用してSnowflakeにデプロイします。
Snowpark Connect for Spark リモート URI sc://localhost:15002 を使用します。
Sparkアプリケーションクラス
org.example.SnowparkConnectAppを指定します。@snowflake_stage ステージからスクリプトをプルします。
--wait-for-completionを使用して、ジョブが終了するまで展開をブロックします。
Apache Airflowユーザーインターフェースのグラフビューまたはツリービューを使用して、 DAG をモニターできます。次の項目のタスクログを確認します。
環境設定
Snowpark Connect for Spark のステータス
コマンド:
Snowpark送信ジョブ出力
Snowflakeステージに保存されたログまたはイベントテーブルから、Snowflakeで実行されたジョブをモニターすることもできます。