PythonによるSnowflakeタスクとタスクグラフの管理¶
Pythonを使用してSnowflakeタスクを管理し、 SQL ステートメント、プロシージャ呼び出し、および Snowflakeスクリプト のロジックを実行できます。タスクの概要については、 タスクの紹介 をご参照ください。
Snowflake Python Snowflake Python APIs は、2つの別々のタイプでタスクを表します。
Task: スケジュール、パラメーター、先行タスクなどのタスクのプロパティを公開します。TaskResource: 対応するTaskオブジェクトのフェッチ、タスクの実行、タスクの変更に使用できるメソッドを公開します。
前提条件¶
このトピックの例では、Snowflakeと接続するコードを追加して Root オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。
たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。
出来上がった Session オブジェクトを使って、コードは API のタイプとメソッドを使う Root オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。
タスクの作成¶
タスクを作成するには、まず Task オブジェクトを作成します。次に、タスクを作成するデータベースとスキーマを指定して TaskCollection オブジェクトを作成します。 TaskCollection.create を使用して、新しいタスクをSnowflakeに追加します。
次の例のコードは、 definition パラメーターで指定された SQL クエリを実行する my_task というタスクを表す Task オブジェクトを作成します。
このコードは、 my_db データベースと my_schema スキーマから TaskCollection 変数 tasks を作成します。 TaskCollection.create を使用して、Snowflakeに新しいタスクを作成します。
このコード例では、タスクのスケジュールに1時間の timedelta 値も指定します。タスクのスケジュールは timedelta 値または Cron 式のいずれかを使用して定義できます。
Python関数やストアドプロシージャを実行するタスクを作成することもできます。次の例のコードは、 my_task2 というタスクを作成し、 StoredProcedureCall オブジェクトで表される関数を実行します。
このオブジェクトは dosomething という名前の関数を @mystage ステージのロケーションに指定します。 StoredProcedureCall オブジェクトでタスクを作成する場合は、 warehouse も指定する必要があります。
タスクの作成または更新¶
Task オブジェクトのプロパティを設定し、 TaskResource.create_or_alter メソッドに渡すことで、タスクが存在しない場合は作成し、存在する場合はタスク定義に従って変更することができます。 create_or_alter の動作はべき等であることを意図しています。つまり、メソッドを呼び出す前にタスクが存在したかどうかにかかわらず、結果として得られるタスクオブジェクトは同じになります。
注釈
create_or_alter メソッドは、明示的に定義していない Task プロパティのデフォルト値を使用します。例えば、 schedule を設定しない場合、そのタスクが以前別の値で存在していたとしても、その値はデフォルトで None になります。
次の例のコードは、 my_task タスクの定義とスケジュールを更新し、Snowflake 上のタスクを変更します。
タスクのリスト¶
TaskCollection.iter メソッドを使用してタスクを一覧表示できます。このメソッドは Task オブジェクトの PagedIter 反復子を返します。
次の例のコードは、名前が my で始まるタスクをリストします。
タスク操作の実行¶
タスクの実行、中断、再開といった一般的なタスク操作を TaskResource オブジェクトで行うことができます。
次の例のコードは、 my_task タスクを実行、中断、再開、ドロップします。
タスクグラフでのタスクの管理¶
タスクグラフで集められたタスクを管理することができます。タスクグラフは、依存関係によってまとめられた単一のルートタスクと追加のタスクで構成された、一連のタスクです。
タスクグラフ内のタスクに関する詳細については、 タスクグラフで一連のタスクを作成 をご参照ください。
タスクグラフの作成¶
タスクグラフを作成するには、まず DAG オブジェクトを作成し、名前とスケジュールなどのオプションのプロパティを指定します。タスクグラフのスケジュールは、 timedelta 値または Cron 式のいずれかを使用して定義できます。
次の例のコードは、Python関数 dosomething を定義し、タスクグラフで dag_task2 という名前の DAGTask オブジェクトとして関数を指定します。
このコードはまた、 SQL ステートメントを dag_task1 という別の DAGTask オブジェクトとして定義してから、 dag_task1 を dag_task2 の先行タスクとして指定します。最後に、 my_db データベースと my_schema スキーマでタスクグラフをSnowflakeに展開します。
cronスケジュール、タスクブランチ、関数の戻り値でタスクグラフを作成します。¶
また、タスクの戻り値として使用される指定されたcronスケジュール、タスクブランチ、関数の戻り値を持つタスクグラフを作成することもできます。
次の例のコードでは、 DAG オブジェクトを、そのスケジュールを指定する Cron オブジェクトで作成します。これは、 DAGTaskBranch オブジェクトを task1_branch という名前で他の DAGTask オブジェクトと一緒に定義し、それらの依存関係を指定します。
このコード例は、タスクハンドラー関数も定義し、タスクに割り当てられた指定のタスクハンドラーを持つ DAGTask と DAGTaskBranch の各オブジェクトを作成します。コードは DAG の use_func_return_value パラメーターを True に設定します。これはPython関数の戻り値を対応するタスクの戻り値として使うことを指定します。それ以外の場合、 use_func_return_value のデフォルト値は False となります。
タスクグラフにおけるタスクの戻り値の設定と取得¶
タスクの定義が StoredProcedureCall オブジェクトである場合、ストアドプロシージャ(または関数)のハンドラーは TaskContext オブジェクトを使用することで、タスクの戻り値を明示的に設定することができます。
詳細については、 SYSTEM$SET_RETURN_VALUE をご参照ください。
次の例のコードは、現在のセッションから TaskContext オブジェクトを context という名前で作成するタスクハンドラー関数を定義します。次に、 TaskContext.set_return_value メソッドを使用して、指定された文字列に戻り値を明示的に設定します。
タスクグラフで、直前のタスクをその先行タスクとして識別する直後のタスクは、先行タスクによって明示的に設定された戻り値を取得できます。
詳細については、 SYSTEM$GET_PREDECESSOR_RETURN_VALUE をご参照ください。
次の例のコードは、 TaskContext.get_predecessor_return_value メソッドを使用して pred_task_name という先行タスクの戻り値を取得するタスクハンドラー関数を定義します。