Tutorial 2: Crie e gerencie tarefas e gráficos de tarefa (DAGs)

Introdução

Neste tutorial, você cria e usa tarefas Snowflake para gerenciar alguns procedimentos armazenados básicos. Você também cria um gráfico de tarefa – também chamado de gráfico acíclico dirigido (DAG) – para orquestrar tarefas com uma API de gráfico de tarefa de nível superior.

Pré-requisitos

Nota

Se você já concluiu Configuração comum para tutoriais das APIs Snowflake Python e Tutorial 1: Crie um banco de dados, esquema, tabela e warehouse, pode ignorar esses pré-requisitos e prosseguir para a primeira etapa deste tutorial.

Antes de iniciar este tutorial, é necessário concluir as seguintes etapas:

  1. Siga as instruções comuns de configuração, que incluem as seguintes etapas:

    • Configure seu ambiente de desenvolvimento.

    • Instale o pacote Snowflake Python APIs.

    • Configure sua conexão Snowflake.

    • Importe todos os módulos necessários para os tutoriais da API Python.

    • Crie um objeto Root da API.

  2. Execute o código a seguir para criar um banco de dados nomeado PYTHON_API_DB e um esquema nomeado PYTHON_API_SCHEMA nesse banco de dados.

    database = root.databases.create(
      Database(
        name="PYTHON_API_DB"),
        mode=CreateMode.or_replace
      )
    
    schema = database.schemas.create(
      Schema(
        name="PYTHON_API_SCHEMA"),
        mode=CreateMode.or_replace,
      )
    
    Copy

    Esses são os mesmos objetos de banco de dados e esquema que você criou no Tutorial 1.

Após concluir esses pré-requisitos, você estará pronto para começar a usar API para gerenciamento de tarefa.

Configuração de objetos Snowflake

Configure os procedimentos armazenados que suas tarefas invocarão e o estágio que conterá os procedimentos armazenados. É possível usar seu objeto root do Snowflake Python APIs para criar um estágio no banco de dados PYTHON_API_DB e no esquema PYTHON_API_SCHEMA criado anteriormente.

  1. Para criar um estágio chamado TASKS_STAGE, na próxima célula do notebook, execute o seguinte código:

    stages = root.databases[database.name].schemas[schema.name].stages
    stages.create(Stage(name="TASKS_STAGE"))
    
    Copy

    Este estágio manterá os procedimentos armazenados e quaisquer dependências que esses procedimentos precisem.

  2. Para criar duas funções básicas do Python, que as tarefas executarão como procedimentos armazenados, na próxima célula, execute o seguinte código:

    def trunc(session: Session, from_table: str, to_table: str, count: int) -> str:
      (
        session
        .table(from_table)
        .limit(count)
        .write.save_as_table(to_table)
      )
      return "Truncated table successfully created!"
    
    def filter_by_shipmode(session: Session, mode: str) -> str:
      (
        session
        .table("snowflake_sample_data.tpch_sf100.lineitem")
        .filter(col("L_SHIPMODE") == mode)
        .limit(10)
        .write.save_as_table("filter_table")
      )
      return "Filter table successfully created!"
    
    Copy

    Essas funções fazem o seguinte:

    • trunc(): Cria uma versão truncada de uma tabela de entrada.

    • filter_by_shipmode(): Filtra a tabela SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM por modo de envio, limita os resultados a 10 linhas e grava os resultados em uma nova tabela.

      Nota

      Esta função consulta os dados de amostra TPC-H no banco de dados SNOWFLAKE_SAMPLE_DATA. O Snowflake cria o banco de dados de amostra em novas contas por padrão. Se o banco de dados não foi criado em sua conta, consulte Uso do banco de dados de amostra.

    As funções são intencionalmente básicas e destinadas a fins de demonstração.

Criação e gerenciamento de tarefas

Defina, crie e gerencie duas tarefas que executarão suas funções Python criadas anteriormente como procedimentos armazenados.

  1. Para definir as duas tarefas, task1 e task2, na próxima célula do notebook, execute o seguinte código:

    tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE"
    
    task1 = Task(
        name="task_python_api_trunc",
        definition=StoredProcedureCall(
          func=trunc,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH",
        schedule=timedelta(minutes=1)
    )
    
    task2 = Task(
        name="task_python_api_filter",
        definition=StoredProcedureCall(
          func=filter_by_shipmode,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH"
    )
    
    Copy

    Neste código, você especifica os seguintes parâmetros de tarefa:

    • Para cada tarefa, uma definição representada por um objeto StoredProcedureCall que inclui os seguintes atributos:

      • A função chamável a ser executada

      • O local do estágio onde o conteúdo da função Python e suas dependências são carregados

      • Dependências do pacote do procedimento armazenado

    • Um warehouse para executar o procedimento armazenado (necessário ao criar uma tarefa com um objeto StoredProcedureCall). Este tutorial usa o warehouse COMPUTE_WH incluído em sua conta de avaliação.

    • Um cronograma de execução para a tarefa raiz, task1. O cronograma especifica o intervalo no qual a tarefa deve ser executada periodicamente.

    Para obter mais informações sobre os procedimentos armazenados, consulte Como escrever procedimentos armazenados com SQL e Python.

  2. Para criar as duas tarefas, recuperar um objeto TaskCollection (tasks) do esquema do banco de dados e chamar .create() na coleção de tarefas:

    # create the task in the Snowflake database
    tasks = schema.tasks
    trunc_task = tasks.create(task1, mode=CreateMode.or_replace)
    
    task2.predecessors = [trunc_task.name]
    filter_task = tasks.create(task2, mode=CreateMode.or_replace)
    
    Copy

    Neste exemplo de código, você também vincula as tarefas configurando task1 como predecessora para task2, o que cria um gráfico de tarefa mínimo.

  3. Para confirmar que as duas tarefas agora existem, na próxima célula, execute o seguinte código:

    taskiter = tasks.iter()
    for t in taskiter:
        print(t.name)
    
    Copy
  4. Quando você cria tarefas, elas são suspensas por padrão.

    Para iniciar uma tarefa, chame .resume() no objeto de recurso da tarefa:

    trunc_task.resume()
    
    Copy
  5. Para confirmar que a tarefa trunc_task foi iniciada, na próxima célula, execute o seguinte código:

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    A saída deve ser semelhante a esta:

    Name:  TASK_PYTHON_API_FILTER | State:  suspended
    Name:  TASK_PYTHON_API_TRUNC | State:  started
    

    É possível repetir esta etapa sempre que quiser confirmar o status das tarefas.

  6. Para limpar os recursos da tarefa, primeiro suspenda a tarefa antes de descartá-la.

    Na próxima célula, execute o seguinte código:

    trunc_task.suspend()
    
    Copy
  7. Para confirmar que a tarefa está suspensa, repita a etapa 5.

  8. Opcional: Para descartar ambas as tarefas, na próxima célula, execute o seguinte código:

    trunc_task.drop()
    filter_task.drop()
    
    Copy

Crie e gerencie um gráfico de tarefa

Quando você coordena a execução de um grande número de tarefas, pode ser difícil gerenciar cada tarefa individualmente. O Snowflake Python APIs fornece funcionalidade para orquestrar tarefas com uma API de gráfico de tarefa de nível superior.

Um gráfico de tarefa, também chamado de gráfico acíclico direcionado (DAG), é uma série de tarefas compostas de uma tarefa raiz e tarefas filho, organizadas por suas dependências. Para obter mais informações, consulte Gerenciamento de dependências de tarefa com gráficos de tarefa.

  1. Para criar e implementar um gráfico de tarefa, execute o seguinte código:

    dag_name = "python_api_dag"
    dag = DAG(name=dag_name, schedule=timedelta(days=1))
    with dag:
        dag_task1 = DAGTask(
            name="task_python_api_trunc",
            definition=StoredProcedureCall(
                func=trunc,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task2 = DAGTask(
            name="task_python_api_filter",
            definition=StoredProcedureCall(
                func=filter_by_shipmode,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task1 >> dag_task2
    dag_op = DAGOperation(schema)
    dag_op.deploy(dag, mode=CreateMode.or_replace)
    
    Copy

    Neste código, você faz o seguinte:

    • Crie um objeto de gráfico de tarefa chamando o construtor DAG e especificando um nome e uma cronograma.

    • Defina o gráfico de tarefa – tarefas específicas usando o construtor DAGTask. Observe que o construtor aceita os mesmos argumentos que você especificou para a classe StoredProcedureCall em uma etapa anterior.

    • Especifique dag_task1 como a tarefa raiz e predecessora para dag_task2 com sintaxe mais conveniente.

    • Implemente o gráfico de tarefa no esquema PYTHON_API_SCHEMA do banco de dados PYTHON_API_DB.

  2. Para confirmar a criação do gráfico de tarefa, na próxima célula, execute o seguinte código:

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    É possível repetir esta etapa sempre que quiser confirmar o status das tarefas.

  3. Para iniciar o gráfico de tarefa iniciando a tarefa raiz, na próxima célula, execute o seguinte código:

    dag_op.run(dag)
    
    Copy
  4. Para confirmar que a tarefa PYTHON_API_DAG$TASK_PYTHON_API_TRUNC foi iniciada, repita a etapa 2.

    Nota

    A chamada de função invocada pelo gráfico de tarefa não será bem-sucedida porque você não a está chamando com nenhum dos argumentos necessários. O objetivo desta etapa é apenas demonstrar como iniciar programaticamente o gráfico de tarefa.

  5. Para soltar o gráfico de tarefa, na próxima célula, execute o seguinte código:

    dag_op.drop(dag)
    
    Copy
  6. Limpe o objeto de banco de dados que você criou nestes tutoriais:

    database.drop()
    
    Copy

Qual é o próximo passo?

Parabéns! Neste tutorial, você aprendeu como criar e gerenciar tarefas e gráficos de tarefa usando o Snowflake Python APIs.

Resumo

Ao longo do processo, você concluiu as seguintes etapas:

  • Crie um estágio que possa conter procedimentos armazenados e suas dependências.

  • Crie e gerencie tarefas.

  • Crie e gerencie um gráfico de tarefa.

  • Limpe seus objetos de recurso do Snowflake descartando-os.

Próximo tutorial

Agora você pode prosseguir para Tutorial 3: Crie e gerencie serviços de contêiner do Snowpark, que mostra como criar e gerenciar componentes no Snowpark Container Services.

Recursos adicionais

Para obter mais exemplos de uso da API para gerenciar outros tipos de objetos no Snowflake, consulte os seguintes guias do desenvolvedor:

Guia

Descrição

Gerenciamento de bancos de dados, esquemas, tabelas e exibições Snowflake com Python

Use a API para criar e gerenciar bancos de dados, esquemas e tabelas.

Gerenciamento de usuários, funções e concessões Snowflake com Python

Use a API para criar e gerenciar usuários, funções e concessões.

Gerenciamento de recursos de carregamento e descarregamento de dados com Python

Use a API para criar e gerenciar recursos de carregamento e descarregamento de dados, incluindo volumes externos, canais e estágios.

Gerenciamento do Snowpark Container Services (incluindo funções de serviço) com Python

Use a API para gerenciar componentes do Snowpark Container Services, incluindo pools de computação, repositórios de imagens, serviços e funções de serviço.