PythonによるSnowflakeタスクとタスクグラフの管理

Pythonを使用してSnowflakeタスクを管理し、 SQL ステートメント、プロシージャ呼び出し、および Snowflakeスクリプト のロジックを実行できます。タスクの概要については、 タスクの紹介 をご参照ください。

Snowflake Python API は、2つの別々のタイプでタスクを表します。

  • Task: スケジュール、パラメーター、先行タスクなどのタスクのプロパティを公開します。

  • TaskResource: 対応する Task オブジェクトのフェッチ、タスクの実行、タスクの変更に使用できるメソッドを公開します。

前提条件

このトピックの例では、Snowflakeと接続するコードを追加して Root オブジェクトを作成し、そこからSnowflake Python API を使用することを想定しています。

たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

出来上がった Session オブジェクトを使って、コードは API のタイプとメソッドを使う Root オブジェクトを作成します。詳細については、 Snowflake Python API を使用してSnowflakeに接続する をご参照ください。

タスクの作成

タスクを作成するには、まず Task オブジェクトを作成します。次に、タスクを作成するデータベースとスキーマを指定して TaskCollection オブジェクトを作成します。 TaskCollection.create を使用して、新しいタスクをSnowflakeに追加します。

次の例のコードは、 definition パラメーターで指定された SQL クエリを実行する my_task というタスクを表す Task オブジェクトを作成します。

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

my_task = Task(name='my_task', definition='<sql query>', schedule=timedelta(hours=1))
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task)
Copy

このコードは、 my_db データベースと my_schema スキーマから TaskCollection 変数 tasks を作成します。 TaskCollection.create を使用して、Snowflakeに新しいタスクを作成します。

このコード例では、タスクのスケジュールに1時間の timedelta 値も指定します。タスクのスケジュールは timedelta 値または Cron 式のいずれかを使用して定義できます。

Python関数やストアドプロシージャを実行するタスクを作成することもできます。次の例のコードは、 my_task2 というタスクを作成し、 StoredProcedureCall オブジェクトで表される関数を実行します。

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task

my_task2 = Task(
  StoredProcedureCall(
      dosomething, stage_location="@mystage"
  ),
  warehouse="test_warehouse"
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
Copy

このオブジェクトは dosomething という名前の関数を @mystage ステージのロケーションに指定します。 StoredProcedureCall オブジェクトでタスクを作成する場合は、 warehouse も指定する必要があります。

タスクの作成または更新

タスクを表す Task オブジェクトのプロパティを設定し、 TaskResource.create_or_update メソッドでオブジェクトをSnowflakeに渡すことで、既存のタスクの特性を更新できます。

タスクを作成するときに、新しいタスクを記述する Task オブジェクトを渡すこともできます。

次の例のコードは、タスクの名前、定義、スケジュールを設定してから、Snowflakeでタスクを更新するか、まだ存在しない場合はタスクを作成します。

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
tasks['my_task'].create_or_update(
  Task(name='my_task', definition='<sql query 2>', schedule=timedelta(hours=1))
)
Copy

タスクのリスト

TaskCollection.iter メソッドを使用してタスクを一覧表示できます。このメソッドは Task オブジェクトの PagedIter 反復子を返します。

次の例のコードは、名前が my で始まるタスクをリストします。

from snowflake.core import Root
from snowflake.core.task import Task
from snowflake.core.task import TaskCollection

root = Root(connection)

tasks: TaskCollection = root.databases["my_db"].schemas["my_schema"].tasks
task_iter = tasks.iter(like="my%")  # returns a PagedIter[Task]
for task_obj in task_iter:
  print(task_obj.name)
Copy

タスク操作の実行

タスクの実行、中断、再開といった一般的なタスク操作を TaskResource オブジェクトで行うことができます。

次の例のコードは、 my_task タスクを実行、中断、再開、削除します。

from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks['my_task']

task_res.execute()
task_res.suspend()
task_res.resume()
task_res.delete()
Copy

タスクグラフでのタスクの管理

タスクグラフで集められたタスクを管理することができます。タスクグラフは、依存関係によってまとめられた単一のルートタスクと追加のタスクで構成された、一連のタスクです。

タスクグラフ内のタスクに関する詳細については、 タスクグラフ をご参照ください。

タスクグラフの作成

タスクグラフを作成するには、まず DAG オブジェクトを作成し、名前とスケジュールなどのオプションのプロパティを指定します。タスクグラフのスケジュールは、 timedelta 値または Cron 式のいずれかを使用して定義できます。

次の例のコードは、Python関数 dosomething を定義し、タスクグラフで dag_task2 という名前の DAGTask オブジェクトとして関数を指定します。

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_

def dosomething(session: Session) -> None:
  df = session.table("target")
  df.group_by("a").agg(sum_("b")).save_as_table("agg_table")

with DAG("my_dag", schedule=timedelta(days=1)) as dag:
  # Create a task that runs some SQL.
  dag_task1 = DAGTask(
    "dagtask1",
    "MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v"
  )
  # Create a task that runs a Python function.
  dag_task2 = DAGTask(
    StoredProcedureCall(
      dosomething, stage_location="@mystage",
      packages=["snowflake-snowpark-python"]
    ),
    warehouse="test_warehouse"
  )
# Shift right and left operators can specify task relationships.
dag_task1 >> dag_task2  # dag_task1 is a predecessor of dag_task2
schema = root.databases["my_db"].schemas["my_schema"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)
Copy

このコードはまた、 SQL ステートメントを dag_task1 という別の DAGTask オブジェクトとして定義してから、 dag_task1dag_task2 の先行タスクとして指定します。最後に、 my_db データベースと my_schema スキーマでタスクグラフをSnowflakeに展開します。

cronスケジュール、タスクブランチ、関数の戻り値でタスクグラフを作成します。

また、タスクの戻り値として使用される指定されたcronスケジュール、タスクブランチ、関数の戻り値を持つタスクグラフを作成することもできます。

次の例のコードでは、 DAG オブジェクトを、そのスケジュールを指定する Cron オブジェクトで作成します。これは、 DAGTaskBranch オブジェクトを task1_branch という名前で他の DAGTask オブジェクトと一緒に定義し、それらの依存関係を指定します。

from snowflake.core import Root
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  pass  # do something

def task_branch_handler(session: Session) -> str:
  # do something
  return "task3"

try:
  with DAG(
    "my_dag",
    schedule=Cron("10 * * * *", "America/Los_Angeles"),
    stage_location="@mystage",
    packages=["snowflake-snowpark-python"],
    use_func_return_value=True,
  ) as dag:
    task1 = DAGTask(
      "task1",
      task_handler,
      warehouse=test_warehouse,
    )
    task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
    task2 = DAGTask("task2", task_handler, warehouse=test_warehouse)
    task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1")
    task1 >> task1_branch
    task1_branch >> [task2, task3]
  schema = root.databases["my_db"].schemas["my_schema"]
  op = DAGOperation(schema)
  op.deploy(dag, mode=CreateMode.or_replace)
finally:
  session.close()
Copy

このコード例は、タスクハンドラー関数も定義し、タスクに割り当てられた指定のタスクハンドラーを持つ DAGTaskDAGTaskBranch の各オブジェクトを作成します。コードは DAG の use_func_return_value パラメーターを True に設定します。これはPython関数の戻り値を対応するタスクの戻り値として使うことを指定します。それ以外の場合、 use_func_return_value のデフォルト値は False となります。

タスクグラフにおけるタスクの戻り値の設定と取得

タスクの定義が StoredProcedureCall オブジェクトである場合、ストアドプロシージャ(または関数)のハンドラーは TaskContext オブジェクトを使用することで、タスクの戻り値を明示的に設定することができます。

詳細については、 SYSTEM$SET_RETURN_VALUE をご参照ください。

次の例のコードは、現在のセッションから TaskContext オブジェクトを context という名前で作成するタスクハンドラー関数を定義します。次に、 TaskContext.set_return_value メソッドを使用して、指定された文字列に戻り値を明示的に設定します。

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  # this return value can be retrieved by successor Tasks.
  context.set_return_value("predecessor_return_value")
Copy

タスクグラフで、直前のタスクをその先行タスクとして識別する直後のタスクは、先行タスクによって明示的に設定された戻り値を取得できます。

詳細については、 SYSTEM$GET_PREDECESSOR_RETURN_VALUE をご参照ください。

次の例のコードは、 TaskContext.get_predecessor_return_value メソッドを使用して pred_task_name という先行タスクの戻り値を取得するタスクハンドラー関数を定義します。

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  pred_return_value = context.get_predecessor_return_value("pred_task_name")
Copy