PythonによるSnowflakeタスクと DAGs の管理

Pythonを使用してSnowflakeタスクを管理し、 SQL ステートメント、プロシージャ呼び出し、および Snowflakeスクリプト のロジックを実行できます。タスクの概要については、 タスクの紹介 をご参照ください。

Snowflake Python API は、2つの別々のタイプでタスクを表します。

  • Task: スケジュール、パラメーター、先行タスクなどのタスクのプロパティを公開します。

  • TaskResource: 対応する Task オブジェクトのフェッチ、タスクの実行、タスクの変更に使用できるメソッドを公開します。

このトピックの例では、Snowflakeと接続するコードを追加して Root オブジェクトを作成し、そこからSnowflake Python API を使用することを想定しています。詳細については、 Snowflake Python API によるSnowflakeへの接続 をご参照ください。

次の例のコードは、構成ファイルで定義された接続パラメーターを使用して、Snowflakeへの接続を作成します。出来上がった Session オブジェクトを使って、コードは API のタイプとメソッドを使う Root オブジェクトを作成します。

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

タスクを作成する

タスクを作成するには、まず Task オブジェクトを作成します。次に、タスクを作成するデータベースとスキーマを指定して TaskCollection オブジェクトを作成します。 TaskCollection.create を使用して、新しいタスクをSnowflakeに追加します。

次の例のコードは、 definition パラメーターで指定された SQL クエリを実行する my_task というタスクを表す Task オブジェクトを作成します。これは my_db データベースと my_schema スキーマから TaskCollection 変数 tasks を作成します。 TaskCollection.create を使用して、Snowflakeに新しいタスクを作成します。

このコード例では、タスクのスケジュールに1時間の timedelta 値も指定します。タスクのスケジュールは timedelta 値または Cron 式のいずれかを使用して定義できます。

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

my_task = Task(name='my_task', definition='<sql query>', schedule=timedelta(hours=1))
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task)
Copy

Python関数やストアドプロシージャを実行するタスクを作成することもできます。次の例のコードは、 my_task2 というタスクを作成し、 StoredProcedureCall オブジェクトで表される関数を実行します。このオブジェクトは dosomething という名前の関数を @mystage ステージのロケーションに指定します。 StoredProcedureCall オブジェクトでタスクを作成する場合は、 warehouse も指定する必要があります。

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task

my_task2 = Task(
  StoredProcedureCall(
      dosomething, stage_location="@mystage"
  ),
  warehouse="test_warehouse"
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
Copy

タスクを作成または更新する

タスクを表す Task オブジェクトのプロパティを設定し、 TaskResource.create_or_update メソッドでオブジェクトをSnowflakeに渡すことで、既存のタスクの特性を更新できます。

タスクを作成するときに、新しいタスクを記述する Task オブジェクトを渡すこともできます。

次の例のコードは、タスクの名前、定義、スケジュールを設定した上で、Snowflakeでタスクを更新し、まだ存在しない場合はタスクを作成します。

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
tasks['my_task'].create_or_update(
  Task(name='my_task', definition='<sql query 2>', schedule=timedelta(hours=1))
)
Copy

タスクを一覧表示する

TaskCollection.iter メソッドを使用してタスクを一覧表示できます。このメソッドは Task オブジェクトの PagedIter 反復子を返します。

次の例のコードは、名前に「my」というテキストを含むタスクを一覧表示し、それぞれの名前を印刷します。

from snowflake.core import Root
from snowflake.core.task import Task
from snowflake.core.task import TaskCollection

root = Root(connection)

tasks: TaskCollection = root.databases["my_db"].schemas["my_schema"].tasks
task_iter = tasks.iter(like="my%")  # returns a PagedIter[Task]
for task_obj in task_iter:
  print(task_obj.name)
Copy

タスク操作を実行する

タスクの実行、中断、再開といった一般的なタスク操作を TaskResource オブジェクトで行うことができます。

次の例のコードは、 my_task タスクを実行、中断、再開、削除します。

from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks['my_task']

task_res.execute()
task_res.suspend()
task_res.resume()
task_res.delete()
Copy

DAG でのタスクの管理

有向非巡回グラフ(DAG)に集められたタスクを管理することができます。DAG は、単一のルートタスクと追加のタスクで構成され、依存関係によってまとめられた一連のタスクです。

DAG のタスクの詳細については、 タスクのDAG をご参照ください。

タスクの DAG を作成する

タスクの DAG を作成するには、まず DAG オブジェクトを作成し、名前とスケジュールなどのオプションのプロパティを指定します。DAG のスケジュールは timedelta 値か Cron 式のいずれかを使って定義できます。

次の例のコードは、Python関数 dosomething を定義し、 DAG で dag_task2 という名前の DAGTask オブジェクトとして関数を指定します。また、 SQL ステートメントを dag_task1 という別の DAGTask オブジェクトとして定義し、 dag_task1dag_task2 の先行タスクとして指定します。最後に、 DAG を my_db データベースと my_schema スキーマでSnowflakeに展開します。

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_

def dosomething(session: Session) -> None:
  df = session.table("target")
  df.group_by("a").agg(sum_("b")).save_as_table("agg_table")

with DAG("my_dag", schedule=timedelta(days=1)) as dag:
  # Create a task that runs some SQL.
  dag_task1 = DAGTask(
    "dagtask1",
    "MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v"
  )
  # Create a task that runs a Python function.
  dag_task2 = DAGTask(
    StoredProcedureCall(
      dosomething, stage_location="@mystage",
      packages=["snowflake-snowpark-python"]
    ),
    warehouse="test_warehouse"
  )
# Shift right and left operators can specify task relationships.
dag_task1 >> dag_task2  # dag_task1 is a predecessor of dag_task2
schema = root.databases["my_db"].schemas["my_schema"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)
Copy

cronスケジュール、タスクブランチ、関数の戻り値で DAG を作成します。

また、タスクの戻り値として使用される指定されたcronスケジュール、タスクブランチ、関数の戻り値を持つ DAG を作成することもできます。

次の例のコードでは、 DAG オブジェクトを、そのスケジュールを指定する Cron オブジェクトで作成します。これは、 DAGTaskBranch オブジェクトを task1_branch という名前で他の DAGTask オブジェクトと一緒に定義し、それらの依存関係を指定します。

このコード例では、タスクハンドラー関数も定義し、タスクに割り当てられた指定タスクハンドラーを持つ DAGTaskDAGTaskBranch の各オブジェクトを作成します。コードは DAG の use_func_return_value パラメーターを True に設定します。これはPython関数の戻り値を対応するタスクの戻り値として使うことを指定します。それ以外の場合、 use_func_return_value のデフォルト値は False となります。

from snowflake.core import Root
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  pass  # do something

def task_branch_handler(session: Session) -> str:
  # do something
  return "task3"

try:
  with DAG(
    "my_dag",
    schedule=Cron("10 * * * *", "America/Los_Angeles"),
    stage_location="@mystage",
    packages=["snowflake-snowpark-python"],
    use_func_return_value=True,
  ) as dag:
    task1 = DAGTask(
      "task1",
      task_handler,
      warehouse=test_warehouse,
    )
    task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
    task2 = DAGTask("task2", task_handler, warehouse=test_warehouse)
    task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1")
    task1 >> task1_branch
    task1_branch >> [task2, task3]
  schema = root.databases["my_db"].schemas["my_schema"]
  op = DAGOperation(schema)
  op.deploy(dag, mode=CreateMode.or_replace)
finally:
  session.close()
Copy

DAG でタスクの戻り値を設定して取得します。

タスクの定義が StoredProcedureCall オブジェクトである場合、ストアドプロシージャ(または関数)のハンドラーは TaskContext オブジェクトを使用することで、タスクの戻り値を明示的に設定することができます。

詳細については、 SYSTEM$SET_RETURN_VALUE をご参照ください。

次の例のコードは、現在のセッションから TaskContext オブジェクトを context という名前で作成するタスクハンドラー関数を定義します。次に、 TaskContext.set_return_value メソッドを使用して、戻り値を指定された文字列に明示的に設定します。

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  # this return value can be retrieved by successor Tasks.
  context.set_return_value("predecessor_return_value")
Copy

タスクの DAG において、直前のタスクをその先行タスクとして識別する直後のタスクは、先行タスクによって明示的に設定された戻り値を取得できます。

詳細については、 SYSTEM$GET_PREDECESSOR_RETURN_VALUE をご参照ください。

次の例のコードは、 TaskContext.get_predecessor_return_value メソッドを使って pred_task_name という先行タスクの戻り値を取得するタスクハンドラー関数を定義します。

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  pred_return_value = context.get_predecessor_return_value("pred_task_name")
Copy