PythonによるSnowflakeタスクとタスクグラフの管理¶
Pythonを使用してSnowflakeタスクを管理し、 SQL ステートメント、プロシージャ呼び出し、および Snowflakeスクリプト のロジックを実行できます。タスクの概要については、 タスクの紹介 をご参照ください。
Snowflake Python API は、2つの別々のタイプでタスクを表します。
Task
: スケジュール、パラメーター、先行タスクなどのタスクのプロパティを公開します。TaskResource
: 対応するTask
オブジェクトのフェッチ、タスクの実行、タスクの変更に使用できるメソッドを公開します。
前提条件¶
このトピックの例では、Snowflakeと接続するコードを追加して Root
オブジェクトを作成し、そこからSnowflake Python API を使用することを想定しています。
たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
出来上がった Session
オブジェクトを使って、コードは API のタイプとメソッドを使う Root
オブジェクトを作成します。詳細については、 Snowflake Python API を使用してSnowflakeに接続する をご参照ください。
タスクの作成¶
タスクを作成するには、まず Task
オブジェクトを作成します。次に、タスクを作成するデータベースとスキーマを指定して TaskCollection
オブジェクトを作成します。 TaskCollection.create
を使用して、新しいタスクをSnowflakeに追加します。
次の例のコードは、 definition
パラメーターで指定された SQL クエリを実行する my_task
というタスクを表す Task
オブジェクトを作成します。
from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task
my_task = Task(name='my_task', definition='<sql query>', schedule=timedelta(hours=1))
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task)
このコードは、 my_db
データベースと my_schema
スキーマから TaskCollection
変数 tasks
を作成します。 TaskCollection.create
を使用して、Snowflakeに新しいタスクを作成します。
このコード例では、タスクのスケジュールに1時間の timedelta
値も指定します。タスクのスケジュールは timedelta
値または Cron
式のいずれかを使用して定義できます。
Python関数やストアドプロシージャを実行するタスクを作成することもできます。次の例のコードは、 my_task2
というタスクを作成し、 StoredProcedureCall
オブジェクトで表される関数を実行します。
from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task
my_task2 = Task(
StoredProcedureCall(
dosomething, stage_location="@mystage"
),
warehouse="test_warehouse"
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
このオブジェクトは dosomething
という名前の関数を @mystage
ステージのロケーションに指定します。 StoredProcedureCall
オブジェクトでタスクを作成する場合は、 warehouse
も指定する必要があります。
タスクの作成または更新¶
タスクを表す Task
オブジェクトのプロパティを設定し、 TaskResource.create_or_update
メソッドでオブジェクトをSnowflakeに渡すことで、既存のタスクの特性を更新できます。
タスクを作成するときに、新しいタスクを記述する Task
オブジェクトを渡すこともできます。
次の例のコードは、タスクの名前、定義、スケジュールを設定してから、Snowflakeでタスクを更新するか、まだ存在しない場合はタスクを作成します。
from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task
tasks = root.databases["my_db"].schemas["my_schema"].tasks
tasks['my_task'].create_or_update(
Task(name='my_task', definition='<sql query 2>', schedule=timedelta(hours=1))
)
タスクのリスト¶
TaskCollection.iter
メソッドを使用してタスクを一覧表示できます。このメソッドは Task
オブジェクトの PagedIter
反復子を返します。
次の例のコードは、名前が my で始まるタスクをリストします。
from snowflake.core import Root
from snowflake.core.task import Task
from snowflake.core.task import TaskCollection
root = Root(connection)
tasks: TaskCollection = root.databases["my_db"].schemas["my_schema"].tasks
task_iter = tasks.iter(like="my%") # returns a PagedIter[Task]
for task_obj in task_iter:
print(task_obj.name)
タスク操作の実行¶
タスクの実行、中断、再開といった一般的なタスク操作を TaskResource
オブジェクトで行うことができます。
次の例のコードは、 my_task
タスクを実行、中断、再開、削除します。
from snowflake.core import Root
from snowflake.core.task import Task
tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks['my_task']
task_res.execute()
task_res.suspend()
task_res.resume()
task_res.delete()
タスクグラフでのタスクの管理¶
タスクグラフで集められたタスクを管理することができます。タスクグラフは、依存関係によってまとめられた単一のルートタスクと追加のタスクで構成された、一連のタスクです。
タスクグラフ内のタスクに関する詳細については、 タスクグラフ をご参照ください。
タスクグラフの作成¶
タスクグラフを作成するには、まず DAG
オブジェクトを作成し、名前とスケジュールなどのオプションのプロパティを指定します。タスクグラフのスケジュールは、 timedelta
値または Cron
式のいずれかを使用して定義できます。
次の例のコードは、Python関数 dosomething
を定義し、タスクグラフで dag_task2
という名前の DAGTask
オブジェクトとして関数を指定します。
from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_
def dosomething(session: Session) -> None:
df = session.table("target")
df.group_by("a").agg(sum_("b")).save_as_table("agg_table")
with DAG("my_dag", schedule=timedelta(days=1)) as dag:
# Create a task that runs some SQL.
dag_task1 = DAGTask(
"dagtask1",
"MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v"
)
# Create a task that runs a Python function.
dag_task2 = DAGTask(
StoredProcedureCall(
dosomething, stage_location="@mystage",
packages=["snowflake-snowpark-python"]
),
warehouse="test_warehouse"
)
# Shift right and left operators can specify task relationships.
dag_task1 >> dag_task2 # dag_task1 is a predecessor of dag_task2
schema = root.databases["my_db"].schemas["my_schema"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)
このコードはまた、 SQL ステートメントを dag_task1
という別の DAGTask
オブジェクトとして定義してから、 dag_task1
を dag_task2
の先行タスクとして指定します。最後に、 my_db
データベースと my_schema
スキーマでタスクグラフをSnowflakeに展開します。
cronスケジュール、タスクブランチ、関数の戻り値でタスクグラフを作成します。¶
また、タスクの戻り値として使用される指定されたcronスケジュール、タスクブランチ、関数の戻り値を持つタスクグラフを作成することもできます。
次の例のコードでは、 DAG
オブジェクトを、そのスケジュールを指定する Cron
オブジェクトで作成します。これは、 DAGTaskBranch
オブジェクトを task1_branch
という名前で他の DAGTask
オブジェクトと一緒に定義し、それらの依存関係を指定します。
from snowflake.core import Root
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
pass # do something
def task_branch_handler(session: Session) -> str:
# do something
return "task3"
try:
with DAG(
"my_dag",
schedule=Cron("10 * * * *", "America/Los_Angeles"),
stage_location="@mystage",
packages=["snowflake-snowpark-python"],
use_func_return_value=True,
) as dag:
task1 = DAGTask(
"task1",
task_handler,
warehouse=test_warehouse,
)
task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
task2 = DAGTask("task2", task_handler, warehouse=test_warehouse)
task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1")
task1 >> task1_branch
task1_branch >> [task2, task3]
schema = root.databases["my_db"].schemas["my_schema"]
op = DAGOperation(schema)
op.deploy(dag, mode=CreateMode.or_replace)
finally:
session.close()
このコード例は、タスクハンドラー関数も定義し、タスクに割り当てられた指定のタスクハンドラーを持つ DAGTask
と DAGTaskBranch
の各オブジェクトを作成します。コードは DAG の use_func_return_value
パラメーターを True
に設定します。これはPython関数の戻り値を対応するタスクの戻り値として使うことを指定します。それ以外の場合、 use_func_return_value
のデフォルト値は False
となります。
タスクグラフにおけるタスクの戻り値の設定と取得¶
タスクの定義が StoredProcedureCall
オブジェクトである場合、ストアドプロシージャ(または関数)のハンドラーは TaskContext
オブジェクトを使用することで、タスクの戻り値を明示的に設定することができます。
詳細については、 SYSTEM$SET_RETURN_VALUE をご参照ください。
次の例のコードは、現在のセッションから TaskContext
オブジェクトを context
という名前で作成するタスクハンドラー関数を定義します。次に、 TaskContext.set_return_value
メソッドを使用して、指定された文字列に戻り値を明示的に設定します。
from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
context = TaskContext(session)
# this return value can be retrieved by successor Tasks.
context.set_return_value("predecessor_return_value")
タスクグラフで、直前のタスクをその先行タスクとして識別する直後のタスクは、先行タスクによって明示的に設定された戻り値を取得できます。
詳細については、 SYSTEM$GET_PREDECESSOR_RETURN_VALUE をご参照ください。
次の例のコードは、 TaskContext.get_predecessor_return_value
メソッドを使用して pred_task_name
という先行タスクの戻り値を取得するタスクハンドラー関数を定義します。
from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
context = TaskContext(session)
pred_return_value = context.get_predecessor_return_value("pred_task_name")