Gerenciamento de tarefas do Snowflake e DAGs com Python

Você pode usar Python para gerenciar tarefas do Snowflake, com as quais você pode executar instruções SQL, chamadas de procedimento e lógica no Script Snowflake. Para obter uma visão geral das tarefas, consulte Introdução às tarefas.

A Snowflake Python API representa tarefas com dois tipos separados:

  • Task: expõe as propriedades de uma tarefa, como cronograma, parâmetros e predecessores.

  • TaskResource: expõe métodos que você pode usar para buscar um objeto Task correspondente, executar a tarefa e alterá-la.

Os exemplos neste tópico pressupõem que você adicionou código para se conectar ao Snowflake e criou um objeto Root a partir do qual usar a Snowflake Python API. Para obter mais informações, consulte Conexão ao Snowflake com a Snowflake Python API.

O código no exemplo a seguir usa parâmetros de conexão definidos em um arquivo de configuração para criar uma conexão com o Snowflake. Usando o objeto Session resultante, o código cria um objeto Root para usar os tipos e métodos de API.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

Criação de uma tarefa

Para criar uma tarefa, primeiro crie um objeto Task. Em seguida, especificando o banco de dados e o esquema no qual a tarefa será criada, crie um objeto TaskCollection. Usando TaskCollection.create, adicione a nova tarefa ao Snowflake.

O código no exemplo a seguir cria um objeto Task que representa uma tarefa chamada my_task que executa uma consulta SQL especificada no parâmetro definition. Ele cria uma variável TaskCollection tasks do banco de dados my_db e do esquema my_schema. Usando TaskCollection.create, ele cria uma nova tarefa no Snowflake.

Este exemplo de código também especifica um valor timedelta de uma hora para o cronograma da tarefa. Você pode definir o agendamento de uma tarefa usando um valor timedelta ou uma expressão Cron.

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

my_task = Task(name='my_task', definition='<sql query>', schedule=timedelta(hours=1))
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task)
Copy

Você também pode criar uma tarefa que execute uma função Python ou um procedimento armazenado. O código no exemplo a seguir cria uma tarefa chamada my_task2 que executa uma função representada por um objeto StoredProcedureCall. Este objeto especifica uma função chamada dosomething localizada no local do estágio @mystage. Você também deve especificar um warehouse ao criar uma tarefa com um objeto StoredProcedureCall.

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task

my_task2 = Task(
  StoredProcedureCall(
      dosomething, stage_location="@mystage"
  ),
  warehouse="test_warehouse"
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
Copy

Criação ou atualização de uma tarefa

Você pode atualizar as características de uma tarefa existente definindo propriedades de um objeto Task que representa a tarefa e, em seguida, passando o objeto para Snowflake com o método TaskResource.create_or_update.

Você também pode passar um objeto Task descrevendo uma nova tarefa quando quiser criá-la.

O código no exemplo a seguir define o nome, a definição e o cronograma de uma tarefa e, em seguida, atualiza a tarefa no Snowflake, criando a tarefa se ela ainda não existir.

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
tasks['my_task'].create_or_update(
  Task(name='my_task', definition='<sql query 2>', schedule=timedelta(hours=1))
)
Copy

Listagem de tarefas

Você pode listar tarefas usando o método TaskCollection.iter. O método retorna um iterador PagedIter de objetos Task.

O código no exemplo a seguir lista tarefas cujo nome inclui o texto “meu”, imprimindo o nome de cada uma.

from snowflake.core import Root
from snowflake.core.task import Task
from snowflake.core.task import TaskCollection

root = Root(connection)

tasks: TaskCollection = root.databases["my_db"].schemas["my_schema"].tasks
task_iter = tasks.iter(like="my%")  # returns a PagedIter[Task]
for task_obj in task_iter:
  print(task_obj.name)
Copy

Execução de operações de tarefa

Você pode realizar operações de tarefas comuns—como executar, suspender e retomar tarefas—com um objeto TaskResource.

O código no exemplo a seguir executa, suspende, retoma e exclui a tarefa my_task.

from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks['my_task']

task_res.execute()
task_res.suspend()
task_res.resume()
task_res.delete()
Copy

Gerenciamento de tarefas em um DAG

Você pode gerenciar tarefas coletadas em um gráfico acíclico direcionado (DAG). Um DAG é uma série de tarefas com uma tarefa raiz e tarefas adicionais, organizada por suas dependências.

Para obter mais informações sobre tarefas em um DAG, consulte DAG de tarefas.

Criação de um DAG de tarefas

Para criar um DAG de tarefas, primeiro crie um objeto DAG que especifique seu nome e outras propriedades opcionais, como seu cronograma. Você pode definir o cronograma de um DAG usando um valor timedelta ou uma expressão Cron.

O código no exemplo a seguir define uma função Python dosomething e, em seguida, especifica a função como um objeto DAGTask denominado dag_task2 em DAG. Ele também define uma instrução SQL como outro objeto DAGTask chamado dag_task1 e, em seguida, especifica dag_task1 como um predecessor de dag_task2. Por último, ele implementa o DAG no Snowflake no banco de dados my_db e no esquema my_schema.

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_

def dosomething(session: Session) -> None:
  df = session.table("target")
  df.group_by("a").agg(sum_("b")).save_as_table("agg_table")

with DAG("my_dag", schedule=timedelta(days=1)) as dag:
  # Create a task that runs some SQL.
  dag_task1 = DAGTask(
    "dagtask1",
    "MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v"
  )
  # Create a task that runs a Python function.
  dag_task2 = DAGTask(
    StoredProcedureCall(
      dosomething, stage_location="@mystage",
      packages=["snowflake-snowpark-python"]
    ),
    warehouse="test_warehouse"
  )
# Shift right and left operators can specify task relationships.
dag_task1 >> dag_task2  # dag_task1 is a predecessor of dag_task2
schema = root.databases["my_db"].schemas["my_schema"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)
Copy

Criação de um DAG com uma programação cron, ramificações de tarefas e valores de retorno de função

Você também pode criar um DAG com uma programação cron especificada, ramificações de tarefa e valores de retorno de função que são usados como valores de retorno de tarefa.

O código no exemplo a seguir cria um objeto DAG com um objeto Cron especificando sua programação. Ele define um objeto DAGTaskBranch chamado task1_branch junto com outros objetos DAGTask e especifica suas dependências entre si.

Este exemplo de código também define funções de manipulador de tarefas e cria cada objeto DAGTask e DAGTaskBranch com um manipulador de tarefas especificado atribuído à tarefa. O código define o parâmetro use_func_return_value de DAG como True, que especifica o uso do valor de retorno da função Python como o valor de retorno da tarefa correspondente. Caso contrário, o valor padrão de use_func_return_value é False.

from snowflake.core import Root
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  pass  # do something

def task_branch_handler(session: Session) -> str:
  # do something
  return "task3"

try:
  with DAG(
    "my_dag",
    schedule=Cron("10 * * * *", "America/Los_Angeles"),
    stage_location="@mystage",
    packages=["snowflake-snowpark-python"],
    use_func_return_value=True,
  ) as dag:
    task1 = DAGTask(
      "task1",
      task_handler,
      warehouse=test_warehouse,
    )
    task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
    task2 = DAGTask("task2", task_handler, warehouse=test_warehouse)
    task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1")
    task1 >> task1_branch
    task1_branch >> [task2, task3]
  schema = root.databases["my_db"].schemas["my_schema"]
  op = DAGOperation(schema)
  op.deploy(dag, mode=CreateMode.or_replace)
finally:
  session.close()
Copy

Como definir e obter o valor de retorno de uma tarefa em um DAG

Quando a definição de uma tarefa é um objeto StoredProcedureCall, o manipulador do procedimento armazenado (ou função) pode definir explicitamente o valor de retorno da tarefa usando um objeto TaskContext.

Para obter mais informações, consulte SYSTEM$SET_RETURN_VALUE.

O código no exemplo a seguir define uma função de manipulador de tarefas que cria um objeto TaskContext chamado context da sessão atual. Em seguida, ele usa o método TaskContext.set_return_value para definir explicitamente o valor de retorno para uma cadeia de caracteres especificada.

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  # this return value can be retrieved by successor Tasks.
  context.set_return_value("predecessor_return_value")
Copy

Em um DAG de tarefas, uma tarefa sucessora imediata que identifica a tarefa anterior como sua predecessora pode então recuperar o valor de retorno definido explicitamente pela tarefa predecessora.

Para obter mais informações, consulte SYSTEM$GET_PREDECESSOR_RETURN_VALUE.

O código no exemplo a seguir define uma função de manipulador de tarefas que usa o método TaskContext.get_predecessor_return_value para obter o valor de retorno da tarefa predecessora chamada pred_task_name.

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  pred_return_value = context.get_predecessor_return_value("pred_task_name")
Copy