タスクグラフで一連のタスクを作成¶
Snowflakeでは、有向無サイクルグラフ (DAG) としても知られるタスクグラフで複数のタスクを管理できます。タスクグラフはルートタスクと依存関係のある子タスクで構成されます。依存関係は、ループのない、最初から最後までの方向で実行されなければなりません。オプションの最終タスク(ファイナライザ)は、他のすべてのタスクが完了した後にクリーンアップ処理を実行することができます。
ランタイム値、グラフレベルの構成、親タスクの戻り値を使用してタスク本文でロジックベースの操作を指定することにより、動的な動作をするタスクグラフを構築します。
SQL, JavaScript, Python, Java, Scala, Snowflake Scriptingのような サポートされている言語とツール を使用してタスクとタスクグラフを作成できます。このトピックでは SQL の例を示します。Python の例については、 PythonによるSnowflakeタスクとタスクグラフの管理 を参照してください。
タスクグラフの作成¶
CREATE TASK を使ってルートタスクを作成し、 CREATE TASK を使って子タスクを作成します。AFTER で親タスクを選択します。
ルートタスクは、 タスクグラフがいつ実行されるか を定義します。子タスクはタスクグラフで定義された順序で実行されます。
複数の子タスクが同じ親を持つ場合、子タスクは並行して実行されます。
タスクが複数の親タスクを持つ場合、タスクはすべての先行タスクの正常終了を待ってから開始します。(親タスクがスキップされたときにタスクが実行されることもあります。詳細については、 子タスクのスキップまたは中断 をご参照ください。)
次の例では、1分ごとに実行するようにスケジュールされたルートタスクから始まるサーバーレスタスクグラフを作成しています。ルートタスクには2つの子タスクがあり、並行して実行されます。(この図は、どちらかのタスクが他方よりも長く実行される例を示しています。)両方のタスクが完了すると、3番目の子タスクが実行されます。ファイナライザタスクは、他のすべてのタスクが完了した後、または完了しなかった後に実行されます。
CREATE TASK task_root
SCHEDULE = '1 MINUTE'
AS SELECT 1;
CREATE TASK task_a
AFTER task_root
AS SELECT 1;
CREATE TASK task_b
AFTER task_root
AS SELECT 1;
CREATE TASK task_c
AFTER task_a, task_b
AS SELECT 1;
考慮事項:
タスクグラフは、最大1000タスクに制限されます。
単一のタスクには、最大100個の親タスクと100個の子タスクを含めることができます。
同じユーザーが管理するウェアハウスでタスクが並行して実行される場合、 コンピュートリソース は、並行タスクの実行に対応できるサイズでなければなりません。
ファイナライザタスク¶
タスクグラフの他のすべてのタスクが完了した(または完了しなかった)後に実行するファイナライザタスクをオプションで追加できます。次のような場合に使用します。
不要になった中間データのクリーンアップなど、クリーンアップのパフォーマンスを実行します。
タスクの成否に関する通知を送信します。
ファイナライザタスクを作成するには、ルートタスクで CREATE TASK ... FINALIZE ... を使います。例:
CREATE TASK task_finalizer
FINALIZE = task_root
AS SELECT 1;
考慮事項:
ファイナライザタスクは常にルートタスクと関連付けられます。各ルートタスクは1つのファイナライザタスクを持つことができ、ファイナライザタスクは1つのルートタスクにのみ関連付けることができます。
タスク・グラフのルート・タスクがスキップされると(例えば、 タスク・グラフが重複して実行されている ため)、ファイナライザ・タスクは開始されません。
ファイナライザタスクには子タスクを指定できません。
ファイナライザタスクは、現在のタスクグラフの実行中に他のタスクが実行中でないか、キューに入っていない場合にのみスケジュールされます。
その他の例については、 ファイナライザタスクの例:電子メール通知の送信 と ファイナライザタスクの例:エラーの修正 をご参照ください。
タスクグラフの所有権の管理¶
タスクグラフのすべてのタスクには、同じタスク所有者が必要で、同一のデータベースとスキーマに保存されている必要があります。
タスクグラフの全タスクの所有権は、以下のいずれかの操作で移譲できます。
DROP ROLE を使用して、タスクグラフのすべてのタスクの所有者を削除します。Snowflake は、DROPROLEコマンドを実行するロールに所有権を移します。
スキーマ内のすべてのタスクに対して GRANT OWNERSHIP を使用して、タスクグラフ内のすべてのタスクの所有権を転送します。
これらの方法でタスクグラフ内のタスクの所有権を移譲しても、タスクグラフ内のタ スクは互いの関係を保持したままです。
1つのタスクの所有権を譲渡すると、そのタスクと親タスクおよび子タスクの間の依存関係がなくなります。詳細については、 親タスクと子タスクのリンク解除 (このトピック内)をご参照ください。
注釈
レプリケーションを実行するロールとは別のロールがグラフを所有している場合、データベースのレプリケーションはタスクグラフでは機能しません。
タスクグラフでタスクを実行またはスケジュール¶
タスクグラフの手動実行¶
タスクグラフの単一のインスタンスを実行することができます。これは、本番環境でタスクグラフを有効にする前に、新しいタスクグラフや変更されたタスクグラフをテストする場合や、必要に応じて1回だけ実行する場合に便利です。
タスクグラフを開始する前に、実行に含めたい各子タスク(オプションのファイナライザタスクを含む)で ALTER TASK ... RESUME を使用します。
タスクグラフの単一のインスタンスを実行するには、ルートタスクで EXECUTE TASK を使用します。ルートタスクを実行すると、タスクグラフ内のすべての再開された子タスクが、タスクグラフで定義された順序で実行されます。
スケジュールまたはトリガータスクとしてタスクを実行¶
ルートタスクで、タスクグラフがいつ実行されるかを定義します。タスクグラフは、定期的なスケジュールで実行することも、イベントによってトリガーすることもできます。詳細については、次のトピックをご参照ください。
タスクグラフを開始するには、以下のどちらかを行います。
実行に含めたい個々の子タスク(ファイナライザを含む)を再開してから、 ALTER TASK ... RESUME を使用してルートタスクを再開します。
ルートタスクで SYSTEM$TASK_DEPENDENTS_ENABLE (<root_task_name>) を使って、タスク・グラフの全タスクを一度に再開します。
タスクグラフで依存タスクを表示する¶
ルートタスクの子タスクを表示するには、 TASK_DEPENDENTS テーブル関数を呼び出します。タスクグラフ内の すべての タスクを取得するには、関数を呼び出すときにルートタスクを入力します。
また、 Snowsight を使ってタスクグラフを管理・表示することもできます。詳細については、 Snowsight でタスクとタスクグラフを表示する をご参照ください。
タスクの変更、中断、再試行¶
タスクグラフのタスク修正¶
スケジュールされたタスクグラフのタスクを修正するには、 ALTER TASK ... SUSPEND を使ってルートタスクを一時停止します。タスクグラフの実行が進行中であれば、現在の実行を完了します。ルートタスクの今後のスケジュール実行はすべてキャンセルされます。
ルートタスクが中断されても、ファイナライザタスクを含む子タスクはその状態(中断、実行、完了)を保持します。子タスクを個別に中断する必要はありません。
ルートタスクを一時停止した後、タスクグラフのどのタスクでも修正できます。
タスクグラフを再開するには、以下のどちらかを行います。
ALTER TASK ... RESUME を使ってルートタスクを再開します。以前に実行されていた個々の子タスクを再開する必要はありません。
SYSTEM$TASK_DEPENDENTS_ENABLE を呼び出し、ルートタスクの名前を渡すことで、タスクグラフ内のすべてのタスクを一度に再開できます。
子タスクのスキップまたは中断¶
タスクグラフの子タスクをスキップするには、 ALTER TASK ... SUSPEND を使って子タスクを一時停止します。
子タスクを一時停止すると、子タスクが成功したかのようにタスクグラフが実行され続けます。複数の先行タスクを持つ子タスクは、 少なくとも1つ の先行タスクが再開状態にある限り実行され、再開されたすべての先行タスクは正常に完了します。
失敗したタスクの再試行¶
EXECUTE TASK ... RETRY LAST を使って、最後に失敗したタスクからタスクグラフの実行を試みます。タスクが成功した場合、すべての子タスクはその前のタスクが完了すると同時に実行され続けます。
自動リトライ¶
デフォルトでは、子タスクが失敗した場合、タスクグラフ全体が失敗したとみなされます。
次にスケジュールされたタスクグラフの実行まで待つのではなく、ルートタスクの TASK_AUTO_RETRY_ATTEMPTS
パラメーターをセットすることで、タスクグラフにすぐに再試行するよう指示することができます。子タスクが失敗すると、指定された回数まで、タスクグラフ全体が直ちに再試行されます。それでもタスクグラフが完了しない場合、タスクグラフは失敗したとみなされます。
失敗したタスクグラフの実行後にタスクグラフを一時停止¶
デフォルトでは、タスクグラフは10回連続して失敗すると中断されます。この値はルートタスクで SUSPEND_TASK_AFTER_NUM_FAILURES
をセットすることで変更できます。
次の例では、子タスクが失敗するたびに、タスクグラフ全体が失敗したとみなされる前に、タスクグラフは直ちに2回リトライします。タスクグラフが3回連続で失敗した場合、タスクグラフは中断されます。
CREATE OR REPLACE TASK task_root
SCHEDULE = '1 MINUTE'
TASK_AUTO_RETRY_ATTEMPTS = 2 -- Failed task graph retries up to 2 times
SUSPEND_TASK_AFTER_NUM_FAILURES = 3 -- Task graph suspends after 3 consecutive failures
AS SELECT 1;
親タスクと子タスクのリンク解除¶
タスクグラフ内のタスク間の依存関係は、次のいずれかのアクションの結果として切断される可能性があります。
ALTER TASK ... REMOVE AFTER および ALTER TASK ... UNSET FINALIZE は、ターゲットタスクと、指定された親タスクまたは完了したルートタスクの間のリンクを削除します。
DROP TASKおよびGRANT OWNERSHIPは、すべてのターゲットタスクのリンクを切断します。たとえば、ルートタスクAには子タスクBがあり、タスクBには子タスクCがあります。タスクBを削除すると、タスクAとBの間のリンクが切断され、タスクBとCの間のリンクも切断されます。
上記のアクションのいずれかの組み合わせにより、子タスクと すべて の親タスクの関係が切断された場合、子タスクはスタンドアロンタスクまたはルートタスクのいずれかになります。
注釈
タスクの所有権を現在の所有者に付与すると、依存関係リンクが切断されない場合があります。
タスクグラフの重複実行¶
デフォルトでは、Snowflakeは特定のタスクグラフの1つのインスタンスのみが一度に実行できるようにします。ルートタスクの次の実行は、タスクグラフ内のすべてのタスクの実行が終了した後にのみスケジュールされます。これは、タスクグラフ内のすべてのタスクを実行するために必要な累積時間がルートタスクの定義で設定された、明示的にスケジュールされた時間を超える場合、タスクグラフの少なくとも1つの実行がスキップされることを意味します。
子タスクが重なるようにするには、ルートタスクで CREATE TASK または ALTER TASK を使い、 ALLOW_OVERLAPPING_EXECUTION を TRUE にセットします。(ルートタスクが重なることはありません。)
タスクグラフの重複実行によって実行される読み取り/書き込み SQL 操作が誤ったデータまたは重複するデータを生成しない場合、重複する実行は許容される(または望ましい)場合があります。ただし、他のタスクグラフの場合、タスク所有者(つまり、タスクグラフ内のすべてのタスクに対して OWNERSHIP 権限を持つロール)は、ルートタスクに適切なスケジュールを設定して、適切なウェアハウス(または、サーバーレスコンピューティングリソース)サイズを選択し、ルートタスクの次回の実行予定前に、タスクグラフのインスタンスが完了するようにする必要があります。
ルートタスクで定義されたスケジュールに合わせてタスクグラフを適切に調整するには、次を実行します。
可能であれば、ルートタスクの実行間のスケジューリング時間を増やします。
コンピューティングの負荷が大きいタスクは、サーバーレスコンピューティングリソースを使用するように変更することを検討します。タスクがユーザー管理のコンピューティングリソースに依存している場合は、タスクグラフで大規模または複雑な SQL ステートメントやストアドプロシージャを実行するためのウェアハウスサイズを拡大することを検討します。
各タスクによって実行される SQL ステートメントまたはストアドプロシージャを分析します。並列処理を活用するためにコードを書き換えられるかどうかを判断します。
上記の解決策のいずれも役に立たない場合は、ルートタスクで ALLOW_OVERLAPPING_EXECUTION = TRUE を設定して、タスクグラフの同時実行を許可する必要があるかどうかを検討してください。タスクの作成時に(CREATE TASK を使用)、または後で(ALTER TASK を使用、または Snowsight で)、このパラメーターを定義できます。
バージョン管理¶
タスクグラフのルートタスクが再開または手動で実行されると、Snowflakeは、タスクグラフ内のすべてのタスクのすべてのプロパティを含む、タスクグラフ全体のバージョンを設定します。タスクが中断および変更された後、ルートタスクが再開されるか、手動で実行されると、Snowflakeが新しいバージョンが設定します。
タスクグラフ内にある任意のタスクを変更または再作成するには、最初にルートタスクを中断する必要があります。ルートタスクが中断されると、将来のスケジュールされたルートタスクの実行はすべてキャンセルされます。ただし、現在実行中のタスクがある場合、これらのタスクと子孫タスクは、現在のバージョンを使用して引き続き実行されます。
注釈
タスクグラフの実行中にタスクによって呼び出されるストアドプロシージャの定義が変更された場合、現在実行中のタスクによってストアドプロシージャが呼び出されたときに、新しいプログラミングを実行できます。
たとえば、タスクグラフのルートタスクが中断されているが、このタスクのスケジュールされた実行がすでに開始されているとします。タスクグラフのすべてのタスクの所有者は、ルートタスクの実行中に子タスクによって呼び出される SQL コードを変更します。子タスクが実行され、ルートタスクが実行を開始したときに最新だったバージョンのタスクグラフを使用して、定義内の SQL コードが実行されます。ルートタスクが再開されるか、手動で実行されると、タスクグラフの新しいバージョンが設定されます。この新しいバージョンには、子タスクへの変更が含まれています。
タスクバージョンの履歴を取得するには、 TASK_VERSIONS Account Usageビュー (SNOWFLAKE 共有データベース内)をクエリします。
タスクグラフの期間¶
タスクグラフの期間には、ルートタスクの開始スケジュールから最後の子タスクが完了するまでの時間が含まれます。タスクグラフの期間を計算するには、 COMPLETE_TASK_GRAPHS ビュー にクエリし、 SCHEDULED_TIME と COMPLETED_TIME を比較します。
例えば、次の図は、1分ごとに実行するようにスケジュールされたタスクグラフです。ルート・タスクとその2つの子タスクはそれぞれ5秒間キューに入れられ、10秒間実行されるため、完了までに合計45秒を要します。
タスクグラフのタイムアウト¶
ルートタスクに USER_TASK_TIMEOUT_MS がセットされると、タイムアウトはタスクグラフ全体に適用されます。
子タスクまたはファイナライザタスクに USER_TASK_TIMEOUT_MS がセットされると、タイムアウトはそのタスクのみに適用されます。
ルートタスクと子タスクの両方で USER_TASK_TIMEOUT_MS がセットされている場合、子タスクのタイムアウトがルートタスクのタイムアウトを上書きします。
考慮事項¶
サーバーレスタスクの場合、Snowflakeは自動的にリソースをスケーリングし、キュー時間を含むターゲット完了間隔内でタスクが完了するようにします。
ユーザー管理タスクの場合、共有ウェアハウスや混雑しているウェアハウスでタスクが実行されるスケジュールでは、キュー期間が長くなるのが一般的です。
タスクグラフの場合、総時間には、子タスクが前タスクの完了を待つための追加のキュー時間が含まれるかもしれません。
ロジック(ランタイム情報、構成、戻り値)を含むタスクグラフの作成¶
タスクグラフのタスクは親タスクの戻り値を使用して、関数本文でロジックベースのオペレーションを実行することができます。
考慮事項:
SYSTEM$GET_PREDECESSOR_RETURN_VALUE のように、ロジックベースのコマンドには大文字と小文字を区別するものがあります。しかし、 CREATE TASK を使って引用符なしで作成されたタスクは、 大文字で保存され、解決されます。これを管理するには、次のいずれかを行います:
タスク名は大文字のみで作成してください。
タスクの名前や呼び出しには引用符を使用してください。
小文字で定義されたタスク名の場合は、大文字を使ってタスクを呼び出します。例: "CREATE TASK task_c..." で定義されたタスクは、 SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('TASK_C') のように呼び出すことができます。
タスクグラフに構成情報を渡します。¶
JSON オブジェクトを使って構成情報を渡すことができます。このオブジェクトはタスクグラフ内の他のタスクから読み取ることができます。CREATE/ALTER TASK ... CONFIG の構文を使用して、ルートタスクの構成情報をセット、解除、修正します。関数 SYSTEM$GET_TASK_GRAPH_CONFIG を使って取得します。例:
CREATE OR REPLACE TASK "task_root"
SCHEDULE = '1 MINUTE'
USER_TASK_TIMEOUT_MS = 60000
CONFIG='{"environment": "production", "path": "/prod_directory/"}'
AS SELECT 1;
CREATE OR REPLACE TASK "task_a"
USER_TASK_TIMEOUT_MS = 600000
AFTER "task_root"
AS
BEGIN
LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
INSERT INTO demo_table VALUES('task c path',:value);
END;
タスク間の戻り値の受け渡し¶
タスクグラフのタスク間で戻り値を渡すことができます。関数 SYSTEM$SET_RETURN_VALUE を使ってタスクから戻り値を追加し、関数 SYSTEM$GET_PREDECESSOR_RETURN_VALUE を使って戻り値を取得します。
タスクが複数の先行タスクを持っている場合、どのタスクが欲しい返り値を持っているかを指定する必要があります。次の例では、構成情報を追加するタスクグラフのルートタスクを作成します。
CREATE OR REPLACE TASK "task_c"
SCHEDULE = '1 MINUTE'
USER_TASK_TIMEOUT_MS = 60000
AS
BEGIN
CALL SYSTEM$SET_RETURN_VALUE('task_c successful');
END;
CREATE OR REPLACE TASK "task_d"
USER_TASK_TIMEOUT_MS = 60000
AFTER "task_c"
AS
BEGIN
LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('task_c'));
CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
INSERT INTO demo_table VALUES('Value from predecessor task_c', :value);
END;
ランタイム情報の取得と使用¶
現在のタスク実行に関する情報を報告するには、関数 SYSTEM$TASK_RUNTIME_INFO を使用します。この関数にはタスクグラフ特有のオプションがいくつかあります。例えば、 CURRENT_ROOT_TASK_NAME を使って、現在のタスクグラフのルートタスクの名前を取得します。以下の例では、タスクグラフのルートタスクがいつ開始したかに基づいて、 テーブルに日付スタンプを追加する方法を示します。
-- Updates the date/time table after the root task completes.
CREATE OR REPLACE TASK "task_date_time_table"
USER_TASK_TIMEOUT_MS = 60000
AFTER "task_root"
AS
BEGIN
LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
INSERT INTO date_time_table VALUES('order_date',:value);
END;
例¶
例複数のタスクの開始とステータス報告¶
次の例では、ルートタスクが3つの異なるテーブルを更新するタスクを開始します。これら3つのテーブルが更新された後、タスクは他の3つのテーブルの情報を統合し、売上集計テーブルを作成します。
-- Create a notebook in the public schema
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;
-- task_a: Root task. Starts the task graph and sets basic configurations.
CREATE OR REPLACE TASK task_a
SCHEDULE = '1 MINUTE'
TASK_AUTO_RETRY_ATTEMPTS = 2
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
USER_TASK_TIMEOUT_MS = 60000
CONFIG='{"environment": "production", "path": "/prod_directory/"}'
AS
BEGIN
CALL SYSTEM$SET_RETURN_VALUE('task_a successful');
END;
;
-- task_customer_table: Updates the customer table.
-- Runs after the root task completes.
CREATE OR REPLACE TASK task_customer_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
BEGIN
LET VALUE := (SELECT customer_id FROM ref_cust_table
WHERE cust_name = "Jane Doe";);
INSERT INTO customer_table VALUES('customer_id',:value);
END;
;
-- task_product_table: Updates the product table.
-- Runs after the root task completes.
CREATE OR REPLACE TASK task_product_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
BEGIN
LET VALUE := (SELECT product_id FROM ref_item_table
WHERE PRODUCT_NAME = "widget";);
INSERT INTO product_table VALUES('product_id',:value);
END;
;
-- task_date_time_table: Updates the date/time table.
-- Runs after the root task completes.
CREATE OR REPLACE TASK task_date_time_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
BEGIN
LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
INSERT INTO "date_time_table" VALUES('order_date',:value);
END;
;
-- task_sales_table: Aggregates changes from other tables.
-- Runs only after updates are complete to all three other tables.
CREATE OR REPLACE TASK task_sales_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_customer_table, task_product_table, task_date_time_table
AS
BEGIN
LET VALUE := (SELECT sales_order_id FROM ORDERS);
JOIN CUSTOMER_TABLE ON orders.customer_id=customer_table.customer_id;
INSERT INTO sales_table VALUES('sales_order_id',:value);
END;
;
ファイナライザタスクの例:電子メール通知の送信¶
この例では、ファイナライザタスクがタスクグラフのパフォーマンスを要約したメールを送信する方法を示しています。タスクは2つの外部関数を呼び出します。1つはタスクの完了ステータスに関する情報を集約し、もう1つはその情報を使ってリモートメッセージングサービスで送信できるメールを作成します。
CREATE OR REPLACE TASK notify_finalizer
USER_TASK_TIMEOUT_MS = 60000
FINALIZE = task_root
AS
DECLARE
my_root_task_id STRING;
my_start_time TIMESTAMP_LTZ;
summary_json STRING;
summary_html STRING;
BEGIN
--- Get root task ID
my_root_task_id := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
--- Get root task scheduled time
my_start_time := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP')::timestamp_ltz);
--- Combine all task run info into one JSON string
summary_json := (SELECT get_task_graph_run_summary(:my_root_task_id, :my_start_time));
--- Convert JSON into HTML table
summary_html := (SELECT HTML_FROM_JSON_TASK_RUNS(:summary_json));
--- Send HTML to email
CALL SYSTEM$SEND_EMAIL(
'email_notification',
'admin@snowflake.com',
'notification task run summary',
:summary_html,
'text/html');
--- Set return value for finalizer
CALL SYSTEM$SET_RETURN_VALUE('✅ Graph run summary sent.');
END
CREATE OR REPLACE FUNCTION get_task_graph_run_summary(my_root_task_id STRING, my_start_time TIMESTAMP_LTZ)
RETURNS STRING
AS
$$
(SELECT
ARRAY_AGG(OBJECT_CONSTRUCT(
'task_name', name,
'run_status', state,
'return_value', return_value,
'started', query_start_time,
'duration', duration,
'error_message', error_message
)
) AS GRAPH_RUN_SUMMARY
FROM
(SELECT
NAME,
CASE
WHEN STATE = 'SUCCEED' then '🟢 Succeeded'
WHEN STATE = 'FAILED' then '🔴 Failed'
WHEN STATE = 'SKIPPED' then '🔵 Skipped'
WHEN STATE = 'CANCELLED' then '🔘 Cancelled'
END AS STATE,
RETURN_VALUE,
TO_VARCHAR(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS') AS QUERY_START_TIME,
CONCAT(TIMESTAMPDIFF('seconds', query_start_time, completed_time),
' s') AS DURATION,
ERROR_MESSAGE
FROM
TABLE(my-database.information_schema.task_history(
ROOT_TASK_ID => my_root_task_id ::STRING,
SCHEDULED_TIME_RANGE_START => my_start_time,
SCHEDULED_TIME_RANGE_END => current_timestamp()
))
ORDER BY
SCHEDULED_TIME)
)::STRING
$$
;
CREATE OR REPLACE FUNCTION HTML_FROM_JSON_TASK_RUNS(JSON_DATA STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'GENERATE_HTML_TABLE'
AS
$$
IMPORT JSON
def GENERATE_HTML_TABLE(JSON_DATA):
column_widths = ["320px", "120px", "400px", "160px", "80px", "480px"]
DATA = json.loads(JSON_DATA)
HTML = f"""
<img src="https://example.com/logo.jpg"
alt="Company logo" height="72">
<p><strong>Task Graph Run Summary</strong>
<br>Sign in to Snowsight to see more details.</p>
<table border="1" style="border-color:#DEE3EA"
cellpadding="5" cellspacing="0">
<thead>
<tr>
"""
headers = ["Task name", "Run status", "Return value", "Started", "Duration", "Error message"]
for i, header in enumerate(headers):
HTML += f'<th scope="col" style="text-align:left;
width: {column_widths[i]}">{header.capitalize()}</th>'
HTML +="""
</tr>
</thead>
<tbody>
"""
for ROW_DATA in DATA:
HTML += "<tr>"
for header in headers:
key = header.replace(" ", "_").upper()
CELL_DATA = ROW_DATA.get(key, "")
HTML += f'<td style="text-align:left;
width: {column_widths[headers.index(header)]}">{CELL_DATA}</td>'
HTML += "</tr>"
HTML +="""
</tbody>
</table>
"""
return HTML
$$
;
ファイナライザタスクの例:エラーの修正¶
この例は、ファイナライザータスクがエラーを修正する方法を示しています。
デモンストレーションのため、タスクは最初の実行時に失敗するように設計されています。ファイナライザ・タスクは問題を修正し、タスクを再起動します。
-- Configuration
-- By default, the notebook creates the objects in the public schema.
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;
-- 1. Set the default configurations.
-- Creates a root task ("task_a"), and sets the default configurations
-- used throughout the task graph.
-- Configurations include:
-- * Each task runs after one minute, with a 60-second timeout.
-- * If a task fails, retry it twice. if it fails twice,
-- the entire task graph is considered as failed.
-- * If the task graph fails consecutively three times, suspend the task.
-- * Other environment values are set.
CREATE OR REPLACE TASK task_a
SCHEDULE = '1 MINUTE'
USER_TASK_TIMEOUT_MS = 60000
TASK_AUTO_RETRY_ATTEMPTS = 2
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
AS
BEGIN
CALL SYSTEM$SET_RETURN_VALUE('task a successful');
END;
;
-- 2. Use a runtime reflection variable.
-- Creates a child task ("task_b").
-- By design, this example fails the first time it runs, because
-- it writes to a table ("demo_table") that doesn’t exist.
CREATE OR REPLACE TASK task_b
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
BEGIN
LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
INSERT INTO demo_table VALUES('task b name',:VALUE);
END;
;
-- 3. Get a task graph configuration value.
-- Creates the child task ("task_c").
-- By design, this example fails the first time it runs, because
-- the predecessor task ("task_b") fails.
CREATE OR REPLACE TASK task_c
USER_TASK_TIMEOUT_MS = 60000
AFTER task_b
AS
BEGIN
CALL SYSTEM$GET_TASK_GRAPH_CONFIG('path');
LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
INSERT INTO demo_table VALUES('task c path',:value);
END;
;
-- 4. Get a value from a predecessor.
-- Creates the child task ("task_d").
-- By design, this example fails the first time it runs, because
-- the predecessor task ("task_c") fails.
CREATE OR REPLACE TASK task_d
USER_TASK_TIMEOUT_MS = 60000
AFTER task_c
AS
BEGIN
LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('TASK_A'));
INSERT INTO demo_table VALUES('task d: predecessor return value', :value);
END;
;
-- 5. Create the finalizer task ("task_f"), which creates the missing demo table.
-- After the finalizer completes, the task should automatically retry
-- (see task_a: task_auto_retry_attempts).
-- On retry, task_b, task_c, and task_d should complete successfully.
CREATE OR REPLACE TASK task_f
USER_TASK_TIMEOUT_MS = 60000
FINALIZE = task_a
AS
BEGIN
CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
END;
;
-- 6. Resume the finalizer. Upon creation, tasks start in a suspended state.
-- Use this command to resume the finalizer.
ALTER TASK task_f RESUME;
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('task_a');
-- 7. Query the task history
SELECT
name, state, attempt_number, scheduled_from
FROM
TABLE(information_schema.task_history(task_name=> 'task_b'))
LIMIT 5;
;
-- 8. Suspend the task graph to stop incurring costs
-- Note: To stop the task graph, you only need to suspend the root task
-- (task_a). Child tasks don’t run unless the root task is run.
-- If any child tasks are running, they have a limited duration
-- and will end soon.
ALTER TASK task_a SUSPEND;
DROP TABLE demo_table;
-- 9. Check tasks during execution (optional)
-- Run this command to query the demo table during execution
-- to check which tasks have run.
SELECT * FROM demo_table;
-- 10. Demo reset (optional)
-- Run this command to remove the demo table.
-- This causes task_b to fail during its first run.
-- After the task graph retries, task_b will succeed.
DROP TABLE demo_table;