자습서 2: 작업 및 작업 그래프(DAG) 만들기 및 관리

소개

이 자습서에서는 몇 가지 기본 저장 프로시저를 관리하기 위해 Snowflake 작업을 생성하고 사용합니다. 또한 작업 그래프(방향성 비순환 그래프(DAG)라고도 함)를 생성하여 상위 수준의 작업 그래프 API로 작업을 오케스트레이션할 수도 있습니다.

전제 조건

참고

이미 Snowflake Python APIs 자습서의 일반 설정자습서 1: 데이터베이스, 스키마, 테이블, 웨어하우스 만들기 을 완료한 경우, 이러한 전제 조건을 건너뛰고 이 자습서의 첫 번째 단계로 넘어갈 수 있습니다.

이 자습서를 시작하기 전에 다음 단계를 완료해야 합니다.

  1. 다음 단계가 포함된 일반 설정 지침을 따르십시오.

    • 개발 환경을 설정합니다.

    • Snowflake Python APIs 패키지를 설치합니다.

    • Snowflake 연결을 구성합니다.

    • Python API 자습서에 필요한 모든 모듈을 가져옵니다.

    • API Root 오브젝트를 생성합니다.

  2. 다음 코드를 실행하여 이름이 PYTHON_API_DB 인 데이터베이스와 해당 데이터베이스에 이름이 PYTHON_API_SCHEMA 인 스키마를 생성합니다.

    database = root.databases.create(
      Database(
        name="PYTHON_API_DB"),
        mode=CreateMode.or_replace
      )
    
    schema = database.schemas.create(
      Schema(
        name="PYTHON_API_SCHEMA"),
        mode=CreateMode.or_replace,
      )
    
    Copy

    이는 자습서 1 에서 생성한 것과 동일한 데이터베이스 및 스키마 오브젝트입니다.

이러한 사전 요구 사항을 완료하면 작업 관리를 위해 API를 사용할 준비가 된 것입니다.

Snowflake 오브젝트 설정

작업에서 호출할 저장 프로시저와 저장 프로시저를 보관할 스테이지를 설정합니다. Snowflake Python APIs root 오브젝트를 사용하여 이전에 생성한 PYTHON_API_DB 데이터베이스 및 PYTHON_API_SCHEMA 스키마에 스테이지를 생성할 수 있습니다.

  1. 이름이 TASKS_STAGE 인 스테이지를 생성하려면 노트북의 다음 셀에서 다음 코드를 실행합니다.

    stages = root.databases[database.name].schemas[schema.name].stages
    stages.create(Stage(name="TASKS_STAGE"))
    
    Copy

    이 스테이지에서는 저장된 프로시저와 해당 프로시저에 필요한 모든 종속성이 보관됩니다.

  2. 작업이 저장 프로시저로 실행되는 두 가지 기본 Python 함수를 생성하려면 다음 셀에서 다음 코드를 실행합니다.

    def trunc(session: Session, from_table: str, to_table: str, count: int) -> str:
      (
        session
        .table(from_table)
        .limit(count)
        .write.save_as_table(to_table)
      )
      return "Truncated table successfully created!"
    
    def filter_by_shipmode(session: Session, mode: str) -> str:
      (
        session
        .table("snowflake_sample_data.tpch_sf100.lineitem")
        .filter(col("L_SHIPMODE") == mode)
        .limit(10)
        .write.save_as_table("filter_table")
      )
      return "Filter table successfully created!"
    
    Copy

    이러한 함수는 다음을 수행합니다.

    • trunc(): 입력 테이블의 잘린 버전을 생성합니다.

    • filter_by_shipmode(): 배송 모드별로 SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM 테이블을 필터링하고 결과를 10개 행으로 제한한 다음 결과를 새 테이블에 씁니다.

      참고

      이 함수는 SNOWFLAKE_SAMPLE_DATA 데이터베이스의 TPC-H 샘플 데이터 를 쿼리합니다. Snowflake는 기본적으로 새 계정에 샘플 데이터베이스를 만듭니다. 귀하의 계정에 데이터베이스가 생성되지 않은 경우 샘플 데이터베이스 사용하기 섹션을 참조하십시오.

    이러한 함수는 의도적으로 기본적인 것이며 데모 목적으로 개발되었습니다.

작업 생성 및 관리

이전에 생성한 Python 함수를 저장 프로시저로 실행하는 두 가지 작업을 정의, 생성 및 관리합니다.

  1. 노트북의 다음 셀에 task1task2 라는 두 작업을 정의하려면 다음 코드를 실행합니다.

    tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE"
    
    task1 = Task(
        name="task_python_api_trunc",
        definition=StoredProcedureCall(
          func=trunc,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH",
        schedule=timedelta(minutes=1)
    )
    
    task2 = Task(
        name="task_python_api_filter",
        definition=StoredProcedureCall(
          func=filter_by_shipmode,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH"
    )
    
    Copy

    이 코드에서는 다음 작업 매개 변수를 지정합니다.

    • 각 작업에 대해, 다음 특성을 포함하는 StoredProcedureCall 오브젝트에 의해 표시되는 정의입니다.

      • 실행할 호출 가능한 함수

      • Python 함수의 내용과 종속성이 업로드되는 스테이지 위치

      • 저장 프로시저의 패키지 종속성

    • 저장 프로시저를 실행할 웨어하우스(StoredProcedureCall 오브젝트가 있는 작업을 생성할 때 필요). 이 자습서에서는 평가판 계정에 포함된 COMPUTE_WH 웨어하우스를 사용합니다.

    • 루트 작업의 실행 일정, task1. 일정은 작업을 주기적으로 실행하는 간격을 지정합니다.

    저장 프로시저에 대한 자세한 내용은 Python으로 저장 프로시저 작성하기 를 참조하십시오.

  2. 두 개의 작업을 생성하려면 데이터베이스 스키마에서 TaskCollection 오브젝트(tasks)를 검색하고 작업 컬렉션에서 .create() 를 호출합니다.

    # create the task in the Snowflake database
    tasks = schema.tasks
    trunc_task = tasks.create(task1, mode=CreateMode.or_replace)
    
    task2.predecessors = [trunc_task.name]
    filter_task = tasks.create(task2, mode=CreateMode.or_replace)
    
    Copy

    이 코드 예제에서는 task1task2 의 선행 작업으로 설정하여 작업을 연결하여 최소 작업 그래프를 생성합니다.

  3. 두 작업이 이제 있는지 확인하려면 다음 셀에서 다음 코드를 실행합니다.

    taskiter = tasks.iter()
    for t in taskiter:
        print(t.name)
    
    Copy
  4. 작업을 생성하면 기본적으로 작업이 일시 중단됩니다.

    작업을 시작하려면 작업 리소스 오브젝트에서 .resume() 을 호출합니다.

    trunc_task.resume()
    
    Copy
  5. trunc_task 작업이 시작되었는지 확인하려면 다음 셀에서 다음 코드를 실행합니다.

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    출력은 다음과 유사해야 합니다.

    Name:  TASK_PYTHON_API_FILTER | State:  suspended
    Name:  TASK_PYTHON_API_TRUNC | State:  started
    

    작업 상태를 확인하고 싶을 때마다 이 단계를 반복할 수 있습니다.

  6. 작업 리소스를 정리하려면 먼저 작업을 일시 중단한 다음 삭제합니다.

    다음 셀에서 다음 코드를 실행합니다.

    trunc_task.suspend()
    
    Copy
  7. 작업이 일시 중단되었는지 확인하려면 5단계를 반복합니다.

  8. 선택 사항: 두 작업을 모두 삭제하려면 다음 셀에서 다음 코드를 실행합니다.

    trunc_task.drop()
    filter_task.drop()
    
    Copy

작업 그래프를 생성하고 관리합니다.

다수의 작업 실행을 조정할 때, 각 작업을 개별적으로 관리하는 것은 어려울 수 있습니다. Snowflake Python APIs 은 상위 수준의 작업 그래프 API로 작업을 오케스트레이션하는 기능을 제공합니다.

방향성 비순환 그래프(DAG)라고도 하는 작업 그래프는 루트 작업과 하위 작업으로 구성된 일련의 작업으로, 종속성에 따라 구성됩니다. 자세한 내용은 작업 그래프를 사용한 작업 종속성 관리 섹션을 참조하십시오.

  1. 작업 그래프를 생성하고 배포하려면 다음 코드를 실행합니다.

    dag_name = "python_api_dag"
    dag = DAG(name=dag_name, schedule=timedelta(days=1))
    with dag:
        dag_task1 = DAGTask(
            name="task_python_api_trunc",
            definition=StoredProcedureCall(
                func=trunc,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task2 = DAGTask(
            name="task_python_api_filter",
            definition=StoredProcedureCall(
                func=filter_by_shipmode,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task1 >> dag_task2
    dag_op = DAGOperation(schema)
    dag_op.deploy(dag, mode=CreateMode.or_replace)
    
    Copy

    이 코드에서는 다음을 수행합니다.

    • DAG 생성자를 호출하고 이름과 일정을 지정하여 작업 그래프 오브젝트를 생성합니다.

    • DAGTask 생성자를 사용하여 작업 그래프별 작업을 정의합니다. 생성자는 이전 단계에서 StoredProcedureCall 클래스에 지정한 것과 동일한 인자를 허용한다는 점에 유의하십시오.

    • 더 편리한 구문을 사용하여 dag_task1 을 루트 작업으로 지정하고 dag_task2 의 이전 작업을 지정합니다.

    • 작업 그래프를 PYTHON_API_SCHEMA 데이터베이스의 PYTHON_API_DB 스키마에 배포합니다.

  2. 작업 그래프 생성을 확인하려면 다음 셀에서 다음 코드를 실행합니다.

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    작업 상태를 확인하고 싶을 때마다 이 단계를 반복할 수 있습니다.

  3. 루트 작업을 시작하여 작업 그래프를 시작하려면 다음 셀에서 다음 코드를 실행합니다.

    dag_op.run(dag)
    
    Copy
  4. PYTHON_API_DAG$TASK_PYTHON_API_TRUNC 작업이 시작되었는지 확인하려면 2단계를 반복합니다.

    참고

    작업 그래프에서 호출한 함수 호출은 필수 인자를 사용하지 않았기 때문에 성공하지 못합니다. 이 단계의 목적은 작업 그래프를 프로그래밍 방식으로 시작하는 방법을 보여주는 것입니다.

  5. 작업 그래프를 삭제하려면 다음 셀에서 다음 코드를 실행합니다.

    dag_op.drop(dag)
    
    Copy
  6. 이 자습서에서 생성한 데이터베이스 오브젝트를 정리합니다.

    database.drop()
    
    Copy

다음에는 무엇을 해야 합니까?

축하합니다! 이 자습서에서는 Snowflake Python APIs 을 사용하여 작업 및 작업 그래프를 생성하고 관리하는 방법을 배웠습니다.

요약

이 과정에서 다음 단계를 완료했습니다.

  • 저장 프로시저와 해당 종속성을 보관할 수 있는 스테이지를 생성합니다.

  • 작업을 생성하고 관리합니다.

  • 작업 그래프를 생성하고 관리합니다.

  • Snowflake 리소스 오브젝트를 삭제하여 정리합니다.

다음 자습서

이제 Snowpark Container Services 에서 컴포넌트를 생성하고 관리하는 방법을 보여주는 자습서 3: Snowpark Container Service 만들기 및 관리 으로 진행할 수 있습니다.

추가 리소스

API를 사용하여 Snowflake에서 다른 유형의 오브젝트를 관리하는 더 많은 예제는 다음 개발자 가이드를 참조하십시오.

가이드

설명

Python을 사용하여 Snowflake 데이터베이스, 스키마, 테이블 및 뷰 관리하기

데이터베이스, 스키마, 테이블을 생성하고 관리하려면 API를 사용합니다.

Python을 사용하여 Snowflake 사용자, 역할 및 권한 관리

API를 사용하여 사용자, 역할 및 보조금을 생성하고 관리합니다.

Python을 사용하여 데이터 로딩 및 언로딩 리소스 관리

API를 사용하여 외부 볼륨, 파이프, 스테이지를 포함한 데이터 로딩 및 언로딩 리소스를 생성하고 관리합니다.

Python을 사용하여 Snowpark 컨테이너 서비스(서비스 함수 포함) 관리하기

API를 사용하여 컴퓨팅 풀, 이미지 리포지토리, 서비스, 서비스 함수 등 Snowpark Container Services의 구성 요소를 관리합니다.