チュートリアル2: Snowpark Container Servicesジョブサービスを作成する¶
概要¶
チュートリアル共通セットアップ を完了すると、ジョブサービスを作成する準備が整います。このチュートリアルでは、Snowflakeに接続し、 SQL SELECT クエリを実行し、結果をテーブルに保存する単純なジョブサービスを作成します。
このチュートリアルには2つのパートがあります。
パート1: ジョブサービスを作成してテストする。 このチュートリアルで提供されるコードをダウンロードし、ステップバイステップの手順に従います。
このチュートリアルのジョブサービスコードをダウンロードします。
Snowpark Container Services用のDockerイメージをビルドし、アカウントのリポジトリにイメージをアップロードします。
Snowflakeにコンテナ構成情報を提供するサービス仕様ファイルをステージします。仕様ファイルには、コンテナの起動に使用するイメージ名に加えて、以下の3つの引数を指定します: SELECT クエリ、クエリを実行する仮想ウェアハウス、および結果を保存するテーブル名。
ジョブサービスを実行します。EXECUTE JOBSERVICE コマンドを使用して、仕様ファイルとSnowflakeがコンテナを実行できるコンピューティングプールを指定すると、ジョブサービスを実行できます。そして最後に、サービスの結果を確認します。
パート2: ジョブサービスコードを理解する。このセクションでは、ジョブサービスコードの概要を説明し、さまざまなコンポーネントがどのように連携しているかを明らかにします。
1: ジョブサービスコードのダウンロード¶
ジョブサービスを作成するためのコード(Pythonアプリケーション)が提供されます。
ダウンロード:
SnowparkContainerServices -Tutorials.zip </samples/spcs/SnowparkContainerServices-Tutorials.zip>
をダウンロードします。ファイルを解凍します。チュートリアルごとに1つのディレクトリが含まれています。
Tutorial-2
ディレクトリには以下のファイルがあります。main.py
Dockerfile
my_job_spec.yaml
2: イメージをビルドしてアップロードする¶
Snowpark Container Servicesがサポートするlinux/amd64プラットフォーム用のイメージをビルドし、アカウントのイメージリポジトリにイメージをアップロードします(共通セットアップ を参照)。
イメージをビルドしてアップロードする前に、リポジトリに関する情報(リポジトリ URL とレジストリのホスト名)が必要です。詳細については、 レジストリおよびリポジトリ をご参照ください。
リポジトリに関する情報を取得する
リポジトリ URL を取得するには、 SHOW IMAGE REPOSITORIES SQL コマンドを実行します。
SHOW IMAGE REPOSITORIES;
出力の
repository_url
列は、 URL を提供します。以下に例を示します。<orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
リポジトリ URL のホスト名はレジストリのホスト名です。以下に例を示します。
<orgname>-<acctname>.registry.snowflakecomputing.com
イメージをビルドし、リポジトリにアップロードする
ターミナルウィンドウを開き、解凍したファイルのあるディレクトリに移動します。
Dockerイメージをビルドするには、Docker CLI を使用して以下の
docker build
コマンドを実行します。このコマンドは、イメージのビルドに使用するファイルのPATH
として、現在の作業ディレクトリ(.)を指定していることに注意してください。docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
image_name
には、my_job_image:latest
を使用します。
例
docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
Snowflakeアカウントのリポジトリにイメージをアップロードします。Dockerがあなたの代わりにリポジトリにイメージをアップロードするには、まず レジストリでDockerを認証する 必要があります。
SnowflakeレジストリでDockerを認証するには、以下のコマンドを実行します。
docker login <registry_hostname> -u <username>
username
には、Snowflakeのユーザー名を指定します。Dockerは、パスワードの入力を求めるプロンプトを表示します。
イメージをアップロードするには、以下のコマンドを実行します。
docker push <repository_url>/<image_name>
例
docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
3: 仕様ファイルをステージする¶
サービス仕様ファイル(
my_job_spec.yaml
)をステージにアップロードするには、以下のオプションのいずれかを使用します。Snowsightウェブインターフェイス: 手順については、 ローカルファイルに対する内部ステージの選択 をご参照ください。
SnowSQL CLI: 次の PUT コマンドを実行します。
PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
例:
Linuxまたは macOS
PUT file:///tmp/my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Windows
PUT file://C:\temp\my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
相対パスを指定することもできます。
PUT file://./my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
このコマンドは OVERWRITE=TRUE を設定するため、必要な場合(例: 仕様ファイルのエラーを修正した場合)は、ファイルを再度アップロードすることができます。PUT コマンドが正常に実行されると、アップロードされたファイルに関する情報がプリントアウトされます。
4: ジョブサービスを実行する¶
これでジョブを作成する準備が整いました。
ジョブサービスを開始するには、 EXECUTE JOBSERVICE コマンドを実行します。
EXECUTE JOB SERVICE IN COMPUTE POOL tutorial_compute_pool NAME=tutorial_2_job_service FROM @tutorial_stage SPEC='my_job_spec.yaml';
次の点に注意してください。
FROM と SPEC は、ステージ名とジョブサービス仕様ファイル名を提供します。ジョブサービスが実行されると、 SQL ステートメントが実行され、
my_job_spec.yaml
で指定されたテーブルに結果が保存されます。SQL ステートメントは、Dockerコンテナ内では実行されません。代わりに、実行中のコンテナーはSnowflakeに接続し、Snowflakeウェアハウスで SQL ステートメントを実行します。
COMPUTE_POOL は、Snowflakeがジョブサービスを実行するコンピューティングリソースを提供します。
EXECUTE JOBSERVICE は、次のサンプル出力に示すように、ジョブ名を含む出力を返します。
+------------------------------------------------------------------------------------+ | status | -------------------------------------------------------------------------------------+ | Job TUTORIAL_2_JOB_SERVICE completed successfully with status: DONE. | +------------------------------------------------------------------------------------+
このジョブサービスは単純なクエリを実行し、結果を結果テーブルに保存します。ジョブサービス正常に完了したことは、結果テーブルをクエリすると確認できます。
SELECT * FROM results;
サンプル出力:
+----------+-----------+ | TIME | TEXT | |----------+-----------| | 10:56:52 | hello | +----------+-----------+
ジョブサービスの実行をデバッグする場合は、システム関数を使用します。たとえば、 SYSTEM$GET_SERVICE_STATUS を使用して、ジョブサービスがまだ実行中なのか、起動に失敗したのか、起動に失敗した場合はなぜ失敗したのかを判断します。また、コードが有用なログを標準出力または標準エラーに出力すると想定し、 SYSTEM$GET_SERVICE_LOGS を使用してログにアクセスできます。
ジョブサービスのステータスを取得するには、システム関数 SYSTEM$GET_SERVICE_STATUS --- 非推奨 を呼び出します。
SELECT SYSTEM$GET_SERVICE_STATUS('tutorial_2_job_service');
サンプル出力:
[ { "status":"DONE", "message":"Completed successfully", "containerName":"main", "instanceId":"0", "serviceName":"TUTORIAL_2_JOB_SERVICE", "image":"orgname-acctname.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest", "restartCount":0, "startTime":"" } ]
ジョブサービスのログ情報を取得するには、システム関数 SYSTEM$GET_SERVICE_LOGS を使用します。
SELECT SYSTEM$GET_SERVICE_LOGS('tutorial_2_job_service', 0, 'main')
job-tutorial - INFO - Job started job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE" job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results] job-tutorial - INFO - Job finished
5: クリーンアップする¶
チュートリアル3 に進む予定がない場合は、作成した請求対象リソースを削除する必要があります。詳細については、 チュートリアル3 のステップ5をご参照ください。
6: ジョブサービスコードを見直す¶
このセクションでは、以下のトピックを取り上げます。
提供されたファイルの調査: ジョブサービスを実装するさまざまなコードファイルを確認します。
ローカルでのイメージの構築とテスト。このセクションでは、Snowflakeアカウントのリポジトリにアップロードする前に、ローカルでDockerイメージをテストする方法について説明します。
提供されたファイルの調査¶
チュートリアルの最初にダウンロードしたzipファイルには、以下のファイルが含まれています。
main.py
Dockerfile
my_job_spec.yaml
このセクションでは、コードの概要を説明します。
main.pyファイル¶
#!/opt/conda/bin/python3
import argparse
import logging
import os
import sys
from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *
# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")
# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")
def get_arg_parser():
"""
Input argument list.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--query", required=True, help="query text to execute")
parser.add_argument(
"--result_table",
required=True,
help="name of the table to store result of query specified by flag --query")
return parser
def get_logger():
"""
Get a logger for local logging.
"""
logger = logging.getLogger("job-tutorial")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def get_login_token():
"""
Read the login token supplied automatically by Snowflake. These tokens
are short lived and should always be read right before creating any new connection.
"""
with open("/snowflake/session/token", "r") as f:
return f.read()
def get_connection_params():
"""
Construct Snowflake connection params from environment variables.
"""
if os.path.exists("/snowflake/session/token"):
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"authenticator": "oauth",
"token": get_login_token(),
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA
}
else:
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"user": SNOWFLAKE_USER,
"password": SNOWFLAKE_PASSWORD,
"role": SNOWFLAKE_ROLE,
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA
}
def run_job():
"""
Main body of this job.
"""
logger = get_logger()
logger.info("Job started")
# Parse input arguments
args = get_arg_parser().parse_args()
query = args.query
result_table = args.result_table
# Start a Snowflake session, run the query and write results to specified table
with Session.builder.configs(get_connection_params()).create() as session:
# Print out current session context information.
database = session.get_current_database()
schema = session.get_current_schema()
warehouse = session.get_current_warehouse()
role = session.get_current_role()
logger.info(
f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
)
# Execute query and persist results in a table.
logger.info(
f"Executing query [{query}] and writing result to table [{result_table}]"
)
res = session.sql(query)
# If the table already exists, the query result must match the table scheme.
# If the table does not exist, this will create a new table.
res.write.mode("append").save_as_table(result_table)
logger.info("Job finished")
if __name__ == "__main__":
run_job()
コードで、
Pythonコードが
main
で実行され、次にrun_job()
関数が実行されます。if __name__ == "__main__": run_job()
run_job()
関数は環境変数を読み取り、それを使用してさまざまなパラメーターのデフォルト値を設定します。コンテナーは、これらのパラメーターを使用してSnowflakeに接続します。次に注意してください。サービス仕様の
containers.env
およびcontainers.args
フィールドを使用して、サービスで使用されるパラメーター値を上書きできます。詳細については、 サービス仕様リファレンス をご参照ください。イメージがSnowflakeで実行されると、Snowflakeはこれらのパラメーターの一部(ソースコードを参照)を自動的に入力します。しかし、イメージをローカルでテストする場合は、これらのパラメーターを明示的に指定する必要があります(次のセクション ローカルでのイメージの構築とテスト に示すように)。
Dockerfile¶
このファイルには、Dockerを使用してイメージを構築するためのすべてのコマンドが含まれています。
ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
my_job_spec.yamlファイル(サービス仕様)¶
Snowflakeは、この仕様で提供された情報を使用して、ジョブサービスを構成および実行します。
spec:
containers:
- name: main
image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
env:
SNOWFLAKE_WAREHOUSE: tutorial_warehouse
args:
- "--query=select current_time() as time,'hello'"
- "--result_table=results"
container.name
と container.image
の必須フィールド(サービス仕様リファレンス を参照)に加えて、この仕様には引数をリストするためのオプションの container.args
フィールドがあります。
--query
は、サービス実行時に実行するクエリを提供します。--result_table
は、クエリ結果を保存するテーブルを識別します。
ローカルでのイメージの構築とテスト¶
Snowflakeアカウントのリポジトリにアップロードする前に、ローカルでDockerイメージをテストすることができます。ローカルテストでは、コンテナはスタンドアロンで実行されます(Snowflakeが実行するジョブサービスではありません)。
以下のステップを使用して、チュートリアル2Dockerイメージをテストします。
Dockerイメージを作成するには、Docker CLIで
docker build
コマンドを実行します。docker build --rm -t my_service:local .
コードを起動するには、
docker run
コマンドを実行し、<組織名>-<アカウント名>
、<ユーザー名>
、<パスワード>
を提供します。docker run --rm \ -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \ -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \ -e SNOWFLAKE_DATABASE=tutorial_db \ -e SNOWFLAKE_SCHEMA=data_schema \ -e SNOWFLAKE_ROLE=test_role \ -e SNOWFLAKE_USER=<username> \ -e SNOWFLAKE_PASSWORD=<password> \ -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \ my_job:local \ --query="select current_time() as time,'hello'" \ --result_table=tutorial_db.data_schema.results
ローカルでイメージをテストする場合は、3つの引数(クエリ、クエリを実行するウェアハウス、結果を保存するテーブル)に加えて、ローカルで実行するコンテナーがSnowflakeに接続するための接続パラメーターも提供することに注意してください。
コンテナをサービスとして実行すると、Snowflakeはこれらのパラメーターを環境変数としてコンテナに提供します。詳細については、 Snowflakeクライアントを構成する をご参照ください。
ジョブサービスはクエリ(
select current_time() as time,'hello'
)を実行し、結果をテーブル(tutorial_db.data_schema.results
)に書き込みます。テーブルが存在しない場合は作成されます。テーブルが存在する場合、ジョブサービスは行を追加します。結果テーブルのクエリ結果の例:
+----------+----------+ | TIME | TEXT | |----------+----------| | 10:56:52 | hello | +----------+----------+
次の内容¶
これで チュートリアル3 をテストし、サービス間の通信方法を表示することができます。