Python을 사용하여 Snowflake 작업 및 DAG 관리하기

Python을 사용하면 Snowflake Scripting 에서 SQL 문, 프로시저 호출, 논리를 실행할 수 있는 Snowflake 작업을 관리할 수 있습니다. 작업 개요는 작업 소개 섹션을 참조하십시오.

Snowflake Python API는 다음 두 가지 별개 유형의 작업을 나타냅니다.

  • Task: 일정, 매개 변수, 선행 작업과 같은 작업의 속성을 노출합니다.

  • TaskResource: 해당 Task 오브젝트를 가져오고 작업을 실행하고 작업을 변경하는 데 사용할 수 있는 메서드를 노출합니다.

이 항목의 예제에서는 Snowflake와 연결하고 Snowflake Python API를 사용할 Root 오브젝트를 생성하는 코드를 추가했다고 가정합니다. 자세한 내용은 Snowflake Python API를 사용하여 Snowflake에 연결하기 섹션을 참조하십시오.

다음 예제의 코드에서는 구성 파일에 정의된 연결 매개 변수를 사용하여 Snowflake에 대한 연결을 생성합니다. 해당 코드에서는 결과 Session 오브젝트를 사용하여 API의 유형과 메서드를 사용하기 위해 Root 오브젝트를 생성합니다.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

작업 만들기

작업을 만들려면 먼저 Task 오브젝트를 생성하십시오. 그런 다음 작업을 생성할 데이터베이스와 스키마를 지정하여 TaskCollection 오브젝트를 생성합니다. TaskCollection.create 를 사용하여 Snowflake에 새 작업을 추가합니다.

다음 예제의 코드는 definition 매개 변수에 지정된 SQL 쿼리를 실행하는 my_task 라는 작업을 나타내는 Task 오브젝트를 생성합니다. my_db 데이터베이스와 my_schema 스키마에서 TaskCollection 변수 tasks 를 생성합니다. Snowflake에서 TaskCollection.create 를 사용하여 새 작업을 생성합니다.

이 코드 예제에서는 작업의 일정에 대해 1시간의 timedelta 값도 지정합니다. timedelta 값 또는 Cron 식을 사용하여 작업 일정을 정의할 수 있습니다.

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

my_task = Task(name='my_task', definition='<sql query>', schedule=timedelta(hours=1))
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task)
Copy

또한 Python 함수 또는 저장 프로시저를 실행하는 작업을 생성할 수도 있습니다. 다음 예제의 코드는 StoredProcedureCall 오브젝트로 표시되는 함수를 실행하는 my_task2 라는 작업을 생성합니다. 이 오브젝트는 @mystage 스테이지 위치에 있는 dosomething 이라는 함수를 지정합니다. StoredProcedureCall 오브젝트로 작업을 생성할 때 warehouse 도 지정해야 합니다.

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task

my_task2 = Task(
  StoredProcedureCall(
      dosomething, stage_location="@mystage"
  ),
  warehouse="test_warehouse"
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
Copy

작업 만들기 또는 업데이트하기

작업을 나타내는 Task 오브젝트의 속성을 설정한 다음 TaskResource.create_or_update 메서드로 이 오브젝트를 Snowflake에 전달하여 기존 작업의 특성을 업데이트할 수 있습니다.

작업을 생성하려는 경우 새 작업을 설명하는 Task 오브젝트를 전달할 수도 있습니다.

다음 예제의 코드는 작업의 이름, 정의, 일정을 설정한 다음 Snowflake에서 작업을 업데이트하여 작업이 아직 존재하지 않는다면 작업을 생성합니다.

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
tasks['my_task'].create_or_update(
  Task(name='my_task', definition='<sql query 2>', schedule=timedelta(hours=1))
)
Copy

작업 나열하기

TaskCollection.iter 메서드를 사용하여 작업을 나열할 수 있습니다. 이 메서드는 Task 오브젝트의 PagedIter 반복기를 반환합니다.

다음 예제의 코드로 이름에 《my》라는 텍스트가 포함되는 작업을 나열하고 각 작업의 이름을 인쇄합니다.

from snowflake.core import Root
from snowflake.core.task import Task
from snowflake.core.task import TaskCollection

root = Root(connection)

tasks: TaskCollection = root.databases["my_db"].schemas["my_schema"].tasks
task_iter = tasks.iter(like="my%")  # returns a PagedIter[Task]
for task_obj in task_iter:
  print(task_obj.name)
Copy

작업 수행하기

TaskResource 오브젝트를 사용하여 작업 실행, 일시 중단, 재개 등 일반적인 작업을 수행할 수 있습니다.

다음 예제의 코드는 my_task 작업을 실행, 일시 중단, 재개, 삭제합니다.

from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks['my_task']

task_res.execute()
task_res.suspend()
task_res.resume()
task_res.delete()
Copy

DAG에서 작업 관리하기

방향성 비순환 그래프(DAG)에서 수집된 작업을 관리할 수 있습니다. DAG는 단일 루트 작업과 종속성을 기준으로 구성되는 추가 작업을 포함한 일련의 작업입니다.

DAG의 작업에 대한 자세한 내용은 작업의 DAG 섹션을 참조하십시오.

작업의 DAG 만들기

작업의 DAG를 생성하려면 먼저 해당 이름과 기타 선택적 속성(예: 일정)을 지정하는 DAG 오브젝트를 생성하십시오. timedelta 값 또는 Cron 식을 사용하여 DAG 일정을 정의할 수 있습니다.

다음 예제의 코드는 Python 함수 dosomething 을 정의한 다음 DAG에서 함수를 dag_task2 라는 DAGTask 오브젝트로 지정합니다. 또한 SQL 문을 dag_task1 이라는 또 다른 DAGTask 오브젝트로 정의한 다음 dag_task1dag_task2 의 선행 작업으로 지정합니다. 마지막으로, my_db 데이터베이스 및 my_schema 스키마의 Snowflake에 DAG를 배포합니다.

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_

def dosomething(session: Session) -> None:
  df = session.table("target")
  df.group_by("a").agg(sum_("b")).save_as_table("agg_table")

with DAG("my_dag", schedule=timedelta(days=1)) as dag:
  # Create a task that runs some SQL.
  dag_task1 = DAGTask(
    "dagtask1",
    "MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v"
  )
  # Create a task that runs a Python function.
  dag_task2 = DAGTask(
    StoredProcedureCall(
      dosomething, stage_location="@mystage",
      packages=["snowflake-snowpark-python"]
    ),
    warehouse="test_warehouse"
  )
# Shift right and left operators can specify task relationships.
dag_task1 >> dag_task2  # dag_task1 is a predecessor of dag_task2
schema = root.databases["my_db"].schemas["my_schema"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)
Copy

Cron 일정, 작업 분기, 함수 반환 값으로 DAG 만들기

지정된 cron 일정, 작업 분기, 작업 반환 값으로 사용되는 함수 반환 값을 사용하여 DAG를 생성할 수도 있습니다.

다음 예제의 코드는 일정을 지정하는 Cron 오브젝트를 사용하여 DAG 오브젝트를 생성합니다. 이 코드는 다른 DAGTask 오브젝트와 함께 task1_branch 라는 DAGTaskBranch 오브젝트를 정의하고 서로의 종속성을 지정합니다.

또한 이 코드 예제에서는 작업 처리기 함수를 정의하고 작업에 할당된 지정된 작업 처리기를 사용하여 각 DAGTaskDAGTaskBranch 오브젝트를 생성합니다. 이 코드는 DAG의 use_func_return_value 매개 변수를 True 로 설정하는데, Python 함수의 반환 값을 해당 작업의 반환 값으로 사용하도록 지정하는 매개 변수입니다. 그렇지 않으면 use_func_return_value 의 기본값은 False 입니다.

from snowflake.core import Root
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  pass  # do something

def task_branch_handler(session: Session) -> str:
  # do something
  return "task3"

try:
  with DAG(
    "my_dag",
    schedule=Cron("10 * * * *", "America/Los_Angeles"),
    stage_location="@mystage",
    packages=["snowflake-snowpark-python"],
    use_func_return_value=True,
  ) as dag:
    task1 = DAGTask(
      "task1",
      task_handler,
      warehouse=test_warehouse,
    )
    task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
    task2 = DAGTask("task2", task_handler, warehouse=test_warehouse)
    task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1")
    task1 >> task1_branch
    task1_branch >> [task2, task3]
  schema = root.databases["my_db"].schemas["my_schema"]
  op = DAGOperation(schema)
  op.deploy(dag, mode=CreateMode.or_replace)
finally:
  session.close()
Copy

DAG에서 작업의 반환 값을 설정하고 가져옵니다.

작업 정의가 StoredProcedureCall 오브젝트인 경우 저장 프로시저(또는 함수)의 처리기는 TaskContext 오브젝트를 사용하여 작업의 반환 값을 명시적으로 설정할 수 있습니다.

자세한 내용은 SYSTEM$SET_RETURN_VALUE 섹션을 참조하십시오.

다음 예제의 코드는 현재 세션에서 context 라는 TaskContext 오브젝트를 생성하는 작업 처리기 함수를 정의합니다. 그런 다음 TaskContext.set_return_value 메서드를 사용하여 반환 값을 지정된 문자열로 명시적으로 설정합니다.

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  # this return value can be retrieved by successor Tasks.
  context.set_return_value("predecessor_return_value")
Copy

작업의 DAG에서 이전 작업을 선행 작업으로 식별하는 바로 다음의 후속 작업은 선행 작업에서 명시적으로 설정한 반환 값을 불러올 수 있습니다.

자세한 내용은 SYSTEM$GET_PREDECESSOR_RETURN_VALUE 섹션을 참조하십시오.

다음 예제의 코드는 TaskContext.get_predecessor_return_value 메서드를 사용하여 pred_task_name 이라는 선행 작업의 반환 값을 가져오는 작업 처리기 함수를 정의합니다.

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  pred_return_value = context.get_predecessor_return_value("pred_task_name")
Copy