Tutorial 2: Crie e gerencie tarefas e gráficos de tarefa (DAGs)

Introdução

Neste tutorial, você cria e usa tarefas Snowflake para gerenciar alguns procedimentos armazenados básicos. Você também cria um gráfico de tarefa – também chamado de gráfico acíclico dirigido (DAG) – para orquestrar tarefas com uma API de gráfico de tarefa de nível superior.

Pré-requisitos

Nota

Se você já concluiu Configuração comum para tutoriais das APIs Snowflake Python e Tutorial 1: Crie um banco de dados, esquema, tabela e warehouse, pode ignorar esses pré-requisitos e prosseguir para a primeira etapa deste tutorial.

Antes de iniciar este tutorial, é necessário concluir as seguintes etapas:

  1. Siga as instruções comuns de configuração, que incluem as seguintes etapas:

    • Configure seu ambiente de desenvolvimento.

    • Instale o pacote Snowflake Python APIs.

    • Configure sua conexão Snowflake.

    • Importe todos os módulos necessários para os tutoriais da API Python.

    • Crie um objeto Root da API.

  2. Execute o código a seguir para criar um banco de dados nomeado PYTHON_API_DB e um esquema nomeado PYTHON_API_SCHEMA nesse banco de dados.

    database = root.databases.create(
      Database(
        name="PYTHON_API_DB"),
        mode=CreateMode.or_replace
      )
    
    schema = database.schemas.create(
      Schema(
        name="PYTHON_API_SCHEMA"),
        mode=CreateMode.or_replace,
      )
    
    Copy

    Esses são os mesmos objetos de banco de dados e esquema que você criou no Tutorial 1.

Após concluir esses pré-requisitos, você estará pronto para começar a usar API para gerenciamento de tarefa.

Configuração de objetos Snowflake

Configure os procedimentos armazenados que suas tarefas invocarão e o estágio que conterá os procedimentos armazenados. É possível usar seu objeto root do Snowflake Python APIs para criar um estágio no banco de dados PYTHON_API_DB e no esquema PYTHON_API_SCHEMA criado anteriormente.

  1. Para criar um estágio chamado TASKS_STAGE, na próxima célula do notebook, execute o seguinte código:

    stages = root.databases[database.name].schemas[schema.name].stages
    stages.create(Stage(name="TASKS_STAGE"))
    
    Copy

    Este estágio manterá os procedimentos armazenados e quaisquer dependências que esses procedimentos precisem.

  2. Para criar duas funções básicas do Python, que as tarefas executarão como procedimentos armazenados, na próxima célula, execute o seguinte código:

    def trunc(session: Session, from_table: str, to_table: str, count: int) -> str:
      (
        session
        .table(from_table)
        .limit(count)
        .write.save_as_table(to_table)
      )
      return "Truncated table successfully created!"
    
    def filter_by_shipmode(session: Session, mode: str) -> str:
      (
        session
        .table("snowflake_sample_data.tpch_sf100.lineitem")
        .filter(col("L_SHIPMODE") == mode)
        .limit(10)
        .write.save_as_table("filter_table")
      )
      return "Filter table successfully created!"
    
    Copy

    Essas funções fazem o seguinte:

    • trunc(): Cria uma versão truncada de uma tabela de entrada.

    • filter_by_shipmode(): Filtra a tabela SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM por modo de envio, limita os resultados a 10 linhas e grava os resultados em uma nova tabela.

      Nota

      Esta função consulta os dados de amostra TPC-H no banco de dados SNOWFLAKE_SAMPLE_DATA. O Snowflake cria o banco de dados de amostra em novas contas por padrão. Se o banco de dados não foi criado em sua conta, consulte Uso do banco de dados de amostra.

    As funções são intencionalmente básicas e destinadas a fins de demonstração.

Criação e gerenciamento de tarefas

Defina, crie e gerencie duas tarefas que executarão suas funções Python criadas anteriormente como procedimentos armazenados.

  1. Para definir as duas tarefas, task1 e task2, na próxima célula do notebook, execute o seguinte código:

    tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE"
    
    task1 = Task(
        name="task_python_api_trunc",
        definition=StoredProcedureCall(
          func=trunc,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH",
        schedule=timedelta(minutes=1)
    )
    
    task2 = Task(
        name="task_python_api_filter",
        definition=StoredProcedureCall(
          func=filter_by_shipmode,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH"
    )
    
    Copy

    Neste código, você especifica os seguintes parâmetros de tarefa:

    • Para cada tarefa, uma definição representada por um objeto StoredProcedureCall que inclui os seguintes atributos:

      • A função chamável a ser executada

      • O local do estágio onde o conteúdo da função Python e suas dependências são carregados

      • Dependências do pacote do procedimento armazenado

    • Um warehouse para executar o procedimento armazenado (necessário ao criar uma tarefa com um objeto StoredProcedureCall). Este tutorial usa o warehouse COMPUTE_WH incluído em sua conta de avaliação.

    • Um cronograma de execução para a tarefa raiz, task1. O cronograma especifica o intervalo no qual a tarefa deve ser executada periodicamente.

    Para obter mais informações sobre os procedimentos armazenados, consulte Como escrever procedimentos armazenados em Python.

  2. Para criar as duas tarefas, recuperar um objeto TaskCollection (tasks) do esquema do banco de dados e chamar .create() na coleção de tarefas:

    # create the task in the Snowflake database
    tasks = schema.tasks
    trunc_task = tasks.create(task1, mode=CreateMode.or_replace)
    
    task2.predecessors = [trunc_task.name]
    filter_task = tasks.create(task2, mode=CreateMode.or_replace)
    
    Copy

    Neste exemplo de código, você também vincula as tarefas configurando task1 como predecessora para task2, o que cria um gráfico de tarefa mínimo.

  3. Para confirmar que as duas tarefas agora existem, na próxima célula, execute o seguinte código:

    taskiter = tasks.iter()
    for t in taskiter:
        print(t.name)
    
    Copy
  4. Quando você cria tarefas, elas são suspensas por padrão.

    Para iniciar uma tarefa, chame .resume() no objeto de recurso da tarefa:

    trunc_task.resume()
    
    Copy
  5. Para confirmar que a tarefa trunc_task foi iniciada, na próxima célula, execute o seguinte código:

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    A saída deve ser semelhante a esta:

    Name:  TASK_PYTHON_API_FILTER | State:  suspended
    Name:  TASK_PYTHON_API_TRUNC | State:  started
    

    É possível repetir esta etapa sempre que quiser confirmar o status das tarefas.

  6. Para limpar os recursos da tarefa, primeiro suspenda a tarefa antes de descartá-la.

    Na próxima célula, execute o seguinte código:

    trunc_task.suspend()
    
    Copy
  7. Para confirmar que a tarefa está suspensa, repita a etapa 5.

  8. Opcional: Para descartar ambas as tarefas, na próxima célula, execute o seguinte código:

    trunc_task.drop()
    filter_task.drop()
    
    Copy

Crie e gerencie um gráfico de tarefa

Quando você coordena a execução de um grande número de tarefas, pode ser difícil gerenciar cada tarefa individualmente. O Snowflake Python APIs fornece funcionalidade para orquestrar tarefas com uma API de gráfico de tarefa de nível superior.

Um gráfico de tarefa, também chamado de gráfico acíclico direcionado (DAG), é uma série de tarefas compostas de uma tarefa raiz e tarefas filho, organizadas por suas dependências. Para obter mais informações, consulte Gerenciamento de dependências de tarefa com gráficos de tarefa.

  1. Para criar e implementar um gráfico de tarefa, execute o seguinte código:

    dag_name = "python_api_dag"
    dag = DAG(name=dag_name, schedule=timedelta(days=1))
    with dag:
        dag_task1 = DAGTask(
            name="task_python_api_trunc",
            definition=StoredProcedureCall(
                func=trunc,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task2 = DAGTask(
            name="task_python_api_filter",
            definition=StoredProcedureCall(
                func=filter_by_shipmode,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task1 >> dag_task2
    dag_op = DAGOperation(schema)
    dag_op.deploy(dag, mode=CreateMode.or_replace)
    
    Copy

    Neste código, você faz o seguinte:

    • Crie um objeto de gráfico de tarefa chamando o construtor DAG e especificando um nome e uma cronograma.

    • Defina o gráfico de tarefa – tarefas específicas usando o construtor DAGTask. Observe que o construtor aceita os mesmos argumentos que você especificou para a classe StoredProcedureCall em uma etapa anterior.

    • Especifique dag_task1 como a tarefa raiz e predecessora para dag_task2 com sintaxe mais conveniente.

    • Implemente o gráfico de tarefa no esquema PYTHON_API_SCHEMA do banco de dados PYTHON_API_DB.

  2. Para confirmar a criação do gráfico de tarefa, na próxima célula, execute o seguinte código:

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    É possível repetir esta etapa sempre que quiser confirmar o status das tarefas.

  3. Para iniciar o gráfico de tarefa iniciando a tarefa raiz, na próxima célula, execute o seguinte código:

    dag_op.run(dag)
    
    Copy
  4. Para confirmar que a tarefa PYTHON_API_DAG$TASK_PYTHON_API_TRUNC foi iniciada, repita a etapa 2.

    Nota

    A chamada de função invocada pelo gráfico de tarefa não será bem-sucedida porque você não a está chamando com nenhum dos argumentos necessários. O objetivo desta etapa é apenas demonstrar como iniciar programaticamente o gráfico de tarefa.

  5. Para soltar o gráfico de tarefa, na próxima célula, execute o seguinte código:

    dag_op.drop(dag)
    
    Copy
  6. Limpe o objeto de banco de dados que você criou nestes tutoriais:

    database.drop()
    
    Copy

Qual é o próximo passo?

Parabéns! Neste tutorial, você aprendeu como criar e gerenciar tarefas e gráficos de tarefa usando o Snowflake Python APIs.

Resumo

Ao longo do processo, você concluiu as seguintes etapas:

  • Crie um estágio que possa conter procedimentos armazenados e suas dependências.

  • Crie e gerencie tarefas.

  • Crie e gerencie um gráfico de tarefa.

  • Limpe seus objetos de recurso do Snowflake descartando-os.

Próximo tutorial

Agora você pode prosseguir para Tutorial 3: Crie e gerencie serviços de contêiner do Snowpark, que mostra como criar e gerenciar componentes no Snowpark Container Services.

Recursos adicionais

Para obter mais exemplos de uso da API para gerenciar outros tipos de objetos no Snowflake, consulte os seguintes guias do desenvolvedor:

Guia

Descrição

Gerenciamento de bancos de dados, esquemas, tabelas e exibições Snowflake com Python

Use a API para criar e gerenciar bancos de dados, esquemas e tabelas.

Gerenciamento de usuários, funções e concessões Snowflake com Python

Use a API para criar e gerenciar usuários, funções e concessões.

Gerenciamento de recursos de carregamento e descarregamento de dados com Python

Use a API para criar e gerenciar recursos de carregamento e descarregamento de dados, incluindo volumes externos, canais e estágios.

Gerenciamento do Snowpark Container Services (incluindo funções de serviço) com Python

Use a API para gerenciar componentes do Snowpark Container Services, incluindo pools de computação, repositórios de imagens, serviços e funções de serviço.