Gerenciamento de tarefas e gráficos de tarefas do Snowflake com Python¶
Você pode usar Python para gerenciar tarefas do Snowflake, com as quais você pode executar instruções SQL, chamadas de procedimento e lógica no Script Snowflake. Para obter uma visão geral das tarefas, consulte Introdução às tarefas.
A Snowflake Python API representa tarefas com dois tipos separados:
Task
: expõe as propriedades de uma tarefa, como cronograma, parâmetros e predecessores.TaskResource
: expõe métodos que você pode usar para buscar um objetoTask
correspondente, executar a tarefa e alterá-la.
Pré-requisitos¶
Os exemplos neste tópico pressupõem que você adicionou código para se conectar ao Snowflake e para criar um objeto Root
a partir do qual usar a Snowflake Python API.
Por exemplo, o seguinte código usa parâmetros de conexão definidos em um arquivo de configuração para criar uma conexão com o Snowflake:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Usando o objeto Session
resultante, o código cria um objeto Root
para usar os tipos e métodos de API. Para obter mais informações, consulte Conexão ao Snowflake com a Snowflake Python API.
Criação de uma tarefa¶
Para criar uma tarefa, primeiro crie um objeto Task
. Em seguida, especificando o banco de dados e o esquema no qual a tarefa será criada, crie um objeto TaskCollection
. Usando TaskCollection.create
, adicione a nova tarefa ao Snowflake.
O código no exemplo a seguir cria um objeto Task
que representa uma tarefa chamada my_task
que executa uma consulta SQL especificada no parâmetro definition
:
from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task
my_task = Task(name='my_task', definition='<sql query>', schedule=timedelta(hours=1))
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task)
Este código cria uma variável TaskCollection
tasks
do banco de dados my_db
e do esquema my_schema
. Usando TaskCollection.create
, ele cria uma nova tarefa no Snowflake.
Este exemplo de código também especifica um valor timedelta
de uma hora para o cronograma da tarefa. Você pode definir o agendamento de uma tarefa usando um valor timedelta
ou uma expressão Cron
.
Você também pode criar uma tarefa que execute uma função Python ou um procedimento armazenado. O código no exemplo a seguir cria uma tarefa chamada my_task2
que executa uma função representada por um objeto StoredProcedureCall
:
from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task
my_task2 = Task(
StoredProcedureCall(
dosomething, stage_location="@mystage"
),
warehouse="test_warehouse"
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
Este objeto especifica uma função chamada dosomething
localizada no local do estágio @mystage
. Você também deve especificar um warehouse
ao criar uma tarefa com um objeto StoredProcedureCall
.
Criação ou atualização de uma tarefa¶
Você pode atualizar as características de uma tarefa existente definindo propriedades de um objeto Task
que representa a tarefa e, em seguida, passando o objeto para Snowflake com o método TaskResource.create_or_update
.
Você também pode passar um objeto Task
descrevendo uma nova tarefa quando quiser criá-la.
O código no exemplo a seguir define o nome, a definição e o cronograma de uma tarefa e, em seguida, atualiza a tarefa no Snowflake ou cria a tarefa se ela ainda não existir:
from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task
tasks = root.databases["my_db"].schemas["my_schema"].tasks
tasks['my_task'].create_or_update(
Task(name='my_task', definition='<sql query 2>', schedule=timedelta(hours=1))
)
Listagem de tarefas¶
Você pode listar tarefas usando o método TaskCollection.iter
. O método retorna um iterador PagedIter
de objetos Task
.
O código no exemplo a seguir lista tarefas cujo nome começa com my:
from snowflake.core import Root
from snowflake.core.task import Task
from snowflake.core.task import TaskCollection
root = Root(connection)
tasks: TaskCollection = root.databases["my_db"].schemas["my_schema"].tasks
task_iter = tasks.iter(like="my%") # returns a PagedIter[Task]
for task_obj in task_iter:
print(task_obj.name)
Execução de operações de tarefa¶
Você pode realizar operações de tarefas comuns—como executar, suspender e retomar tarefas—com um objeto TaskResource
.
O código no exemplo a seguir executa, suspende, retoma e exclui a tarefa my_task
:
from snowflake.core import Root
from snowflake.core.task import Task
tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks['my_task']
task_res.execute()
task_res.suspend()
task_res.resume()
task_res.delete()
Gerenciamento de tarefas em um gráfico de tarefas¶
Você pode gerenciar tarefas coletadas em um gráfico de tarefas. Um gráfico de tarefas é uma série de tarefas com uma tarefa raiz e tarefas adicionais, organizada por suas dependências.
Para obter mais informações sobre tarefas em um gráfico de tarefas, consulte Gráficos da tarefa.
Criação de um gráfico de tarefas¶
Para criar um gráfico de tarefas, primeiro crie um objeto DAG
que especifique seu nome e outras propriedades opcionais, como seu cronograma. Você pode definir o agendamento de um gráfico de tarefas usando um valor timedelta
ou uma expressão Cron
.
O código no exemplo a seguir define uma função Python dosomething
e, em seguida, especifica a função como um objeto DAGTask
denominado dag_task2
no gráfico de tarefas.
from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_
def dosomething(session: Session) -> None:
df = session.table("target")
df.group_by("a").agg(sum_("b")).save_as_table("agg_table")
with DAG("my_dag", schedule=timedelta(days=1)) as dag:
# Create a task that runs some SQL.
dag_task1 = DAGTask(
"dagtask1",
"MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v"
)
# Create a task that runs a Python function.
dag_task2 = DAGTask(
StoredProcedureCall(
dosomething, stage_location="@mystage",
packages=["snowflake-snowpark-python"]
),
warehouse="test_warehouse"
)
# Shift right and left operators can specify task relationships.
dag_task1 >> dag_task2 # dag_task1 is a predecessor of dag_task2
schema = root.databases["my_db"].schemas["my_schema"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)
Este código também define uma instrução SQL como outro objeto DAGTask
chamado dag_task1
e, em seguida, especifica dag_task1
como um predecessor de dag_task2
. Por fim, ele implementa o gráfico de tarefas no Snowflake no banco de dados my_db
e no esquema my_schema
.
Criação de um gráfico de tarefas com uma programação cron, ramificações de tarefas e valores de retorno de função¶
Você também pode criar um gráfico de tarefas com uma programação cron especificada, ramificações de tarefa e valores de retorno de função que são usados como valores de retorno de tarefa.
O código no exemplo a seguir cria um objeto DAG
com um objeto Cron
especificando sua programação. Ele define um objeto DAGTaskBranch
chamado task1_branch
junto com outros objetos DAGTask
e especifica suas dependências entre si:
from snowflake.core import Root
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
pass # do something
def task_branch_handler(session: Session) -> str:
# do something
return "task3"
try:
with DAG(
"my_dag",
schedule=Cron("10 * * * *", "America/Los_Angeles"),
stage_location="@mystage",
packages=["snowflake-snowpark-python"],
use_func_return_value=True,
) as dag:
task1 = DAGTask(
"task1",
task_handler,
warehouse=test_warehouse,
)
task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
task2 = DAGTask("task2", task_handler, warehouse=test_warehouse)
task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1")
task1 >> task1_branch
task1_branch >> [task2, task3]
schema = root.databases["my_db"].schemas["my_schema"]
op = DAGOperation(schema)
op.deploy(dag, mode=CreateMode.or_replace)
finally:
session.close()
Este exemplo de código também define funções de manipulador de tarefas e cria cada objeto DAGTask
e DAGTaskBranch
com um manipulador de tarefas especificado atribuído à tarefa. O código define o parâmetro use_func_return_value
de DAG como True
, que especifica o uso do valor de retorno da função Python como o valor de retorno da tarefa correspondente. Caso contrário, o valor padrão de use_func_return_value
é False
.
Configuração e obtenção do valor de retorno de uma tarefa em um gráfico de tarefas¶
Quando a definição de uma tarefa é um objeto StoredProcedureCall
, o manipulador do procedimento armazenado (ou função) pode definir explicitamente o valor de retorno da tarefa usando um objeto TaskContext
.
Para obter mais informações, consulte SYSTEM$SET_RETURN_VALUE.
O código no exemplo a seguir define uma função de manipulador de tarefas que cria um objeto TaskContext
chamado context
da sessão atual. Em seguida, ele usa o método TaskContext.set_return_value
para definir explicitamente o valor de retorno para uma cadeia de caracteres especificada.
from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
context = TaskContext(session)
# this return value can be retrieved by successor Tasks.
context.set_return_value("predecessor_return_value")
Em um gráfico de tarefas, uma tarefa sucessora imediata que identifica a tarefa anterior como sua predecessora pode então recuperar o valor de retorno definido explicitamente pela tarefa predecessora.
Para obter mais informações, consulte SYSTEM$GET_PREDECESSOR_RETURN_VALUE.
O código no exemplo a seguir define uma função de manipulador de tarefas que usa o método TaskContext.get_predecessor_return_value
para obter o valor de retorno da tarefa predecessora chamada pred_task_name
:
from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
context = TaskContext(session)
pred_return_value = context.get_predecessor_return_value("pred_task_name")