タスクグラフで一連のタスクを作成

Snowflakeでは、有向無サイクルグラフ (DAG) としても知られるタスクグラフで複数のタスクを管理できます。タスクグラフはルートタスクと依存関係のある子タスクで構成されます。依存関係は、ループのない、最初から最後までの方向で実行されなければなりません。オプションの最終タスク(ファイナライザ)は、他のすべてのタスクが完了した後にクリーンアップ処理を実行することができます。

ランタイム値、グラフレベルの構成、親タスクの戻り値を使用してタスク本文でロジックベースの操作を指定することにより、動的な動作をするタスクグラフを構築します。

SQL, JavaScript, Python, Java, Scala, Snowflake Scriptingのような サポートされている言語とツール を使用してタスクとタスクグラフを作成できます。このトピックでは SQL の例を示します。Python の例については、 PythonによるSnowflakeタスクとタスクグラフの管理 を参照してください。

タスクグラフの作成

CREATE TASK を使ってルートタスクを作成し、 CREATE TASK を使って子タスクを作成します。AFTER で親タスクを選択します。

ルートタスクは、 タスクグラフがいつ実行されるか を定義します。子タスクはタスクグラフで定義された順序で実行されます。

複数の子タスクが同じ親を持つ場合、子タスクは並行して実行されます。

タスクが複数の親タスクを持つ場合、タスクはすべての先行タスクの正常終了を待ってから開始します。(親タスクがスキップされたときにタスクが実行されることもあります。詳細については、 子タスクのスキップまたは中断 をご参照ください。)

次の例では、1分ごとに実行するようにスケジュールされたルートタスクから始まるサーバーレスタスクグラフを作成しています。ルートタスクには2つの子タスクがあり、並行して実行されます。(この図は、どちらかのタスクが他方よりも長く実行される例を示しています。)両方のタスクが完了すると、3番目の子タスクが実行されます。ファイナライザタスクは、他のすべてのタスクが完了した後、または完了しなかった後に実行されます。

一連のタスクのダイアグラム。
CREATE TASK task_root
  SCHEDULE = '1 MINUTE'
  AS SELECT 1;

CREATE TASK task_a
  AFTER task_root
  AS SELECT 1;

CREATE TASK task_b
  AFTER task_root
  AS SELECT 1;

CREATE TASK task_c
  AFTER task_a, task_b
  AS SELECT 1;
Copy

考慮事項:

  • タスクグラフは、最大1000タスクに制限されます。

  • 単一のタスクには、最大100個の親タスクと100個の子タスクを含めることができます。

  • 同じユーザーが管理するウェアハウスでタスクが並行して実行される場合、 コンピュートリソース は、並行タスクの実行に対応できるサイズでなければなりません。

ファイナライザタスク

タスクグラフの他のすべてのタスクが完了した(または完了しなかった)後に実行するファイナライザタスクをオプションで追加できます。次のような場合に使用します。

  • 不要になった中間データのクリーンアップなど、クリーンアップのパフォーマンスを実行します。

  • タスクの成否に関する通知を送信します。

タスクシーケンスとは、ルートタスクが2つの子タスクを指し示し、その子タスクがさらに別のタスクを指し示すというものです。ファイナライザ・タスクは一番下に表示され、他のすべてのタスクが完了または失敗した後に実行されます。

ファイナライザタスクを作成するには、ルートタスクで CREATE TASK ... FINALIZE ... を使います。例:

CREATE TASK task_finalizer
  FINALIZE = task_root
  AS SELECT 1;
Copy

考慮事項:

  • ファイナライザタスクは常にルートタスクと関連付けられます。各ルートタスクは1つのファイナライザタスクを持つことができ、ファイナライザタスクは1つのルートタスクにのみ関連付けることができます。

  • タスク・グラフのルート・タスクがスキップされると(例えば、 タスク・グラフが重複して実行されている ため)、ファイナライザ・タスクは開始されません。

  • ファイナライザタスクには子タスクを指定できません。

  • ファイナライザタスクは、現在のタスクグラフの実行中に他のタスクが実行中でないか、キューに入っていない場合にのみスケジュールされます。

その他の例については、 ファイナライザタスクの例:電子メール通知の送信ファイナライザタスクの例:エラーの修正 をご参照ください。

タスクグラフの所有権の管理

タスクグラフのすべてのタスクには、同じタスク所有者が必要で、同一のデータベースとスキーマに保存されている必要があります。

タスクグラフの全タスクの所有権は、以下のいずれかの操作で移譲できます。

  • DROP ROLE を使用して、タスクグラフのすべてのタスクの所有者を削除します。Snowflake は、DROPROLEコマンドを実行するロールに所有権を移します。

  • スキーマ内のすべてのタスクに対して GRANT OWNERSHIP を使用して、タスクグラフ内のすべてのタスクの所有権を転送します。

これらの方法でタスクグラフ内のタスクの所有権を移譲しても、タスクグラフ内のタ スクは互いの関係を保持したままです。

1つのタスクの所有権を譲渡すると、そのタスクと親タスクおよび子タスクの間の依存関係がなくなります。詳細については、 親タスクと子タスクのリンク解除 (このトピック内)をご参照ください。

注釈

レプリケーションを実行するロールとは別のロールがグラフを所有している場合、データベースのレプリケーションはタスクグラフでは機能しません。

タスクグラフでタスクを実行またはスケジュール

タスクグラフの手動実行

タスクグラフの単一のインスタンスを実行することができます。これは、本番環境でタスクグラフを有効にする前に、新しいタスクグラフや変更されたタスクグラフをテストする場合や、必要に応じて1回だけ実行する場合に便利です。

タスクグラフを開始する前に、実行に含めたい各子タスク(オプションのファイナライザタスクを含む)で ALTER TASK ... RESUME を使用します。

タスクグラフの単一のインスタンスを実行するには、ルートタスクで EXECUTE TASK を使用します。ルートタスクを実行すると、タスクグラフ内のすべての再開された子タスクが、タスクグラフで定義された順序で実行されます。

スケジュールまたはトリガータスクとしてタスクを実行

ルートタスクで、タスクグラフがいつ実行されるかを定義します。タスクグラフは、定期的なスケジュールで実行することも、イベントによってトリガーすることもできます。詳細については、次のトピックをご参照ください。

タスクグラフを開始するには、以下のどちらかを行います。

  • 実行に含めたい個々の子タスク(ファイナライザを含む)を再開してから、 ALTER TASK ... RESUME を使用してルートタスクを再開します。

  • ルートタスクで SYSTEM$TASK_DEPENDENTS_ENABLE (<root_task_name>) を使って、タスク・グラフの全タスクを一度に再開します。

タスクグラフで依存タスクを表示する

ルートタスクの子タスクを表示するには、 TASK_DEPENDENTS テーブル関数を呼び出します。タスクグラフ内の すべての タスクを取得するには、関数を呼び出すときにルートタスクを入力します。

また、 Snowsight を使ってタスクグラフを管理・表示することもできます。詳細については、 Snowsight でタスクとタスクグラフを表示する をご参照ください。

タスクの変更、中断、再試行

タスクグラフのタスク修正

スケジュールされたタスクグラフのタスクを修正するには、 ALTER TASK ... SUSPEND を使ってルートタスクを一時停止します。タスクグラフの実行が進行中であれば、現在の実行を完了します。ルートタスクの今後のスケジュール実行はすべてキャンセルされます。

ルートタスクが中断されても、ファイナライザタスクを含む子タスクはその状態(中断、実行、完了)を保持します。子タスクを個別に中断する必要はありません。

ルートタスクを一時停止した後、タスクグラフのどのタスクでも修正できます。

タスクグラフを再開するには、以下のどちらかを行います。

  • ALTER TASK ... RESUME を使ってルートタスクを再開します。以前に実行されていた個々の子タスクを再開する必要はありません。

  • SYSTEM$TASK_DEPENDENTS_ENABLE を呼び出し、ルートタスクの名前を渡すことで、タスクグラフ内のすべてのタスクを一度に再開できます。

子タスクのスキップまたは中断

タスクグラフの子タスクをスキップするには、 ALTER TASK ... SUSPEND を使って子タスクを一時停止します。

子タスクを一時停止すると、子タスクが成功したかのようにタスクグラフが実行され続けます。複数の先行タスクを持つ子タスクは、 少なくとも1つ の先行タスクが再開状態にある限り実行され、再開されたすべての先行タスクは正常に完了します。

中断された子タスクを含むタスク・グラフを示す図。中断された子タスクはスキップされ、タスクグラフは完了します。

失敗したタスクの再試行

EXECUTE TASK ... RETRY LAST を使って、最後に失敗したタスクからタスクグラフの実行を試みます。タスクが成功した場合、すべての子タスクはその前のタスクが完了すると同時に実行され続けます。

自動リトライ

デフォルトでは、子タスクが失敗した場合、タスクグラフ全体が失敗したとみなされます。

次にスケジュールされたタスクグラフの実行まで待つのではなく、ルートタスクの TASK_AUTO_RETRY_ATTEMPTS パラメーターをセットすることで、タスクグラフにすぐに再試行するよう指示することができます。子タスクが失敗すると、指定された回数まで、タスクグラフ全体が直ちに再試行されます。それでもタスクグラフが完了しない場合、タスクグラフは失敗したとみなされます。

失敗したタスクグラフの実行後にタスクグラフを一時停止

デフォルトでは、タスクグラフは10回連続して失敗すると中断されます。この値はルートタスクで SUSPEND_TASK_AFTER_NUM_FAILURES をセットすることで変更できます。

次の例では、子タスクが失敗するたびに、タスクグラフ全体が失敗したとみなされる前に、タスクグラフは直ちに2回リトライします。タスクグラフが3回連続で失敗した場合、タスクグラフは中断されます。

CREATE OR REPLACE TASK task_root
  SCHEDULE = '1 MINUTE'
  TASK_AUTO_RETRY_ATTEMPTS = 2   --  Failed task graph retries up to 2 times
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3   --  Task graph suspends after 3 consecutive failures
  AS SELECT 1;
Copy

タスクグラフの重複実行

デフォルトでは、Snowflakeは特定のタスクグラフの1つのインスタンスのみが一度に実行できるようにします。ルートタスクの次の実行は、タスクグラフ内のすべてのタスクの実行が終了した後にのみスケジュールされます。これは、タスクグラフ内のすべてのタスクを実行するために必要な累積時間がルートタスクの定義で設定された、明示的にスケジュールされた時間を超える場合、タスクグラフの少なくとも1つの実行がスキップされることを意味します。

子タスクが重なるようにするには、ルートタスクで CREATE TASK または ALTER TASK を使い、 ALLOW_OVERLAPPING_EXECUTION を TRUE にセットします。(ルートタスクが重なることはありません。)

タスクグラフの重複実行

タスクグラフの重複実行によって実行される読み取り/書き込み SQL 操作が誤ったデータまたは重複するデータを生成しない場合、重複する実行は許容される(または望ましい)場合があります。ただし、他のタスクグラフの場合、タスク所有者(つまり、タスクグラフ内のすべてのタスクに対して OWNERSHIP 権限を持つロール)は、ルートタスクに適切なスケジュールを設定して、適切なウェアハウス(または、サーバーレスコンピューティングリソース)サイズを選択し、ルートタスクの次回の実行予定前に、タスクグラフのインスタンスが完了するようにする必要があります。

ルートタスクで定義されたスケジュールに合わせてタスクグラフを適切に調整するには、次を実行します。

  1. 可能であれば、ルートタスクの実行間のスケジューリング時間を増やします。

  2. コンピューティングの負荷が大きいタスクは、サーバーレスコンピューティングリソースを使用するように変更することを検討します。タスクがユーザー管理のコンピューティングリソースに依存している場合は、タスクグラフで大規模または複雑な SQL ステートメントやストアドプロシージャを実行するためのウェアハウスサイズを拡大することを検討します。

  3. 各タスクによって実行される SQL ステートメントまたはストアドプロシージャを分析します。並列処理を活用するためにコードを書き換えられるかどうかを判断します。

上記の解決策のいずれも役に立たない場合は、ルートタスクで ALLOW_OVERLAPPING_EXECUTION = TRUE を設定して、タスクグラフの同時実行を許可する必要があるかどうかを検討してください。タスクの作成時に(CREATE TASK を使用)、または後で(ALTER TASK を使用、または Snowsight で)、このパラメーターを定義できます。

バージョン管理

タスクグラフのルートタスクが再開または手動で実行されると、Snowflakeは、タスクグラフ内のすべてのタスクのすべてのプロパティを含む、タスクグラフ全体のバージョンを設定します。タスクが中断および変更された後、ルートタスクが再開されるか、手動で実行されると、Snowflakeが新しいバージョンが設定します。

タスクグラフ内にある任意のタスクを変更または再作成するには、最初にルートタスクを中断する必要があります。ルートタスクが中断されると、将来のスケジュールされたルートタスクの実行はすべてキャンセルされます。ただし、現在実行中のタスクがある場合、これらのタスクと子孫タスクは、現在のバージョンを使用して引き続き実行されます。

注釈

タスクグラフの実行中にタスクによって呼び出されるストアドプロシージャの定義が変更された場合、現在実行中のタスクによってストアドプロシージャが呼び出されたときに、新しいプログラミングを実行できます。

たとえば、タスクグラフのルートタスクが中断されているが、このタスクのスケジュールされた実行がすでに開始されているとします。タスクグラフのすべてのタスクの所有者は、ルートタスクの実行中に子タスクによって呼び出される SQL コードを変更します。子タスクが実行され、ルートタスクが実行を開始したときに最新だったバージョンのタスクグラフを使用して、定義内の SQL コードが実行されます。ルートタスクが再開されるか、手動で実行されると、タスクグラフの新しいバージョンが設定されます。この新しいバージョンには、子タスクへの変更が含まれています。

タスクバージョンの履歴を取得するには、 TASK_VERSIONS Account Usageビュー (SNOWFLAKE 共有データベース内)をクエリします。

タスクグラフの期間

タスクグラフの期間には、ルートタスクの開始スケジュールから最後の子タスクが完了するまでの時間が含まれます。タスクグラフの期間を計算するには、 COMPLETE_TASK_GRAPHS ビュー にクエリし、 SCHEDULED_TIME と COMPLETED_TIME を比較します。

例えば、次の図は、1分ごとに実行するようにスケジュールされたタスクグラフです。ルート・タスクとその2つの子タスクはそれぞれ5秒間キューに入れられ、10秒間実行されるため、完了までに合計45秒を要します。

依存関係を持つ3つのタスクを含むタスクグラフの図。各タスクは5秒間キューに入れられ、10秒間実行され、合計45秒間実行されます。

タスクグラフのタイムアウト

ルートタスクに USER_TASK_TIMEOUT_MS がセットされると、タイムアウトはタスクグラフ全体に適用されます。

子タスクまたはファイナライザタスクに USER_TASK_TIMEOUT_MS がセットされると、タイムアウトはそのタスクのみに適用されます。

ルートタスクと子タスクの両方で USER_TASK_TIMEOUT_MS がセットされている場合、子タスクのタイムアウトがルートタスクのタイムアウトを上書きします。

考慮事項

  • サーバーレスタスクの場合、Snowflakeは自動的にリソースをスケーリングし、キュー時間を含むターゲット完了間隔内でタスクが完了するようにします。

  • ユーザー管理タスクの場合、共有ウェアハウスや混雑しているウェアハウスでタスクが実行されるスケジュールでは、キュー期間が長くなるのが一般的です。

  • タスクグラフの場合、総時間には、子タスクが前タスクの完了を待つための追加のキュー時間が含まれるかもしれません。

ロジック(ランタイム情報、構成、戻り値)を含むタスクグラフの作成

タスクグラフのタスクは親タスクの戻り値を使用して、関数本文でロジックベースのオペレーションを実行することができます。

考慮事項:

  • SYSTEM$GET_PREDECESSOR_RETURN_VALUE のように、ロジックベースのコマンドには大文字と小文字を区別するものがあります。しかし、 CREATE TASK を使って引用符なしで作成されたタスクは、 大文字で保存され、解決されます。これを管理するには、次のいずれかを行います:

    • タスク名は大文字のみで作成してください。

    • タスクの名前や呼び出しには引用符を使用してください。

    • 小文字で定義されたタスク名の場合は、大文字を使ってタスクを呼び出します。例: "CREATE TASK task_c..." で定義されたタスクは、 SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('TASK_C') のように呼び出すことができます。

タスクグラフに構成情報を渡します。

JSON オブジェクトを使って構成情報を渡すことができます。このオブジェクトはタスクグラフ内の他のタスクから読み取ることができます。CREATE/ALTER TASK ... CONFIG の構文を使用して、ルートタスクの構成情報をセット、解除、修正します。関数 SYSTEM$GET_TASK_GRAPH_CONFIG を使って取得します。例:

CREATE OR REPLACE TASK "task_root"
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  CONFIG='{"environment": "production", "path": "/prod_directory/"}'
  AS SELECT 1;

CREATE OR REPLACE TASK "task_a"
  USER_TASK_TIMEOUT_MS = 600000
  AFTER "task_root"
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
      CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
      INSERT INTO demo_table VALUES('task c path',:value);
    END;
Copy

タスク間の戻り値の受け渡し

タスクグラフのタスク間で戻り値を渡すことができます。関数 SYSTEM$SET_RETURN_VALUE を使ってタスクから戻り値を追加し、関数 SYSTEM$GET_PREDECESSOR_RETURN_VALUE を使って戻り値を取得します。

タスクが複数の先行タスクを持っている場合、どのタスクが欲しい返り値を持っているかを指定する必要があります。次の例では、構成情報を追加するタスクグラフのルートタスクを作成します。

CREATE OR REPLACE TASK "task_c"
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('task_c successful');
    END;

CREATE OR REPLACE TASK "task_d"
  USER_TASK_TIMEOUT_MS = 60000
  AFTER "task_c"
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('task_c'));
      CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
      INSERT INTO demo_table VALUES('Value from predecessor task_c', :value);
    END;
Copy

ランタイム情報の取得と使用

現在のタスク実行に関する情報を報告するには、関数 SYSTEM$TASK_RUNTIME_INFO を使用します。この関数にはタスクグラフ特有のオプションがいくつかあります。例えば、 CURRENT_ROOT_TASK_NAME を使って、現在のタスクグラフのルートタスクの名前を取得します。以下の例では、タスクグラフのルートタスクがいつ開始したかに基づいて、 テーブルに日付スタンプを追加する方法を示します。

-- Updates the date/time table after the root task completes.
CREATE OR REPLACE TASK "task_date_time_table"
  USER_TASK_TIMEOUT_MS = 60000
  AFTER "task_root"
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
      INSERT INTO date_time_table VALUES('order_date',:value);
    END;
Copy

例複数のタスクの開始とステータス報告

次の例では、ルートタスクが3つの異なるテーブルを更新するタスクを開始します。これら3つのテーブルが更新された後、タスクは他の3つのテーブルの情報を統合し、売上集計テーブルを作成します。

フローチャートはルートタスクが3つの子タスクを起動し、それぞれがテーブルを更新することを示しています。これら3つのタスクは、すべて別の子タスクに先行し、前の変更を別のテーブルにまとめます。
-- Create a notebook in the public schema
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- task_a: Root task. Starts the task graph and sets basic configurations.
CREATE OR REPLACE TASK task_a
  SCHEDULE = '1 MINUTE'
  TASK_AUTO_RETRY_ATTEMPTS = 2
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
  USER_TASK_TIMEOUT_MS = 60000
  CONFIG='{"environment": "production", "path": "/prod_directory/"}'
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('task_a successful');
    END;
;

-- task_customer_table: Updates the customer table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_customer_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT customer_id FROM ref_cust_table
        WHERE cust_name = "Jane Doe";);
      INSERT INTO customer_table VALUES('customer_id',:value);
    END;
;

-- task_product_table: Updates the product table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_product_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT product_id FROM ref_item_table
        WHERE PRODUCT_NAME = "widget";);
      INSERT INTO product_table VALUES('product_id',:value);
    END;
;

-- task_date_time_table: Updates the date/time table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_date_time_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
      INSERT INTO "date_time_table" VALUES('order_date',:value);
    END;
;

-- task_sales_table: Aggregates changes from other tables.
--   Runs only after updates are complete to all three other tables.
CREATE OR REPLACE TASK task_sales_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_customer_table, task_product_table, task_date_time_table
  AS
    BEGIN
      LET VALUE := (SELECT sales_order_id FROM ORDERS);
      JOIN CUSTOMER_TABLE ON orders.customer_id=customer_table.customer_id;
      INSERT INTO sales_table VALUES('sales_order_id',:value);
    END;
;
Copy

ファイナライザタスクの例:電子メール通知の送信

この例では、ファイナライザタスクがタスクグラフのパフォーマンスを要約したメールを送信する方法を示しています。タスクは2つの外部関数を呼び出します。1つはタスクの完了ステータスに関する情報を集約し、もう1つはその情報を使ってリモートメッセージングサービスで送信できるメールを作成します。

タスクシーケンスとは、ルートタスクが2つの子タスクを指し示し、その子タスクがさらに別のタスクを指し示すというものです。ファイナライザ・タスクは一番下に表示され、他のすべてのタスクが終了した後、あるいは終了に失敗した後に実行されます。
CREATE OR REPLACE TASK notify_finalizer
  USER_TASK_TIMEOUT_MS = 60000
  FINALIZE = task_root
AS
  DECLARE
    my_root_task_id STRING;
    my_start_time TIMESTAMP_LTZ;
    summary_json STRING;
    summary_html STRING;
  BEGIN
    --- Get root task ID
    my_root_task_id := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
    --- Get root task scheduled time
    my_start_time := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP')::timestamp_ltz);
    --- Combine all task run info into one JSON string
    summary_json := (SELECT get_task_graph_run_summary(:my_root_task_id, :my_start_time));
    --- Convert JSON into HTML table
    summary_html := (SELECT HTML_FROM_JSON_TASK_RUNS(:summary_json));

    --- Send HTML to email
    CALL SYSTEM$SEND_EMAIL(
        'email_notification',
        'admin@snowflake.com',
        'notification task run summary',
        :summary_html,
        'text/html');
    --- Set return value for finalizer
    CALL SYSTEM$SET_RETURN_VALUE('✅ Graph run summary sent.');
  END

CREATE OR REPLACE FUNCTION get_task_graph_run_summary(my_root_task_id STRING, my_start_time TIMESTAMP_LTZ)
  RETURNS STRING
AS
$$
  (SELECT
    ARRAY_AGG(OBJECT_CONSTRUCT(
      'task_name', name,
      'run_status', state,
      'return_value', return_value,
      'started', query_start_time,
      'duration', duration,
      'error_message', error_message
      )
    ) AS GRAPH_RUN_SUMMARY
  FROM
    (SELECT
      NAME,
      CASE
        WHEN STATE = 'SUCCEED' then '🟢 Succeeded'
        WHEN STATE = 'FAILED' then '🔴 Failed'
        WHEN STATE = 'SKIPPED' then '🔵 Skipped'
        WHEN STATE = 'CANCELLED' then '🔘 Cancelled'
      END AS STATE,
      RETURN_VALUE,
      TO_VARCHAR(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS') AS QUERY_START_TIME,
      CONCAT(TIMESTAMPDIFF('seconds', query_start_time, completed_time),
        ' s') AS DURATION,
      ERROR_MESSAGE
    FROM
      TABLE(my-database.information_schema.task_history(
        ROOT_TASK_ID => my_root_task_id ::STRING,
        SCHEDULED_TIME_RANGE_START => my_start_time,
        SCHEDULED_TIME_RANGE_END => current_timestamp()
      ))
    ORDER BY
      SCHEDULED_TIME)
  )::STRING
$$
;

CREATE OR REPLACE FUNCTION HTML_FROM_JSON_TASK_RUNS(JSON_DATA STRING)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.8'
  HANDLER = 'GENERATE_HTML_TABLE'
AS
$$
  IMPORT JSON

  def GENERATE_HTML_TABLE(JSON_DATA):
    column_widths = ["320px", "120px", "400px", "160px", "80px", "480px"]

  DATA = json.loads(JSON_DATA)
  HTML = f"""
    <img src="https://example.com/logo.jpg"
      alt="Company logo" height="72">
    <p><strong>Task Graph Run Summary</strong>
      <br>Sign in to Snowsight to see more details.</p>
    <table border="1" style="border-color:#DEE3EA"
      cellpadding="5" cellspacing="0">
      <thead>
        <tr>
        """
        headers = ["Task name", "Run status", "Return value", "Started", "Duration", "Error message"]
        for i, header in enumerate(headers):
            HTML += f'<th scope="col" style="text-align:left;
            width: {column_widths[i]}">{header.capitalize()}</th>'

        HTML +="""
        </tr>
      </thead>
      <tbody>
        """
        for ROW_DATA in DATA:
          HTML += "<tr>"
          for header in headers:
            key = header.replace(" ", "_").upper()
            CELL_DATA = ROW_DATA.get(key, "")
            HTML += f'<td style="text-align:left;
            width: {column_widths[headers.index(header)]}">{CELL_DATA}</td>'
          HTML += "</tr>"
        HTML +="""
      </tbody>
    </table>
    """
  return HTML
$$
;
Copy

ファイナライザタスクの例:エラーの修正

この例は、ファイナライザータスクがエラーを修正する方法を示しています。

デモンストレーションのため、タスクは最初の実行時に失敗するように設計されています。ファイナライザ・タスクは問題を修正し、タスクを再起動します。

タスクシリーズを示す図。左上がタスクA。矢印はタスクAからタスクBを指し、タスクBはタスクCを指し、タスクCはタスクDを指します。タスクAの下にある矢印は、ファイナライザタスクであるタスクFを指しています。
-- Configuration
-- By default, the notebook creates the objects in the public schema.
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- 1. Set the default configurations.
--    Creates a root task ("task_a"), and sets the default configurations
--    used throughout the task graph.
--    Configurations include:
--    * Each task runs after one minute, with a 60-second timeout.
--    * If a task fails, retry it twice. if it fails twice,
--      the entire task graph is considered as failed.
--    * If the task graph fails consecutively three times, suspend the task.
--    * Other environment values are set.

CREATE OR REPLACE TASK task_a
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  TASK_AUTO_RETRY_ATTEMPTS = 2
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('task a successful');
    END;
;

-- 2. Use a runtime reflection variable.
--    Creates a child task ("task_b").
--    By design, this example fails the first time it runs, because
--    it writes to a table ("demo_table") that doesn’t exist.
CREATE OR REPLACE TASK task_b
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
      INSERT INTO demo_table VALUES('task b name',:VALUE);
    END;
;

-- 3. Get a task graph configuration value.
--    Creates the child task ("task_c").
--    By design, this example fails the first time it runs, because
--    the predecessor task ("task_b") fails.
CREATE OR REPLACE TASK task_c
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_b
  AS
    BEGIN
      CALL SYSTEM$GET_TASK_GRAPH_CONFIG('path');
      LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
      INSERT INTO demo_table VALUES('task c path',:value);
    END;
;

-- 4. Get a value from a predecessor.
--    Creates the child task ("task_d").
--    By design, this example fails the first time it runs, because
--    the predecessor task ("task_c") fails.
CREATE OR REPLACE TASK task_d
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_c
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('TASK_A'));
      INSERT INTO demo_table VALUES('task d: predecessor return value', :value);
    END;
;

-- 5. Create the finalizer task ("task_f"), which creates the missing demo table.
--    After the finalizer completes, the task should automatically retry
--    (see task_a: task_auto_retry_attempts).
--    On retry, task_b, task_c, and task_d should complete successfully.
CREATE OR REPLACE TASK task_f
  USER_TASK_TIMEOUT_MS = 60000
  FINALIZE = task_a
  AS
    BEGIN
      CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
    END;
;

-- 6. Resume the finalizer. Upon creation, tasks start in a suspended state.
--    Use this command to resume the finalizer.
ALTER TASK task_f RESUME;
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('task_a');

-- 7. Query the task history
SELECT
    name, state, attempt_number, scheduled_from
  FROM
    TABLE(information_schema.task_history(task_name=> 'task_b'))
  LIMIT 5;
;

-- 8. Suspend the task graph to stop incurring costs
--    Note: To stop the task graph, you only need to suspend the root task
--    (task_a). Child tasks don’t run unless the root task is run.
--    If any child tasks are running, they have a limited duration
--    and will end soon.
ALTER TASK task_a SUSPEND;
DROP TABLE demo_table;

-- 9. Check tasks during execution (optional)
--    Run this command to query the demo table during execution
--    to check which tasks have run.
SELECT * FROM demo_table;

-- 10. Demo reset (optional)
--     Run this command to remove the demo table.
--     This causes task_b to fail during its first run.
--     After the task graph retries, task_b will succeed.
DROP TABLE demo_table;
Copy