Python을 사용하여 Snowflake 작업 및 작업 그래프 관리하기¶
Python을 사용하면 Snowflake Scripting 에서 SQL 문, 프로시저 호출, 논리를 실행할 수 있는 Snowflake 작업을 관리할 수 있습니다. 작업 개요는 작업 소개 섹션을 참조하십시오.
Snowflake Python API는 다음 두 가지 별개 유형의 작업을 나타냅니다.
Task
: 일정, 매개 변수, 선행 작업과 같은 작업의 속성을 노출합니다.TaskResource
: 해당Task
오브젝트를 가져오고 작업을 실행하고 작업을 변경하는 데 사용할 수 있는 메서드를 노출합니다.
전제 조건¶
이 항목의 예제에서는 Snowflake와 연결하고 Snowflake Python API를 사용할 Root
오브젝트를 생성하는 코드를 추가했다고 가정합니다.
예를 들어, 다음 코드는 구성 파일에 정의된 연결 매개 변수를 사용하여 Snowflake에 대한 연결을 생성합니다.
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
해당 코드에서는 결과 Session
오브젝트를 사용하여 API의 유형과 메서드를 사용하기 위해 Root
오브젝트를 생성합니다. 자세한 내용은 Snowflake Python API를 사용하여 Snowflake에 연결하기 섹션을 참조하십시오.
작업 만들기¶
작업을 만들려면 먼저 Task
오브젝트를 생성하십시오. 그런 다음 작업을 생성할 데이터베이스와 스키마를 지정하여 TaskCollection
오브젝트를 생성합니다. TaskCollection.create
를 사용하여 Snowflake에 새 작업을 추가합니다.
다음 예제의 코드는 definition
매개 변수에 지정된 SQL 쿼리를 실행하는 my_task
라는 작업을 나타내는 Task
오브젝트를 생성합니다.
from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task
my_task = Task(name='my_task', definition='<sql query>', schedule=timedelta(hours=1))
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task)
이 코드는 my_db
데이터베이스와 my_schema
스키마에서 TaskCollection
변수 tasks
를 생성합니다. Snowflake에서 TaskCollection.create
를 사용하여 새 작업을 생성합니다.
이 코드 예제에서는 작업의 일정에 대해 1시간의 timedelta
값도 지정합니다. timedelta
값 또는 Cron
식을 사용하여 작업 일정을 정의할 수 있습니다.
또한 Python 함수 또는 저장 프로시저를 실행하는 작업을 생성할 수도 있습니다. 다음 예제의 코드는 StoredProcedureCall
오브젝트로 표시되는 함수를 실행하는 my_task2
라는 작업을 생성합니다.
from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task
my_task2 = Task(
StoredProcedureCall(
dosomething, stage_location="@mystage"
),
warehouse="test_warehouse"
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
이 오브젝트는 @mystage
스테이지 위치에 있는 dosomething
이라는 함수를 지정합니다. StoredProcedureCall
오브젝트로 작업을 생성할 때 warehouse
도 지정해야 합니다.
작업 생성 또는 업데이트하기¶
작업을 나타내는 Task
오브젝트의 속성을 설정한 다음 TaskResource.create_or_update
메서드로 이 오브젝트를 Snowflake에 전달하여 기존 작업의 특성을 업데이트할 수 있습니다.
작업을 생성하려는 경우 새 작업을 설명하는 Task
오브젝트를 전달할 수도 있습니다.
다음 예제의 코드는 작업의 이름, 정의, 일정을 설정한 다음 Snowflake에서 작업을 업데이트하거나 작업이 아직 존재하지 않는다면 작업을 생성합니다.
from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task
tasks = root.databases["my_db"].schemas["my_schema"].tasks
tasks['my_task'].create_or_update(
Task(name='my_task', definition='<sql query 2>', schedule=timedelta(hours=1))
)
작업 나열하기¶
TaskCollection.iter
메서드를 사용하여 작업을 나열할 수 있습니다. 이 메서드는 Task
오브젝트의 PagedIter
반복기를 반환합니다.
다음 예제의 코드로 이름이 my 로 시작하는 작업을 나열합니다.
from snowflake.core import Root
from snowflake.core.task import Task
from snowflake.core.task import TaskCollection
root = Root(connection)
tasks: TaskCollection = root.databases["my_db"].schemas["my_schema"].tasks
task_iter = tasks.iter(like="my%") # returns a PagedIter[Task]
for task_obj in task_iter:
print(task_obj.name)
작업 수행하기¶
TaskResource
오브젝트를 사용하여 작업 실행, 일시 중단, 재개 등 일반적인 작업을 수행할 수 있습니다.
다음 예제의 코드는 my_task
작업을 실행, 일시 중단, 재개, 삭제합니다.
from snowflake.core import Root
from snowflake.core.task import Task
tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks['my_task']
task_res.execute()
task_res.suspend()
task_res.resume()
task_res.delete()
작업 그래프에서 작업 관리하기¶
작업 그래프에서 수집된 작업을 관리할 수 있습니다. 작업 그래프는 단일 루트 작업과 종속성을 기준으로 구성되는 추가 작업을 포함한 일련의 작업입니다.
작업 그래프의 작업에 대한 자세한 내용은 작업 그래프 섹션을 참조하십시오.
작업 그래프 만들기¶
작업 그래프를 생성하려면 먼저 해당 이름과 기타 선택적 속성(예: 일정)을 지정하는 DAG
오브젝트를 생성하십시오. timedelta
값 또는 Cron
식을 사용하여 작업 그래프의 일정을 정의할 수 있습니다.
다음 예제의 코드는 Python 함수 dosomething
을 정의한 다음 작업 그래프에서 함수를 dag_task2
라는 DAGTask
오브젝트로 지정합니다.
from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_
def dosomething(session: Session) -> None:
df = session.table("target")
df.group_by("a").agg(sum_("b")).save_as_table("agg_table")
with DAG("my_dag", schedule=timedelta(days=1)) as dag:
# Create a task that runs some SQL.
dag_task1 = DAGTask(
"dagtask1",
"MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v"
)
# Create a task that runs a Python function.
dag_task2 = DAGTask(
StoredProcedureCall(
dosomething, stage_location="@mystage",
packages=["snowflake-snowpark-python"]
),
warehouse="test_warehouse"
)
# Shift right and left operators can specify task relationships.
dag_task1 >> dag_task2 # dag_task1 is a predecessor of dag_task2
schema = root.databases["my_db"].schemas["my_schema"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)
또한 이 코드는 SQL 문을 dag_task1
이라는 또 다른 DAGTask
오브젝트로 정의한 다음 dag_task1
을 dag_task2
의 선행 작업으로 지정합니다. 마지막으로, my_db
데이터베이스 및 my_schema
스키마의 Snowflake에 작업 그래프를 배포합니다.
Cron 일정, 작업 분기, 함수 반환 값으로 작업 그래프 만들기¶
지정된 cron 일정, 작업 분기, 작업 반환 값으로 사용되는 함수 반환 값을 사용하여 작업 그래프를 생성할 수도 있습니다.
다음 예제의 코드는 일정을 지정하는 Cron
오브젝트를 사용하여 DAG
오브젝트를 생성합니다. 이 코드는 다른 DAGTask
오브젝트와 함께 task1_branch
라는 DAGTaskBranch
오브젝트를 정의하고 서로의 종속성을 지정합니다.
from snowflake.core import Root
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
pass # do something
def task_branch_handler(session: Session) -> str:
# do something
return "task3"
try:
with DAG(
"my_dag",
schedule=Cron("10 * * * *", "America/Los_Angeles"),
stage_location="@mystage",
packages=["snowflake-snowpark-python"],
use_func_return_value=True,
) as dag:
task1 = DAGTask(
"task1",
task_handler,
warehouse=test_warehouse,
)
task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
task2 = DAGTask("task2", task_handler, warehouse=test_warehouse)
task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1")
task1 >> task1_branch
task1_branch >> [task2, task3]
schema = root.databases["my_db"].schemas["my_schema"]
op = DAGOperation(schema)
op.deploy(dag, mode=CreateMode.or_replace)
finally:
session.close()
또한 이 코드 예제에서는 작업 처리기 함수를 정의하고 작업에 할당된 지정된 작업 처리기를 사용하여 각 DAGTask
및 DAGTaskBranch
오브젝트를 생성합니다. 이 코드는 DAG의 use_func_return_value
매개 변수를 True
로 설정하는데, Python 함수의 반환 값을 해당 작업의 반환 값으로 사용하도록 지정하는 매개 변수입니다. 그렇지 않으면 use_func_return_value
의 기본값은 False
입니다.
작업 그래프에서 작업의 반환 값 설정 및 가져오기¶
작업 정의가 StoredProcedureCall
오브젝트인 경우 저장 프로시저(또는 함수)의 처리기는 TaskContext
오브젝트를 사용하여 작업의 반환 값을 명시적으로 설정할 수 있습니다.
자세한 내용은 SYSTEM$SET_RETURN_VALUE 섹션을 참조하십시오.
다음 예제의 코드는 현재 세션에서 context
라는 TaskContext
오브젝트를 생성하는 작업 처리기 함수를 정의합니다. 그런 다음 TaskContext.set_return_value
메서드를 사용하여 반환 값을 지정된 문자열로 명시적으로 설정합니다.
from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
context = TaskContext(session)
# this return value can be retrieved by successor Tasks.
context.set_return_value("predecessor_return_value")
작업 그래프에서 이전 작업을 선행 작업으로 식별하는 바로 다음의 후속 작업은 선행 작업에서 명시적으로 설정한 반환 값을 불러올 수 있습니다.
자세한 내용은 SYSTEM$GET_PREDECESSOR_RETURN_VALUE 섹션을 참조하십시오.
다음 예제의 코드는 TaskContext.get_predecessor_return_value
메서드를 사용하여 pred_task_name
이라는 선행 작업의 반환 값을 가져오는 작업 처리기 함수를 정의합니다.
from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
context = TaskContext(session)
pred_return_value = context.get_predecessor_return_value("pred_task_name")