チュートリアル 2: タスクとタスクグラフの作成と管理 (DAGs)¶
概要¶
このチュートリアルでは、Snowflake タスクを作成して使用し、いくつかの基本的なストアド プロシージャを管理します。また、タスクグラフ(有向非巡回グラフ(DAG)とも呼ばれる)を作成し、上位レベルのタスクグラフ(API)でタスクをオーケストレーションすることもできます。
前提条件¶
注釈
すでに Snowflake Python APIs チュートリアルの共通セットアップ と チュートリアル1:データベース、スキーマ、テーブル、ウェアハウスの作成 の両方を完了している場合は、これらの前提条件をスキップして、このチュートリアルの最初のステップに進むことができます。
このチュートリアルを始める前に、以下のステップを完了する必要があります。
以下の 共通セットアップ の手順に従ってください。
開発環境を設定します。
Snowflake Python APIs パッケージをインストールします。
Snowflake接続を設定します。
Python API チュートリアルに必要なすべてのモジュールをインポートします。
API
Root
オブジェクトを作成します。
以下のコードを実行して、
PYTHON_API_DB
という名前のデータベースと、そのデータベース内にPYTHON_API_SCHEMA
という名前のスキーマを作成します。database = root.databases.create( Database( name="PYTHON_API_DB"), mode=CreateMode.or_replace ) schema = database.schemas.create( Schema( name="PYTHON_API_SCHEMA"), mode=CreateMode.or_replace, )
これらは、 チュートリアル1 で作成したものと同じデータベースとスキーマ・オブジェクトです。
これらの前提条件が完了したら、 API を使ってタスク管理を始める準備ができたことになります。
Snowflake オブジェクトの設定¶
タスクが呼び出すストアドプロシージャと、ストアドプロシージャを保持するステージを設定します。 Snowflake Python APIs root
オブジェクトを使って、以前に作成した PYTHON_API_DB
データベースと PYTHON_API_SCHEMA
スキーマにステージを作成することができます。
TASKS_STAGE
という名前のステージを作るには、ノートブックの次のセルに次のコードを実行します。stages = root.databases[database.name].schemas[schema.name].stages stages.create(Stage(name="TASKS_STAGE"))
このステージは、ストアドプロシージャと、それらのプロシージャが必要とする依存関係を保持します。
タスクがストアドプロシージャとして実行する2つの基本的なPython関数を作成するために、次のセルで以下のコードを実行します。
def trunc(session: Session, from_table: str, to_table: str, count: int) -> str: ( session .table(from_table) .limit(count) .write.save_as_table(to_table) ) return "Truncated table successfully created!" def filter_by_shipmode(session: Session, mode: str) -> str: ( session .table("snowflake_sample_data.tpch_sf100.lineitem") .filter(col("L_SHIPMODE") == mode) .limit(10) .write.save_as_table("filter_table") ) return "Filter table successfully created!"
これらの機能は以下のようなものです。
trunc()
: 入力テーブルの切り捨て版を作成します。filter_by_shipmode()
:SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM
テーブルをシップモードでフィルタリングし、結果を10行に制限し、結果を新しいテーブルに書き込みます。注釈
この関数は、 SNOWFLAKE_SAMPLE_DATA データベース内の TPC-H サンプルデータ をクエリします。Snowflakeはデフォルトで新規アカウントにサンプルデータベースを作成します。アカウントにデータベースが作成されていない場合は、 サンプルデータベースの使用 をご参照ください。
関数は意図的に基本的なものであり、デモンストレーションを目的としています。
タスクの作成と管理¶
以前に作成したPython関数をストアドプロシージャとして実行する2つのタスクを定義、作成、管理します。
ノートブックの次のセルに2つのタスク、
task1
とtask2
を定義するには、以下のコードを実行します。tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE" task1 = Task( name="task_python_api_trunc", definition=StoredProcedureCall( func=trunc, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"], ), warehouse="COMPUTE_WH", schedule=timedelta(minutes=1) ) task2 = Task( name="task_python_api_filter", definition=StoredProcedureCall( func=filter_by_shipmode, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"], ), warehouse="COMPUTE_WH" )
このコードでは、以下のタスク・パラメーターを指定します。
各タスクについて、以下の属性を含む StoredProcedureCall オブジェクトで表される定義:
実行する呼び出し可能な関数
Python関数のコンテンツとその依存関係がアップロードされるステージの場所。
ストアドプロシージャのパッケージ依存関係
ストアドプロシージャを実行するウェアハウス(
StoredProcedureCall
オブジェクトでタスクを作成する場合に必要)。このチュートリアルでは、トライアルアカウントに含まれるCOMPUTE_WH
ウェアハウスを使用します。ルートタスクの実行スケジュール、
task1
。スケジュールは、タスクを定期的に実行する間隔を指定します。
ストアドプロシージャの詳細については、 Pythonでのストアドプロシージャの記述 をご参照ください。
2つのタスクを作成するには、データベーススキーマから
TaskCollection
オブジェクト(tasks
)を取り出し、タスクコレクションで.create()
を呼び出します。# create the task in the Snowflake database tasks = schema.tasks trunc_task = tasks.create(task1, mode=CreateMode.or_replace) task2.predecessors = [trunc_task.name] filter_task = tasks.create(task2, mode=CreateMode.or_replace)
このコード例では、
task1
をtask2
の前任者に設定することで、タスクをリンクし、最小のタスクグラフを作成しています。2つのタスクが存在することを確認するために、次のセルで次のコードを実行します。
taskiter = tasks.iter() for t in taskiter: print(t.name)
タスクを作成すると、そのタスクはデフォルトで一時停止されます。
タスクを開始するには、タスク・リソース・オブジェクトで
.resume()
を呼び出します。trunc_task.resume()
trunc_task
タスクが開始されたことを確認するために、次のセルで以下のコードを実行します。taskiter = tasks.iter() for t in taskiter: print("Name: ", t.name, "| State: ", t.state)
出力はこのようになるはずです。
Name: TASK_PYTHON_API_FILTER | State: suspended Name: TASK_PYTHON_API_TRUNC | State: started
タスクのステータスを確認したいときは、いつでもこのステップを繰り返すことができます。
タスク・リソースをクリーンアップするには、まずタスクを一時停止してからドロップします。
次のセルで次のコードを実行します。
trunc_task.suspend()
タスクが一時停止されたことを確認するには、ステップ5を繰り返します。
任意: 両方のタスクを削除するには、次のセルで以下のコードを実行します。
trunc_task.drop() filter_task.drop()
タスクグラフの作成と管理¶
多数のタスクの実行を調整する場合、各タスクを個別に管理するのは難しいです。 Snowflake Python APIs は、より高いレベルのタスクグラフ API でタスクをオーケストレートする機能を提供します。
タスクグラフ、または有向非巡回グラフ(DAG)は、ルートタスクと子タスクで構成され、依存関係によって編成された一連のタスクです。詳細については、 タスクグラフでタスクの依存関係を管理する をご参照ください。
タスク・グラフを作成してデプロイするには、以下のコードを実行します。
dag_name = "python_api_dag" dag = DAG(name=dag_name, schedule=timedelta(days=1)) with dag: dag_task1 = DAGTask( name="task_python_api_trunc", definition=StoredProcedureCall( func=trunc, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"]), warehouse="COMPUTE_WH", ) dag_task2 = DAGTask( name="task_python_api_filter", definition=StoredProcedureCall( func=filter_by_shipmode, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"]), warehouse="COMPUTE_WH", ) dag_task1 >> dag_task2 dag_op = DAGOperation(schema) dag_op.deploy(dag, mode=CreateMode.or_replace)
このコードでは次のように行います。
DAG
コンストラクターを呼び出し、名前とスケジュールを指定してタスクグラフオブジェクトを作成します。DAGTask
コンストラクターを使って、タスクグラフ指定のタスクを定義します。コンストラクターは、前のステップでStoredProcedureCall
クラスに指定したのと同じ引数を受け付けることに注意してください。dag_task1
をルート・タスクとして指定し、より便利な構文でdag_task2
の前任者とします。PYTHON_API_DB
データベースのPYTHON_API_SCHEMA
スキーマにタスクグラフをデプロイします。
タスク・グラフの作成を確認するために、次のセルで次のコードを実行します。
taskiter = tasks.iter() for t in taskiter: print("Name: ", t.name, "| State: ", t.state)
タスクのステータスを確認したいときは、いつでもこのステップを繰り返すことができます。
ルート・タスクを開始してタスク・グラフを開始するには、次のセルで次のコードを実行します。
dag_op.run(dag)
PYTHON_API_DAG$TASK_PYTHON_API_TRUNC
タスクの開始を確認するには、ステップ2を繰り返します。注釈
タスクグラフによって呼び出される関数呼び出しは、必要な引数とともに呼び出していないため、成功しません。この手順の目的は、タスクグラフをプログラムで開始する方法を示すことのみです。
タスク・グラフをドロップするには、次のセルで次のコードを実行します。
dag_op.drop(dag)
これらのチュートリアルで作成したデータベースオブジェクトをクリーンアップします。
database.drop()
次の内容¶
おめでとうございます。このチュートリアルでは、 Snowflake Python APIs を使ってタスクとタスクグラフを作成、管理する方法を学びました。
概要¶
その過程で、あなたは以下のタスクを完了しました。
ストアドプロシージャとその依存関係を保持できるステージを作成します。
タスクを作成して管理します。
タスクグラフを作成して管理します。
Snowflakeのリソースオブジェクトを削除してクリーンアップします。
次のチュートリアル¶
次に、 チュートリアル3:Snowparkコンテナサービスの作成と管理 に進み、 Snowpark Container Services でコンポーネントを作成し管理する方法を説明します。
追加のリソース¶
API を使用して Snowflake で他のタイプのオブジェクトを管理する例については、以下の開発者ガイドを参照してください。
ガイド |
説明 |
---|---|
データベース、スキーマ、テーブルの作成と管理には API を使用します。 |
|
API を使用して、ユーザー、ロール、およびグラントを作成および管理します。 |
|
API を使用して、外部ボリューム、パイプ、ステージなど、データのロードとアンロードのリソースを作成および管理します。 |
|
API を使用して、コンピューティングプール、イメージリポジトリ、サービスなど、Snowpark Container Servicesのコンポーネントを管理します。 |