カテゴリ:

システム関数

SYSTEM$TASK_RUNTIME_INFO

現在のタスク実行に関する情報を返します。この関数がタスク実行外で呼び出されると、エラーになり失敗します。

構文

SYSTEM$TASK_RUNTIME_INFO('<arg_name>')
Copy

引数

'arg_name'

返す情報の種類を指定します。次の値のいずれかを指定できます。

説明

'CURRENT_TASK_NAME'

現在のタスクの名前を返します。

'CURRENT_ROOT_TASK_NAME'

現在のタスクグラフのルートタスクの名前を返します。

'CURRENT_ROOT_TASK_UUID'

現在のタスクグラフのルートタスクを表すユニバーサル一意識別子(UUID)を返します。

'CURRENT_TASK_GRAPH_RUN_GROUP_ID'

現在のグラフ実行グループを表すユニバーサル一意識別子(UUID)を返します。

'CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'

現在のグラフ実行グループのルートタスクの元のスケジュールされたタイムスタンプを返します。

再試行されたグラフの場合、返される値は、現在のグループで最初に実行されたグラフの元のスケジュールされたタイムスタンプです。

'LAST_SUCCESSFUL_TASK_GRAPH_RUN_GROUP_ID'

最後に成功したグラフ実行グループを表す、ユニバーサル一意識別子(UUID)を返します。

この値はグラフ実行グループ全体で一貫しており、最初のグラフ実行のルートタスクが開始されるときに決定されます。

'LAST_SUCCESSFUL_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'

最後に成功したグラフ実行グループ内のルートタスクの元のスケジュールされたタイムスタンプを返します。

この値はグラフ実行グループ全体で一貫しており、最初のグラフ実行のルートタスクが開始されるときに決定されます。

戻り値

要求された情報を STRING または TEXT で返します。

CURRENT_TASK_GRAPH_RUN_GROUP_ID を CURRENT_ROOT_TASK_NAME と一緒に使用してデバッグしたり、一意の出力ディレクトリやファイルを作成したりします。

CREATE OR REPLACE TASK my_task ...
  AS
  ...

  -- Inside Python UDF

  query_result = session.sql("""select
        SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_NAME')
        root_name,
        SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID')
        run_id""").collect()
  current_root_task_name, current_graph_run_id = result.ROOT_NAME, result.RUN_ID

  -- Logging information here

  logger.debug(f"start training for {current_root_task_name} at run {current_graph_run_id}")

  -- Create a unique output directory to store intermediate information

  output_dir_name = f"{current_root_task_name}/{current_graph_run_id}/preprocessing.out"
  with open(output_dir_name, "rw+") as f:
    ....
...;
Copy

ストリーミング入力ソースからのデータを処理するには、 CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP と LAST_SUCCESSFUL_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP を使用します。

CREATE OR REPLACE TASK my_task ...
  AS
  ...
  INSERT INTO my_output_table
    SELECT * FROM my_source_table
      WHERE TRUE
        ...
        AND TIMESTAMP BETWEEN
          COALESCE(
            SYSTEM$TASK_RUNTIME_INFO(‘LAST_SUCCESSFUL_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP’),
            '2023-07-01'
          ) AND SYSTEM$TASK_RUNTIME_INFO(‘CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP’)
   ...;
Copy

一意の出力ディレクトリとログ行を生成するには、 LAST_SUCCESSFUL_TASK_GRAPH_RUN_GROUP_ID を使用します。

CREATE OR REPLACE TASK my_task ...
  AS
  ...

  -- Inside Python UDF

  query_result = session.sql("select
      SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_NAME') root_name, SYSTEM$TASK_RUNTIME_INFO('LAST_SUCCESSFUL_TASK_GRAPH_RUN_GROUP_ID') last_run_id").collect()
  current_root_task_name, last_graph_run_id = result.ROOT_NAME,result.LAST_RUN_ID
  logger.log(f"graph name: {current_root_task_name}, last successful run: {last_graph_run_id}")
  ...;
Copy