チュートリアル2: Snowpark Container Servicesジョブを作成する¶
重要
Snowpark Container Servicesのジョブ機能は現在プライベートプレビュー中であり、 https://snowflake.com/legal のプレビュー規約に従うものとします。詳細については、Snowflakeの担当者にお問い合わせください。
紹介¶
チュートリアル共通セットアップ を完了すると、ジョブを作成する準備が整います。このチュートリアルでは、Snowflakeに接続し、 SQL SELECT クエリを実行し、結果をテーブルに保存する単純なジョブを作成します。
このチュートリアルには2つのパートがあります。
パート1: ジョブを作成してテストする。 このチュートリアルで提供されるコードをダウンロードし、ステップバイステップの手順に従います。
このチュートリアルのジョブコードをダウンロードします。
Snowpark Container Services用のDockerイメージをビルドし、アカウントのリポジトリにイメージをアップロードします。
Snowflakeにコンテナー構成情報を与えるジョブ仕様ファイルをステージします。コンテナーの起動に使用するイメージの名前に加えて、仕様ファイルは以下を提供します。
3つの引数: SELECT クエリ、クエリを実行する仮想ウェアハウス、および結果を保存するテーブル名。
SELECT ステートメントを実行するウェアハウス。
ジョブを実行します。EXECUTE SERVICE コマンドを使用して、仕様ファイルとSnowflakeがコンテナーを実行できるコンピューティングプールを指定すると、ジョブを実行できます。そして最後に、ジョブの結果を確認します。
パート2: ジョブコードを理解する。このセクションでは、ジョブコードの概要を説明し、さまざまなコンポーネントがどのように連携しているかを明らかにします。
1: サービスコードをダウンロードする¶
ジョブを作成するためのコード(Pythonアプリケーション)が提供されます。
zipファイル をディレクトリにダウンロードします。
ファイルを解凍します。チュートリアルごとに1つのディレクトリが含まれています。
Tutorial-2
ディレクトリには以下のファイルがあります。main.py
Dockerfile
my_job_spec.yaml
2: イメージをビルドしてアップロードする¶
Snowpark Container Servicesがサポートするlinux/amd64プラットフォーム用のイメージをビルドし、アカウントのイメージリポジトリにイメージをアップロードします(共通セットアップ を参照)。
イメージをビルドしてアップロードする前に、リポジトリに関する情報(リポジトリ URL とレジストリのホスト名)が必要です。詳細については、 レジストリおよびリポジトリ をご参照ください。
リポジトリに関する情報を取得する
リポジトリ URL を取得するには、 SHOW IMAGE REPOSITORIES SQL コマンドを実行します。
SHOW IMAGE REPOSITORIES;
出力の
repository_url
列は、 URL を提供します。以下に例を示します。<orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
リポジトリ URL のホスト名はレジストリのホスト名です。以下に例を示します。
<orgname>-<acctname>.registry.snowflakecomputing.com
イメージをビルドし、リポジトリにアップロードする
ターミナルウィンドウを開き、解凍したファイルのあるディレクトリに移動します。
Dockerイメージをビルドするには、Docker CLI を使用して以下の
docker build
コマンドを実行します。このコマンドは、イメージのビルドに使用するファイルのPATH
として、現在の作業ディレクトリ(.)を指定していることに注意してください。docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
image_name
には、my_job_image:latest
を使用します。
例
docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
Snowflakeアカウントのリポジトリにイメージをアップロードします。Dockerがあなたの代わりにリポジトリにイメージをアップロードするには、まずSnowflakeでDockerを認証する必要があります。
SnowflakeレジストリでDockerを認証するには、以下のコマンドを実行します。
docker login <registry_hostname> -u <username>
username
には、Snowflakeのユーザー名を指定します。Dockerは、パスワードの入力を求めるプロンプトを表示します。
イメージをアップロードするには、以下のコマンドを実行します。
docker push <repository_url>/<image_name>
例
docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
3: 仕様ファイルをステージする¶
ジョブ仕様ファイル(my_job_spec.yaml)をステージにアップロードするには、以下のオプションのいずれかを使用します。
Snowsightウェブインターフェイス: 手順については、 ローカルファイルに対する内部ステージの選択 をご参照ください。
SnowSQL CLI: 次の PUT コマンドを実行します。
PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
例:
Linuxまたは macOS
PUT file:///tmp/my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Windows
PUT file://C:\temp\my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
相対パスを指定することもできます。
PUT file://./my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
このコマンドは OVERWRITE=TRUE を設定するため、必要な場合(例: 仕様ファイルのエラーを修正した場合)は、ファイルを再度アップロードすることができます。PUT コマンドが正常に実行されると、アップロードされたファイルに関する情報がプリントアウトされます。
4: ジョブを実行する¶
これでジョブを作成する準備が整いました。
ジョブを開始するには、 EXECUTE SERVICE コマンドを実行します。
EXECUTE SERVICE IN COMPUTE POOL tutorial_compute_pool FROM @tutorial_stage SPEC='my_job_spec.yaml';
次の点に注意してください。
FROM と SPEC は、ステージ名とジョブ仕様ファイル名を提供します。ジョブが実行されると、 SQL ステートメントが実行され、
my_job_spec.yaml
で指定されたテーブルに結果が保存されます。ジョブの SQL ステートメントは、Dockerコンテナー内では実行されません。代わりに、実行中のコンテナーはSnowflakeに接続し、Snowflakeウェアハウスで SQL ステートメントを実行します。
COMPUTE_POOL は、Snowflakeがジョブを実行するコンピューティングリソースを提供します。
EXECUTE SERVICE は、次の出力例に示すように、ジョブのSnowflake割り当て UUID を含む出力を返します。
+------------------------------------------------------------------------------------+ | status | -------------------------------------------------------------------------------------+ | Job 01af7ee6-0001-cb52-0020-c5870077223a completed successfully with status: DONE. | +------------------------------------------------------------------------------------+
実行したクエリの ID を取得します(EXECUTE SERVICE はクエリ)。
SET jobid = LAST_QUERY_ID();
以下のステップで ID を使用して、ジョブステータスとジョブログ情報を取得します。
注釈
EXECUTE SERVICE を呼び出した直後に LAST_QUERY_ID を呼び出して、コマンドの返すジョブ ID が EXECUTE SERVICE コマンド用であることを確認することが重要です。
LAST_QUERY_ID ジョブのクエリ ID はジョブが完了した後にのみ返されます。進行中の長時間実行ジョブは、リアルタイムのステータス情報を取得するには適していません。代わりに、テーブル関数の QUERY HISTORY ファミリーを使用して、ジョブのクエリ ID を取得するようにします。詳細については、 ジョブの操作 をご参照ください。
このジョブは単純なクエリを実行し、結果を結果テーブルに保存します。ジョブが正常に完了したことは、結果テーブルをクエリすると確認できます。
SELECT * FROM results;
サンプル出力:
+----------+-----------+ | TIME | TEXT | |----------+-----------| | 10:56:52 | hello | +----------+-----------+
ジョブの実行をデバッグする場合は、システム関数を使用します。たとえば、 SYSTEM$GET_JOB_STATUS を使用して、ジョブがまだ実行中なのか、起動に失敗したのか、起動に失敗した場合はなぜ失敗したのかを判断します。また、コードが有用なログを標準出力または標準エラーに出力すると想定し、 SYSTEM$GET_JOB_LOGS を使用してログにアクセスできます。
ジョブステータスを取得するには、システム関数 SYSTEM$GET_JOB_STATUS を呼び出します。
SELECT SYSTEM$GET_JOB_STATUS($jobid);
サンプル出力:
[ { "status":"DONE", "message":"Completed successfully", "containerName":"main", "instanceId":"0", "serviceName":"01af7ee6-0001-cb52-0020-c5870077223a", "image":"orgname-acctname.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest", "restartCount":0, "startTime":"" } ]
出力では、ジョブに名前がないため、
serviceName
はジョブのSnowflake割り当て UUID (クエリ ID)です。ジョブログ情報を取得するには、システム関数 SYSTEM$GET_JOB_LOGS を使用します。
SELECT SYSTEM$GET_JOB_LOGS($jobid, 'main')
job-tutorial - INFO - Job started job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE" job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results] job-tutorial - INFO - Job finished
5: クリーンアップする¶
チュートリアル3 に進む予定がない場合は、作成した請求対象リソースを削除する必要があります。詳細については、 チュートリアル3 のステップ5をご参照ください。
6: ジョブコードを確認する¶
このセクションでは、以下のトピックを取り上げます。
提供されたファイルの調査: ジョブを実装するさまざまなコードファイルを確認します。
ローカルでのイメージの構築とテスト。このセクションでは、Snowflakeアカウントのリポジトリにアップロードする前に、ローカルでDockerイメージをテストする方法について説明します。
提供されたファイルの調査¶
チュートリアルの最初にダウンロードしたzipファイルには、以下のファイルが含まれています。
main.py
Dockerfile
my_job_spec.yaml
このセクションでは、コードがジョブを実装する方法の概要を説明します。
main.pyファイル¶
#!/opt/conda/bin/python3
import argparse
import logging
import os
import sys
from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *
# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")
# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")
def get_arg_parser():
"""
Input argument list.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--query", required=True, help="query text to execute")
parser.add_argument(
"--result_table",
required=True,
help="name of the table to store result of query specified by flag --query")
return parser
def get_logger():
"""
Get a logger for local logging.
"""
logger = logging.getLogger("job-tutorial")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def get_login_token():
"""
Read the login token supplied automatically by Snowflake. These tokens
are short lived and should always be read right before creating any new connection.
"""
with open("/snowflake/session/token", "r") as f:
return f.read()
def get_connection_params():
"""
Construct Snowflake connection params from environment variables.
"""
if os.path.exists("/snowflake/session/token"):
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"authenticator": "oauth",
"token": get_login_token(),
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA
}
else:
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"user": SNOWFLAKE_USER,
"password": SNOWFLAKE_PASSWORD,
"role": SNOWFLAKE_ROLE,
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA
}
def run_job():
"""
Main body of this job.
"""
logger = get_logger()
logger.info("Job started")
# Parse input arguments
args = get_arg_parser().parse_args()
query = args.query
result_table = args.result_table
# Start a Snowflake session, run the query and write results to specified table
with Session.builder.configs(get_connection_params()).create() as session:
# Print out current session context information.
database = session.get_current_database()
schema = session.get_current_schema()
warehouse = session.get_current_warehouse()
role = session.get_current_role()
logger.info(
f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
)
# Execute query and persist results in a table.
logger.info(
f"Executing query [{query}] and writing result to table [{result_table}]"
)
res = session.sql(query)
# If the table already exists, the query result must match the table scheme.
# If the table does not exist, this will create a new table.
res.write.mode("append").save_as_table(result_table)
logger.info("Job finished")
if __name__ == "__main__":
run_job()
コードで、
Pythonコードが
main
で実行され、次にrun_job()
関数が実行されます。if __name__ == "__main__": run_job()
run_job()
関数は環境変数を読み取り、それを使用してさまざまなパラメーターのデフォルト値を設定します。コンテナーは、これらのパラメーターを使用してSnowflakeに接続します。次に注意してください。これらのデフォルトのパラメーター値は、上書きすることができます。詳細については、 サービス仕様リファレンス をご参照ください。
イメージがSnowflakeで実行されると、Snowflakeはこれらのパラメーターの一部(ソースコードを参照)を自動的に入力します。しかし、イメージをローカルでテストする場合は、これらのパラメーターを明示的に指定する必要があります(次のセクション ローカルでのイメージの構築とテスト に示すように)。
Dockerfile¶
このファイルには、Dockerを使用してイメージを構築するためのすべてのコマンドが含まれています。
ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
my_job_spec.yamlファイル(ジョブ仕様)¶
Snowflakeは、この仕様で提供された情報を使用して、ジョブを構成および実行します。
spec:
container:
- name: main
image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
env:
SNOWFLAKE_WAREHOUSE: tutorial_warehouse
args:
- "--query=select current_time() as time,'hello'"
- "--result_table=results"
container.name
と container.image
の必須フィールド(サービス仕様リファレンス を参照)に加えて、この仕様には引数をリストするためのオプションの container.args
フィールドがあります。
--query
ジョブの実行時に実行するクエリを提供します。--result_table
は、クエリ結果を保存するテーブルを識別します。
ローカルでのイメージの構築とテスト¶
Snowflakeアカウントのリポジトリにアップロードする前に、ローカルでDockerイメージをテストすることができます。ローカルテストでは、コンテナーはスタンドアロンで実行されます(Snowflakeが実行するジョブではありません)。
以下のステップを使用して、チュートリアル2Dockerイメージをテストします。
Dockerイメージを作成するには、Docker CLIで
docker build
コマンドを実行します。docker build --rm -t my_service:local .
コードを起動するには、
docker run
コマンドを実行し、<組織名>-<アカウント名>
、<ユーザー名>
、<パスワード>
を提供します。docker run --rm \ -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \ -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \ -e SNOWFLAKE_DATABASE=tutorial_db \ -e SNOWFLAKE_SCHEMA=data_schema \ -e SNOWFLAKE_ROLE=test_role \ -e SNOWFLAKE_USER=<username> \ -e SNOWFLAKE_PASSWORD=<password> \ -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \ my_job:local \ --query="select current_time() as time,'hello'" \ --result_table=tutorial_db.data_schema.results
ローカルでイメージをテストする場合は、3つの引数(クエリ、クエリを実行するウェアハウス、結果を保存するテーブル)に加えて、ローカルで実行するコンテナーがSnowflakeに接続するための接続パラメーターも提供することに注意してください。
コンテナーをジョブとして実行すると、Snowflake はこれらのパラメーターを環境変数としてコンテナーに提供します。詳細については、 Snowflakeクライアントを構成する をご参照ください。
ジョブはクエリ(
select current_time() as time,'hello'
)を実行し、結果をテーブル(tutorial_db.data_schema.results
)に書き込みます。テーブルが存在しない場合は作成されます。テーブルが存在する場合、ジョブは行を追加します。結果テーブルのクエリ結果の例:
+----------+----------+ | TIME | TEXT | |----------+----------| | 10:56:52 | hello | +----------+----------+
次の内容¶
これで チュートリアル3 をテストし、サービス間の通信方法を表示することができます。