チュートリアル2: Snowpark Container Servicesジョブを作成する

重要

Snowpark Container Servicesのジョブ機能は現在プライベートプレビュー中であり、 https://snowflake.com/legal のプレビュー規約に従うものとします。詳細については、Snowflakeの担当者にお問い合わせください。

紹介

チュートリアル共通セットアップ を完了すると、ジョブを作成する準備が整います。このチュートリアルでは、Snowflakeに接続し、 SQL SELECT クエリを実行し、結果をテーブルに保存する単純なジョブを作成します。

このチュートリアルには2つのパートがあります。

パート1: ジョブを作成してテストする。 このチュートリアルで提供されるコードをダウンロードし、ステップバイステップの手順に従います。

  1. このチュートリアルのジョブコードをダウンロードします。

  2. Snowpark Container Services用のDockerイメージをビルドし、アカウントのリポジトリにイメージをアップロードします。

  3. Snowflakeにコンテナー構成情報を与えるジョブ仕様ファイルをステージします。コンテナーの起動に使用するイメージの名前に加えて、仕様ファイルは以下を提供します。

    • 3つの引数: SELECT クエリ、クエリを実行する仮想ウェアハウス、および結果を保存するテーブル名。

    • SELECT ステートメントを実行するウェアハウス。

  4. ジョブを実行します。EXECUTE SERVICE コマンドを使用して、仕様ファイルとSnowflakeがコンテナーを実行できるコンピューティングプールを指定すると、ジョブを実行できます。そして最後に、ジョブの結果を確認します。

パート2: ジョブコードを理解する。このセクションでは、ジョブコードの概要を説明し、さまざまなコンポーネントがどのように連携しているかを明らかにします。

1: サービスコードをダウンロードする

ジョブを作成するためのコード(Pythonアプリケーション)が提供されます。

  1. zipファイル をディレクトリにダウンロードします。

  2. ファイルを解凍します。チュートリアルごとに1つのディレクトリが含まれています。 Tutorial-2 ディレクトリには以下のファイルがあります。

    • main.py

    • Dockerfile

    • my_job_spec.yaml

2: イメージをビルドしてアップロードする

Snowpark Container Servicesがサポートするlinux/amd64プラットフォーム用のイメージをビルドし、アカウントのイメージリポジトリにイメージをアップロードします(共通セットアップ を参照)。

イメージをビルドしてアップロードする前に、リポジトリに関する情報(リポジトリ URL とレジストリのホスト名)が必要です。詳細については、 レジストリおよびリポジトリ をご参照ください。

リポジトリに関する情報を取得する

  1. リポジトリ URL を取得するには、 SHOW IMAGE REPOSITORIES SQL コマンドを実行します。

    SHOW IMAGE REPOSITORIES;
    
    Copy
    • 出力の repository_url 列は、 URL を提供します。以下に例を示します。

      <orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
      
    • リポジトリ URL のホスト名はレジストリのホスト名です。以下に例を示します。

      <orgname>-<acctname>.registry.snowflakecomputing.com
      

イメージをビルドし、リポジトリにアップロードする

  1. ターミナルウィンドウを開き、解凍したファイルのあるディレクトリに移動します。

  2. Dockerイメージをビルドするには、Docker CLI を使用して以下の docker build コマンドを実行します。このコマンドは、イメージのビルドに使用するファイルの PATH として、現在の作業ディレクトリ(.)を指定していることに注意してください。

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
    
    Copy
    • image_name には、 my_job_image:latest を使用します。

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
    
    Copy
  3. Snowflakeアカウントのリポジトリにイメージをアップロードします。Dockerがあなたの代わりにリポジトリにイメージをアップロードするには、まずSnowflakeでDockerを認証する必要があります。

    1. SnowflakeレジストリでDockerを認証するには、以下のコマンドを実行します。

      docker login <registry_hostname> -u <username>
      
      Copy
      • username には、Snowflakeのユーザー名を指定します。Dockerは、パスワードの入力を求めるプロンプトを表示します。

    2. イメージをアップロードするには、以下のコマンドを実行します。

      docker push <repository_url>/<image_name>
      
      Copy

      docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
      
      Copy

3: 仕様ファイルをステージする

  • ジョブ仕様ファイル(my_job_spec.yaml)をステージにアップロードするには、以下のオプションのいずれかを使用します。

    • Snowsightウェブインターフェイス: 手順については、 ローカルファイルに対する内部ステージの選択 をご参照ください。

    • SnowSQL CLI: 次の PUT コマンドを実行します。

      PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      例:

      • Linuxまたは macOS

        PUT file:///tmp/my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy
      • Windows

        PUT file://C:\temp\my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy

      相対パスを指定することもできます。

      PUT file://./my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      このコマンドは OVERWRITE=TRUE を設定するため、必要な場合(例: 仕様ファイルのエラーを修正した場合)は、ファイルを再度アップロードすることができます。PUT コマンドが正常に実行されると、アップロードされたファイルに関する情報がプリントアウトされます。

4: ジョブを実行する

これでジョブを作成する準備が整いました。

  1. ジョブを開始するには、 EXECUTE SERVICE コマンドを実行します。

    EXECUTE SERVICE
      IN COMPUTE POOL tutorial_compute_pool
      FROM @tutorial_stage
      SPEC='my_job_spec.yaml';
    
    Copy

    次の点に注意してください。

    • FROM と SPEC は、ステージ名とジョブ仕様ファイル名を提供します。ジョブが実行されると、 SQL ステートメントが実行され、 my_job_spec.yaml で指定されたテーブルに結果が保存されます。

      ジョブの SQL ステートメントは、Dockerコンテナー内では実行されません。代わりに、実行中のコンテナーはSnowflakeに接続し、Snowflakeウェアハウスで SQL ステートメントを実行します。

    • COMPUTE_POOL は、Snowflakeがジョブを実行するコンピューティングリソースを提供します。

    • EXECUTE SERVICE は、次の出力例に示すように、ジョブのSnowflake割り当て UUID を含む出力を返します。

      +------------------------------------------------------------------------------------+
      |                      status                                                        |
      -------------------------------------------------------------------------------------+
      | Job 01af7ee6-0001-cb52-0020-c5870077223a completed successfully with status: DONE. |
      +------------------------------------------------------------------------------------+
      
  2. 実行したクエリの ID を取得します(EXECUTE SERVICE はクエリ)。

    SET jobid = LAST_QUERY_ID();
    
    Copy

    以下のステップで ID を使用して、ジョブステータスとジョブログ情報を取得します。

    注釈

    EXECUTE SERVICE を呼び出した直後に LAST_QUERY_ID を呼び出して、コマンドの返すジョブ ID が EXECUTE SERVICE コマンド用であることを確認することが重要です。

    LAST_QUERY_ID ジョブのクエリ ID はジョブが完了した後にのみ返されます。進行中の長時間実行ジョブは、リアルタイムのステータス情報を取得するには適していません。代わりに、テーブル関数の QUERY HISTORY ファミリーを使用して、ジョブのクエリ ID を取得するようにします。詳細については、 ジョブの操作 をご参照ください。

  3. このジョブは単純なクエリを実行し、結果を結果テーブルに保存します。ジョブが正常に完了したことは、結果テーブルをクエリすると確認できます。

    SELECT * FROM results;
    
    Copy

    サンプル出力:

    +----------+-----------+
    | TIME     | TEXT      |
    |----------+-----------|
    | 10:56:52 | hello     |
    +----------+-----------+
    
  4. ジョブの実行をデバッグする場合は、システム関数を使用します。たとえば、 SYSTEM$GET_JOB_STATUS を使用して、ジョブがまだ実行中なのか、起動に失敗したのか、起動に失敗した場合はなぜ失敗したのかを判断します。また、コードが有用なログを標準出力または標準エラーに出力すると想定し、 SYSTEM$GET_JOB_LOGS を使用してログにアクセスできます。

    1. ジョブステータスを取得するには、システム関数 SYSTEM$GET_JOB_STATUS を呼び出します。

      SELECT SYSTEM$GET_JOB_STATUS($jobid);
      
      Copy

      サンプル出力:

      [
        {
            "status":"DONE",
            "message":"Completed successfully",
            "containerName":"main",
            "instanceId":"0",
            "serviceName":"01af7ee6-0001-cb52-0020-c5870077223a",
            "image":"orgname-acctname.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
            "restartCount":0,
            "startTime":""
        }
      ]
      
      Copy

      出力では、ジョブに名前がないため、 serviceName はジョブのSnowflake割り当て UUID (クエリ ID)です。

    2. ジョブログ情報を取得するには、システム関数 SYSTEM$GET_JOB_LOGS を使用します。

      SELECT SYSTEM$GET_JOB_LOGS($jobid, 'main')
      
      Copy
      job-tutorial - INFO - Job started
      job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE"
      job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results]
      job-tutorial - INFO - Job finished
      

5: クリーンアップする

チュートリアル3 に進む予定がない場合は、作成した請求対象リソースを削除する必要があります。詳細については、 チュートリアル3 のステップ5をご参照ください。

6: ジョブコードを確認する

このセクションでは、以下のトピックを取り上げます。

提供されたファイルの調査

チュートリアルの最初にダウンロードしたzipファイルには、以下のファイルが含まれています。

  • main.py

  • Dockerfile

  • my_job_spec.yaml

このセクションでは、コードがジョブを実装する方法の概要を説明します。

main.pyファイル

#!/opt/conda/bin/python3

import argparse
import logging
import os
import sys

from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *

# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")

# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")


def get_arg_parser():
  """
  Input argument list.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument("--query", required=True, help="query text to execute")
  parser.add_argument(
    "--result_table",
    required=True,
    help="name of the table to store result of query specified by flag --query")

  return parser


def get_logger():
  """
  Get a logger for local logging.
  """
  logger = logging.getLogger("job-tutorial")
  logger.setLevel(logging.DEBUG)
  handler = logging.StreamHandler(sys.stdout)
  handler.setLevel(logging.DEBUG)
  formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
  handler.setFormatter(formatter)
  logger.addHandler(handler)
  return logger


def get_login_token():
  """
  Read the login token supplied automatically by Snowflake. These tokens
  are short lived and should always be read right before creating any new connection.
  """
  with open("/snowflake/session/token", "r") as f:
    return f.read()


def get_connection_params():
  """
  Construct Snowflake connection params from environment variables.
  """
  if os.path.exists("/snowflake/session/token"):
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "authenticator": "oauth",
      "token": get_login_token(),
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }
  else:
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "user": SNOWFLAKE_USER,
      "password": SNOWFLAKE_PASSWORD,
      "role": SNOWFLAKE_ROLE,
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }

def run_job():
  """
  Main body of this job.
  """
  logger = get_logger()
  logger.info("Job started")

  # Parse input arguments
  args = get_arg_parser().parse_args()
  query = args.query
  result_table = args.result_table

  # Start a Snowflake session, run the query and write results to specified table
  with Session.builder.configs(get_connection_params()).create() as session:
    # Print out current session context information.
    database = session.get_current_database()
    schema = session.get_current_schema()
    warehouse = session.get_current_warehouse()
    role = session.get_current_role()
    logger.info(
      f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
    )

    # Execute query and persist results in a table.
    logger.info(
      f"Executing query [{query}] and writing result to table [{result_table}]"
    )
    res = session.sql(query)
    # If the table already exists, the query result must match the table scheme.
    # If the table does not exist, this will create a new table.
    res.write.mode("append").save_as_table(result_table)

  logger.info("Job finished")


if __name__ == "__main__":
  run_job()
Copy

コードで、

  • Pythonコードが main で実行され、次に run_job() 関数が実行されます。

    if __name__ == "__main__":
      run_job()
    
    Copy
  • run_job() 関数は環境変数を読み取り、それを使用してさまざまなパラメーターのデフォルト値を設定します。コンテナーは、これらのパラメーターを使用してSnowflakeに接続します。次に注意してください。

    • これらのデフォルトのパラメーター値は、上書きすることができます。詳細については、 サービス仕様リファレンス をご参照ください。

    • イメージがSnowflakeで実行されると、Snowflakeはこれらのパラメーターの一部(ソースコードを参照)を自動的に入力します。しかし、イメージをローカルでテストする場合は、これらのパラメーターを明示的に指定する必要があります(次のセクション ローカルでのイメージの構築とテスト に示すように)。

Dockerfile

このファイルには、Dockerを使用してイメージを構築するためのすべてのコマンドが含まれています。

ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
  conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
Copy

my_job_spec.yamlファイル(ジョブ仕様)

Snowflakeは、この仕様で提供された情報を使用して、ジョブを構成および実行します。

spec:
container:
- name: main
   image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
   env:
      SNOWFLAKE_WAREHOUSE: tutorial_warehouse
   args:
   - "--query=select current_time() as time,'hello'"
   - "--result_table=results"
Copy

container.namecontainer.image の必須フィールド(サービス仕様リファレンス を参照)に加えて、この仕様には引数をリストするためのオプションの container.args フィールドがあります。

  • --query ジョブの実行時に実行するクエリを提供します。

  • --result_table は、クエリ結果を保存するテーブルを識別します。

ローカルでのイメージの構築とテスト

Snowflakeアカウントのリポジトリにアップロードする前に、ローカルでDockerイメージをテストすることができます。ローカルテストでは、コンテナーはスタンドアロンで実行されます(Snowflakeが実行するジョブではありません)。

以下のステップを使用して、チュートリアル2Dockerイメージをテストします。

  1. Dockerイメージを作成するには、Docker CLIで docker build コマンドを実行します。

    docker build --rm -t my_service:local .
    
    Copy
  2. コードを起動するには、 docker run コマンドを実行し、 <組織名>-<アカウント名><ユーザー名><パスワード> を提供します。

    docker run --rm \
      -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \
      -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \
      -e SNOWFLAKE_DATABASE=tutorial_db \
      -e SNOWFLAKE_SCHEMA=data_schema \
      -e SNOWFLAKE_ROLE=test_role \
      -e SNOWFLAKE_USER=<username> \
      -e SNOWFLAKE_PASSWORD=<password> \
      -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \
      my_job:local \
      --query="select current_time() as time,'hello'" \
      --result_table=tutorial_db.data_schema.results
    
    Copy

    ローカルでイメージをテストする場合は、3つの引数(クエリ、クエリを実行するウェアハウス、結果を保存するテーブル)に加えて、ローカルで実行するコンテナーがSnowflakeに接続するための接続パラメーターも提供することに注意してください。

    コンテナーをジョブとして実行すると、Snowflake はこれらのパラメーターを環境変数としてコンテナーに提供します。詳細については、 Snowflakeクライアントを構成する をご参照ください。

    ジョブはクエリ(select current_time() as time,'hello')を実行し、結果をテーブル(tutorial_db.data_schema.results)に書き込みます。テーブルが存在しない場合は作成されます。テーブルが存在する場合、ジョブは行を追加します。

    結果テーブルのクエリ結果の例:

    +----------+----------+
    | TIME     | TEXT     |
    |----------+----------|
    | 10:56:52 | hello    |
    +----------+----------+
    

次の内容

これで チュートリアル3 をテストし、サービス間の通信方法を表示することができます。