Python을 사용하여 Snowflake 작업 및 작업 그래프 관리하기

Python을 사용하면 Snowflake Scripting 에서 SQL 문, 프로시저 호출, 논리를 실행할 수 있는 Snowflake 작업을 관리할 수 있습니다. 작업 개요는 작업 소개 섹션을 참조하십시오.

Snowflake Python API는 다음 두 가지 별개 유형의 작업을 나타냅니다.

  • Task: 일정, 매개 변수, 선행 작업과 같은 작업의 속성을 노출합니다.

  • TaskResource: 해당 Task 오브젝트를 가져오고 작업을 실행하고 작업을 변경하는 데 사용할 수 있는 메서드를 노출합니다.

전제 조건

이 항목의 예제에서는 Snowflake와 연결하고 Snowflake Python API를 사용할 Root 오브젝트를 생성하는 코드를 추가했다고 가정합니다.

예를 들어, 다음 코드는 구성 파일에 정의된 연결 매개 변수를 사용하여 Snowflake에 대한 연결을 생성합니다.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

해당 코드에서는 결과 Session 오브젝트를 사용하여 API의 유형과 메서드를 사용하기 위해 Root 오브젝트를 생성합니다. 자세한 내용은 Snowflake Python API를 사용하여 Snowflake에 연결하기 섹션을 참조하십시오.

작업 만들기

작업을 만들려면 먼저 Task 오브젝트를 생성하십시오. 그런 다음 작업을 생성할 데이터베이스와 스키마를 지정하여 TaskCollection 오브젝트를 생성합니다. TaskCollection.create 를 사용하여 Snowflake에 새 작업을 추가합니다.

다음 예제의 코드는 definition 매개 변수에 지정된 SQL 쿼리를 실행하는 my_task 라는 작업을 나타내는 Task 오브젝트를 생성합니다.

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

my_task = Task(name='my_task', definition='<sql query>', schedule=timedelta(hours=1))
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task)
Copy

이 코드는 my_db 데이터베이스와 my_schema 스키마에서 TaskCollection 변수 tasks 를 생성합니다. Snowflake에서 TaskCollection.create 를 사용하여 새 작업을 생성합니다.

이 코드 예제에서는 작업의 일정에 대해 1시간의 timedelta 값도 지정합니다. timedelta 값 또는 Cron 식을 사용하여 작업 일정을 정의할 수 있습니다.

또한 Python 함수 또는 저장 프로시저를 실행하는 작업을 생성할 수도 있습니다. 다음 예제의 코드는 StoredProcedureCall 오브젝트로 표시되는 함수를 실행하는 my_task2 라는 작업을 생성합니다.

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task

my_task2 = Task(
  StoredProcedureCall(
      dosomething, stage_location="@mystage"
  ),
  warehouse="test_warehouse"
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
Copy

이 오브젝트는 @mystage 스테이지 위치에 있는 dosomething 이라는 함수를 지정합니다. StoredProcedureCall 오브젝트로 작업을 생성할 때 warehouse 도 지정해야 합니다.

작업 생성 또는 업데이트하기

작업을 나타내는 Task 오브젝트의 속성을 설정한 다음 TaskResource.create_or_update 메서드로 이 오브젝트를 Snowflake에 전달하여 기존 작업의 특성을 업데이트할 수 있습니다.

작업을 생성하려는 경우 새 작업을 설명하는 Task 오브젝트를 전달할 수도 있습니다.

다음 예제의 코드는 작업의 이름, 정의, 일정을 설정한 다음 Snowflake에서 작업을 업데이트하거나 작업이 아직 존재하지 않는다면 작업을 생성합니다.

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
tasks['my_task'].create_or_update(
  Task(name='my_task', definition='<sql query 2>', schedule=timedelta(hours=1))
)
Copy

작업 나열하기

TaskCollection.iter 메서드를 사용하여 작업을 나열할 수 있습니다. 이 메서드는 Task 오브젝트의 PagedIter 반복기를 반환합니다.

다음 예제의 코드로 이름이 my 로 시작하는 작업을 나열합니다.

from snowflake.core import Root
from snowflake.core.task import Task
from snowflake.core.task import TaskCollection

root = Root(connection)

tasks: TaskCollection = root.databases["my_db"].schemas["my_schema"].tasks
task_iter = tasks.iter(like="my%")  # returns a PagedIter[Task]
for task_obj in task_iter:
  print(task_obj.name)
Copy

작업 수행하기

TaskResource 오브젝트를 사용하여 작업 실행, 일시 중단, 재개 등 일반적인 작업을 수행할 수 있습니다.

다음 예제의 코드는 my_task 작업을 실행, 일시 중단, 재개, 삭제합니다.

from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks['my_task']

task_res.execute()
task_res.suspend()
task_res.resume()
task_res.delete()
Copy

작업 그래프에서 작업 관리하기

작업 그래프에서 수집된 작업을 관리할 수 있습니다. 작업 그래프는 단일 루트 작업과 종속성을 기준으로 구성되는 추가 작업을 포함한 일련의 작업입니다.

작업 그래프의 작업에 대한 자세한 내용은 작업 그래프 섹션을 참조하십시오.

작업 그래프 만들기

작업 그래프를 생성하려면 먼저 해당 이름과 기타 선택적 속성(예: 일정)을 지정하는 DAG 오브젝트를 생성하십시오. timedelta 값 또는 Cron 식을 사용하여 작업 그래프의 일정을 정의할 수 있습니다.

다음 예제의 코드는 Python 함수 dosomething 을 정의한 다음 작업 그래프에서 함수를 dag_task2 라는 DAGTask 오브젝트로 지정합니다.

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_

def dosomething(session: Session) -> None:
  df = session.table("target")
  df.group_by("a").agg(sum_("b")).save_as_table("agg_table")

with DAG("my_dag", schedule=timedelta(days=1)) as dag:
  # Create a task that runs some SQL.
  dag_task1 = DAGTask(
    "dagtask1",
    "MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v"
  )
  # Create a task that runs a Python function.
  dag_task2 = DAGTask(
    StoredProcedureCall(
      dosomething, stage_location="@mystage",
      packages=["snowflake-snowpark-python"]
    ),
    warehouse="test_warehouse"
  )
# Shift right and left operators can specify task relationships.
dag_task1 >> dag_task2  # dag_task1 is a predecessor of dag_task2
schema = root.databases["my_db"].schemas["my_schema"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)
Copy

또한 이 코드는 SQL 문을 dag_task1 이라는 또 다른 DAGTask 오브젝트로 정의한 다음 dag_task1dag_task2 의 선행 작업으로 지정합니다. 마지막으로, my_db 데이터베이스 및 my_schema 스키마의 Snowflake에 작업 그래프를 배포합니다.

Cron 일정, 작업 분기, 함수 반환 값으로 작업 그래프 만들기

지정된 cron 일정, 작업 분기, 작업 반환 값으로 사용되는 함수 반환 값을 사용하여 작업 그래프를 생성할 수도 있습니다.

다음 예제의 코드는 일정을 지정하는 Cron 오브젝트를 사용하여 DAG 오브젝트를 생성합니다. 이 코드는 다른 DAGTask 오브젝트와 함께 task1_branch 라는 DAGTaskBranch 오브젝트를 정의하고 서로의 종속성을 지정합니다.

from snowflake.core import Root
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  pass  # do something

def task_branch_handler(session: Session) -> str:
  # do something
  return "task3"

try:
  with DAG(
    "my_dag",
    schedule=Cron("10 * * * *", "America/Los_Angeles"),
    stage_location="@mystage",
    packages=["snowflake-snowpark-python"],
    use_func_return_value=True,
  ) as dag:
    task1 = DAGTask(
      "task1",
      task_handler,
      warehouse=test_warehouse,
    )
    task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
    task2 = DAGTask("task2", task_handler, warehouse=test_warehouse)
    task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1")
    task1 >> task1_branch
    task1_branch >> [task2, task3]
  schema = root.databases["my_db"].schemas["my_schema"]
  op = DAGOperation(schema)
  op.deploy(dag, mode=CreateMode.or_replace)
finally:
  session.close()
Copy

또한 이 코드 예제에서는 작업 처리기 함수를 정의하고 작업에 할당된 지정된 작업 처리기를 사용하여 각 DAGTaskDAGTaskBranch 오브젝트를 생성합니다. 이 코드는 DAG의 use_func_return_value 매개 변수를 True 로 설정하는데, Python 함수의 반환 값을 해당 작업의 반환 값으로 사용하도록 지정하는 매개 변수입니다. 그렇지 않으면 use_func_return_value 의 기본값은 False 입니다.

작업 그래프에서 작업의 반환 값 설정 및 가져오기

작업 정의가 StoredProcedureCall 오브젝트인 경우 저장 프로시저(또는 함수)의 처리기는 TaskContext 오브젝트를 사용하여 작업의 반환 값을 명시적으로 설정할 수 있습니다.

자세한 내용은 SYSTEM$SET_RETURN_VALUE 섹션을 참조하십시오.

다음 예제의 코드는 현재 세션에서 context 라는 TaskContext 오브젝트를 생성하는 작업 처리기 함수를 정의합니다. 그런 다음 TaskContext.set_return_value 메서드를 사용하여 반환 값을 지정된 문자열로 명시적으로 설정합니다.

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  # this return value can be retrieved by successor Tasks.
  context.set_return_value("predecessor_return_value")
Copy

작업 그래프에서 이전 작업을 선행 작업으로 식별하는 바로 다음의 후속 작업은 선행 작업에서 명시적으로 설정한 반환 값을 불러올 수 있습니다.

자세한 내용은 SYSTEM$GET_PREDECESSOR_RETURN_VALUE 섹션을 참조하십시오.

다음 예제의 코드는 TaskContext.get_predecessor_return_value 메서드를 사용하여 pred_task_name 이라는 선행 작업의 반환 값을 가져오는 작업 처리기 함수를 정의합니다.

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  pred_return_value = context.get_predecessor_return_value("pred_task_name")
Copy