Tutorial 2: Crie e gerencie tarefas e gráficos de tarefa (DAGs)¶
Introdução¶
Neste tutorial, você cria e usa tarefas Snowflake para gerenciar alguns procedimentos armazenados básicos. Você também cria um gráfico de tarefa – também chamado de gráfico acíclico dirigido (DAG) – para orquestrar tarefas com uma API de gráfico de tarefa de nível superior.
Pré-requisitos¶
Nota
Se você já concluiu Configuração comum para tutoriais das APIs Snowflake Python e Tutorial 1: Crie um banco de dados, esquema, tabela e warehouse, pode ignorar esses pré-requisitos e prosseguir para a primeira etapa deste tutorial.
Antes de iniciar este tutorial, é necessário concluir as seguintes etapas:
Siga as instruções comuns de configuração, que incluem as seguintes etapas:
Configure seu ambiente de desenvolvimento.
Instale o pacote Snowflake Python APIs.
Configure sua conexão Snowflake.
Importe todos os módulos necessários para os tutoriais da API Python.
Crie um objeto
Root
da API.
Execute o código a seguir para criar um banco de dados nomeado
PYTHON_API_DB
e um esquema nomeadoPYTHON_API_SCHEMA
nesse banco de dados.database = root.databases.create( Database( name="PYTHON_API_DB"), mode=CreateMode.or_replace ) schema = database.schemas.create( Schema( name="PYTHON_API_SCHEMA"), mode=CreateMode.or_replace, )
Esses são os mesmos objetos de banco de dados e esquema que você criou no Tutorial 1.
Após concluir esses pré-requisitos, você estará pronto para começar a usar API para gerenciamento de tarefa.
Configuração de objetos Snowflake¶
Configure os procedimentos armazenados que suas tarefas invocarão e o estágio que conterá os procedimentos armazenados. É possível usar seu objeto root
do Snowflake Python APIs para criar um estágio no banco de dados PYTHON_API_DB
e no esquema PYTHON_API_SCHEMA
criado anteriormente.
Para criar um estágio chamado
TASKS_STAGE
, na próxima célula do notebook, execute o seguinte código:stages = root.databases[database.name].schemas[schema.name].stages stages.create(Stage(name="TASKS_STAGE"))
Este estágio manterá os procedimentos armazenados e quaisquer dependências que esses procedimentos precisem.
Para criar duas funções básicas do Python, que as tarefas executarão como procedimentos armazenados, na próxima célula, execute o seguinte código:
def trunc(session: Session, from_table: str, to_table: str, count: int) -> str: ( session .table(from_table) .limit(count) .write.save_as_table(to_table) ) return "Truncated table successfully created!" def filter_by_shipmode(session: Session, mode: str) -> str: ( session .table("snowflake_sample_data.tpch_sf100.lineitem") .filter(col("L_SHIPMODE") == mode) .limit(10) .write.save_as_table("filter_table") ) return "Filter table successfully created!"
Essas funções fazem o seguinte:
trunc()
: Cria uma versão truncada de uma tabela de entrada.filter_by_shipmode()
: Filtra a tabelaSNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM
por modo de envio, limita os resultados a 10 linhas e grava os resultados em uma nova tabela.Nota
Esta função consulta os dados de amostra TPC-H no banco de dados SNOWFLAKE_SAMPLE_DATA. O Snowflake cria o banco de dados de amostra em novas contas por padrão. Se o banco de dados não foi criado em sua conta, consulte Uso do banco de dados de amostra.
As funções são intencionalmente básicas e destinadas a fins de demonstração.
Criação e gerenciamento de tarefas¶
Defina, crie e gerencie duas tarefas que executarão suas funções Python criadas anteriormente como procedimentos armazenados.
Para definir as duas tarefas,
task1
etask2
, na próxima célula do notebook, execute o seguinte código:tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE" task1 = Task( name="task_python_api_trunc", definition=StoredProcedureCall( func=trunc, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"], ), warehouse="COMPUTE_WH", schedule=timedelta(minutes=1) ) task2 = Task( name="task_python_api_filter", definition=StoredProcedureCall( func=filter_by_shipmode, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"], ), warehouse="COMPUTE_WH" )
Neste código, você especifica os seguintes parâmetros de tarefa:
Para cada tarefa, uma definição representada por um objeto StoredProcedureCall que inclui os seguintes atributos:
A função chamável a ser executada
O local do estágio onde o conteúdo da função Python e suas dependências são carregados
Dependências do pacote do procedimento armazenado
Um warehouse para executar o procedimento armazenado (necessário ao criar uma tarefa com um objeto
StoredProcedureCall
). Este tutorial usa o warehouseCOMPUTE_WH
incluído em sua conta de avaliação.Um cronograma de execução para a tarefa raiz,
task1
. O cronograma especifica o intervalo no qual a tarefa deve ser executada periodicamente.
Para obter mais informações sobre os procedimentos armazenados, consulte Como escrever procedimentos armazenados em Python.
Para criar as duas tarefas, recuperar um objeto
TaskCollection
(tasks
) do esquema do banco de dados e chamar.create()
na coleção de tarefas:# create the task in the Snowflake database tasks = schema.tasks trunc_task = tasks.create(task1, mode=CreateMode.or_replace) task2.predecessors = [trunc_task.name] filter_task = tasks.create(task2, mode=CreateMode.or_replace)
Neste exemplo de código, você também vincula as tarefas configurando
task1
como predecessora paratask2
, o que cria um gráfico de tarefa mínimo.Para confirmar que as duas tarefas agora existem, na próxima célula, execute o seguinte código:
taskiter = tasks.iter() for t in taskiter: print(t.name)
Quando você cria tarefas, elas são suspensas por padrão.
Para iniciar uma tarefa, chame
.resume()
no objeto de recurso da tarefa:trunc_task.resume()
Para confirmar que a tarefa
trunc_task
foi iniciada, na próxima célula, execute o seguinte código:taskiter = tasks.iter() for t in taskiter: print("Name: ", t.name, "| State: ", t.state)
A saída deve ser semelhante a esta:
Name: TASK_PYTHON_API_FILTER | State: suspended Name: TASK_PYTHON_API_TRUNC | State: started
É possível repetir esta etapa sempre que quiser confirmar o status das tarefas.
Para limpar os recursos da tarefa, primeiro suspenda a tarefa antes de descartá-la.
Na próxima célula, execute o seguinte código:
trunc_task.suspend()
Para confirmar que a tarefa está suspensa, repita a etapa 5.
Opcional: Para descartar ambas as tarefas, na próxima célula, execute o seguinte código:
trunc_task.drop() filter_task.drop()
Crie e gerencie um gráfico de tarefa¶
Quando você coordena a execução de um grande número de tarefas, pode ser difícil gerenciar cada tarefa individualmente. O Snowflake Python APIs fornece funcionalidade para orquestrar tarefas com uma API de gráfico de tarefa de nível superior.
Um gráfico de tarefa, também chamado de gráfico acíclico direcionado (DAG), é uma série de tarefas compostas de uma tarefa raiz e tarefas filho, organizadas por suas dependências. Para obter mais informações, consulte Gerenciamento de dependências de tarefa com gráficos de tarefa.
Para criar e implementar um gráfico de tarefa, execute o seguinte código:
dag_name = "python_api_dag" dag = DAG(name=dag_name, schedule=timedelta(days=1)) with dag: dag_task1 = DAGTask( name="task_python_api_trunc", definition=StoredProcedureCall( func=trunc, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"]), warehouse="COMPUTE_WH", ) dag_task2 = DAGTask( name="task_python_api_filter", definition=StoredProcedureCall( func=filter_by_shipmode, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"]), warehouse="COMPUTE_WH", ) dag_task1 >> dag_task2 dag_op = DAGOperation(schema) dag_op.deploy(dag, mode=CreateMode.or_replace)
Neste código, você faz o seguinte:
Crie um objeto de gráfico de tarefa chamando o construtor
DAG
e especificando um nome e uma cronograma.Defina o gráfico de tarefa – tarefas específicas usando o construtor
DAGTask
. Observe que o construtor aceita os mesmos argumentos que você especificou para a classeStoredProcedureCall
em uma etapa anterior.Especifique
dag_task1
como a tarefa raiz e predecessora paradag_task2
com sintaxe mais conveniente.Implemente o gráfico de tarefa no esquema
PYTHON_API_SCHEMA
do banco de dadosPYTHON_API_DB
.
Para confirmar a criação do gráfico de tarefa, na próxima célula, execute o seguinte código:
taskiter = tasks.iter() for t in taskiter: print("Name: ", t.name, "| State: ", t.state)
É possível repetir esta etapa sempre que quiser confirmar o status das tarefas.
Para iniciar o gráfico de tarefa iniciando a tarefa raiz, na próxima célula, execute o seguinte código:
dag_op.run(dag)
Para confirmar que a tarefa
PYTHON_API_DAG$TASK_PYTHON_API_TRUNC
foi iniciada, repita a etapa 2.Nota
A chamada de função invocada pelo gráfico de tarefa não será bem-sucedida porque você não a está chamando com nenhum dos argumentos necessários. O objetivo desta etapa é apenas demonstrar como iniciar programaticamente o gráfico de tarefa.
Para soltar o gráfico de tarefa, na próxima célula, execute o seguinte código:
dag_op.drop(dag)
Limpe o objeto de banco de dados que você criou nestes tutoriais:
database.drop()
Qual é o próximo passo?¶
Parabéns! Neste tutorial, você aprendeu como criar e gerenciar tarefas e gráficos de tarefa usando o Snowflake Python APIs.
Resumo¶
Ao longo do processo, você concluiu as seguintes etapas:
Crie um estágio que possa conter procedimentos armazenados e suas dependências.
Crie e gerencie tarefas.
Crie e gerencie um gráfico de tarefa.
Limpe seus objetos de recurso do Snowflake descartando-os.
Próximo tutorial¶
Agora você pode prosseguir para Tutorial 3: Crie e gerencie serviços de contêiner do Snowpark, que mostra como criar e gerenciar componentes no Snowpark Container Services.
Recursos adicionais¶
Para obter mais exemplos de uso da API para gerenciar outros tipos de objetos no Snowflake, consulte os seguintes guias do desenvolvedor:
Guia |
Descrição |
---|---|
Gerenciamento de bancos de dados, esquemas, tabelas e exibições Snowflake com Python |
Use a API para criar e gerenciar bancos de dados, esquemas e tabelas. |
Gerenciamento de usuários, funções e concessões Snowflake com Python |
Use a API para criar e gerenciar usuários, funções e concessões. |
Gerenciamento de recursos de carregamento e descarregamento de dados com Python |
Use a API para criar e gerenciar recursos de carregamento e descarregamento de dados, incluindo volumes externos, canais e estágios. |
Gerenciamento do Snowpark Container Services (incluindo funções de serviço) com Python |
Use a API para gerenciar componentes do Snowpark Container Services, incluindo pools de computação, repositórios de imagens, serviços e funções de serviço. |