카테고리:

시스템 함수

SYSTEM$TASK_RUNTIME_INFO

현재 작업 실행에 대한 정보를 반환합니다. 이 함수가 작업 실행 외부에서 호출되면 오류가 발생하며 실패합니다.

구문

SYSTEM$TASK_RUNTIME_INFO('<arg_name>')
Copy

인자

'arg_name'

반환할 정보의 유형을 지정합니다. 다음 값 중 하나를 지정할 수 있습니다.

설명

'CURRENT_TASK_NAME'

현재 작업의 이름을 반환합니다.

'CURRENT_ROOT_TASK_NAME'

현재 작업 그래프의 루트 작업 이름을 반환합니다.

'CURRENT_ROOT_TASK_UUID'

현재 작업 그래프에서 루트 작업을 나타내는 범용 고유 식별자(UUID)를 반환합니다.

'CURRENT_TASK_GRAPH_RUN_GROUP_ID'

현재 그래프 실행 그룹을 나타내는 범용 고유 식별자(UUID)를 반환합니다.

'CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'

현재 그래프 실행 그룹에 있는 루트 작업의 원래 예약된 타임스탬프를 반환합니다.

재시도되는 그래프의 경우 반환되는 값은 현재 그룹에서의 초기 그래프 실행의 원래 예약된 타임스탬프입니다.

'LAST_SUCCESSFUL_TASK_GRAPH_RUN_GROUP_ID'

최근에 성공한 그래프 실행 그룹을 나타내는 범용 고유 식별자(UUID)를 반환합니다.

이 값은 그래프 실행 그룹 전체에서 일관되며 초기 그래프 실행의 루트 작업이 시작될 때 결정됩니다.

'LAST_SUCCESSFUL_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'

최근에 성공한 그래프 실행 그룹에 있는 루트 작업의 원래 예약된 타임스탬프를 반환합니다.

이 값은 그래프 실행 그룹 전체에서 일관되며 초기 그래프 실행의 루트 작업이 시작될 때 결정됩니다.

반환

요청된 정보와 함께 STRING 또는 TEXT를 반환합니다.

CURRENT_ROOT_TASK_NAME과 함께 CURRENT_TASK_GRAPH_RUN_GROUP_ID를 사용하여 고유한 출력 디렉터리 또는 파일을 디버깅하고 생성합니다.

CREATE OR REPLACE TASK my_task ...
  AS
  ...

  -- Inside Python UDF

  query_result = session.sql("""select
        SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_NAME')
        root_name,
        SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID')
        run_id""").collect()
  current_root_task_name, current_graph_run_id = result.ROOT_NAME, result.RUN_ID

  -- Logging information here

  logger.debug(f"start training for {current_root_task_name} at run {current_graph_run_id}")

  -- Create a unique output directory to store intermediate information

  output_dir_name = f"{current_root_task_name}/{current_graph_run_id}/preprocessing.out"
  with open(output_dir_name, "rw+") as f:
    ....
...;
Copy

LAST_SUCCESSFUL_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP와 함께 CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP를 사용하여 스트리밍 입력 원본의 데이터를 처리합니다.

CREATE OR REPLACE TASK my_task ...
  AS
  ...
  INSERT INTO my_output_table
    SELECT * FROM my_source_table
      WHERE TRUE
        ...
        AND TIMESTAMP BETWEEN
          COALESCE(
            SYSTEM$TASK_RUNTIME_INFO(‘LAST_SUCCESSFUL_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP’),
            '2023-07-01'
          ) AND SYSTEM$TASK_RUNTIME_INFO(‘CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP’)
   ...;
Copy

LAST_SUCCESSFUL_TASK_GRAPH_RUN_GROUP_ID를 사용하여 고유한 출력 디렉터리와 로그 줄을 생성합니다.

CREATE OR REPLACE TASK my_task ...
  AS
  ...

  -- Inside Python UDF

  query_result = session.sql("select
      SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_NAME') root_name, SYSTEM$TASK_RUNTIME_INFO('LAST_SUCCESSFUL_TASK_GRAPH_RUN_GROUP_ID') last_run_id").collect()
  current_root_task_name, last_graph_run_id = result.ROOT_NAME,result.LAST_RUN_ID
  logger.log(f"graph name: {current_root_task_name}, last successful run: {last_graph_run_id}")
  ...;
Copy