チュートリアル 2: タスクとタスクグラフの作成と管理 (DAGs)

概要

このチュートリアルでは、Snowflake タスクを作成して使用し、いくつかの基本的なストアド プロシージャを管理します。また、タスクグラフ(有向非巡回グラフ(DAG)とも呼ばれる)を作成し、上位レベルのタスクグラフ(API)でタスクをオーケストレーションすることもできます。

前提条件

注釈

すでに Snowflake Python APIs チュートリアルの共通セットアップチュートリアル1:データベース、スキーマ、テーブル、ウェアハウスの作成 の両方を完了している場合は、これらの前提条件をスキップして、このチュートリアルの最初のステップに進むことができます。

このチュートリアルを始める前に、以下のステップを完了する必要があります。

  1. 以下の 共通セットアップ の手順に従ってください。

    • 開発環境を設定します。

    • Snowflake Python APIs パッケージをインストールします。

    • Snowflake接続を設定します。

    • Python API チュートリアルに必要なすべてのモジュールをインポートします。

    • API Root オブジェクトを作成します。

  2. 以下のコードを実行して、 PYTHON_API_DB という名前のデータベースと、そのデータベース内に PYTHON_API_SCHEMA という名前のスキーマを作成します。

    database = root.databases.create(
      Database(
        name="PYTHON_API_DB"),
        mode=CreateMode.or_replace
      )
    
    schema = database.schemas.create(
      Schema(
        name="PYTHON_API_SCHEMA"),
        mode=CreateMode.or_replace,
      )
    
    Copy

    これらは、 チュートリアル1 で作成したものと同じデータベースとスキーマ・オブジェクトです。

これらの前提条件が完了したら、 API を使ってタスク管理を始める準備ができたことになります。

Snowflake オブジェクトの設定

タスクが呼び出すストアドプロシージャと、ストアドプロシージャを保持するステージを設定します。 Snowflake Python APIs root オブジェクトを使って、以前に作成した PYTHON_API_DB データベースと PYTHON_API_SCHEMA スキーマにステージを作成することができます。

  1. TASKS_STAGE という名前のステージを作るには、ノートブックの次のセルに次のコードを実行します。

    stages = root.databases[database.name].schemas[schema.name].stages
    stages.create(Stage(name="TASKS_STAGE"))
    
    Copy

    このステージは、ストアドプロシージャと、それらのプロシージャが必要とする依存関係を保持します。

  2. タスクがストアドプロシージャとして実行する2つの基本的なPython関数を作成するために、次のセルで以下のコードを実行します。

    def trunc(session: Session, from_table: str, to_table: str, count: int) -> str:
      (
        session
        .table(from_table)
        .limit(count)
        .write.save_as_table(to_table)
      )
      return "Truncated table successfully created!"
    
    def filter_by_shipmode(session: Session, mode: str) -> str:
      (
        session
        .table("snowflake_sample_data.tpch_sf100.lineitem")
        .filter(col("L_SHIPMODE") == mode)
        .limit(10)
        .write.save_as_table("filter_table")
      )
      return "Filter table successfully created!"
    
    Copy

    これらの機能は以下のようなものです。

    • trunc(): 入力テーブルの切り捨て版を作成します。

    • filter_by_shipmode(): SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM テーブルをシップモードでフィルタリングし、結果を10行に制限し、結果を新しいテーブルに書き込みます。

      注釈

      この関数は、 SNOWFLAKE_SAMPLE_DATA データベース内の TPC-H サンプルデータ をクエリします。Snowflakeはデフォルトで新規アカウントにサンプルデータベースを作成します。アカウントにデータベースが作成されていない場合は、 サンプルデータベースの使用 をご参照ください。

    関数は意図的に基本的なものであり、デモンストレーションを目的としています。

タスクの作成と管理

以前に作成したPython関数をストアドプロシージャとして実行する2つのタスクを定義、作成、管理します。

  1. ノートブックの次のセルに2つのタスク、 task1task2 を定義するには、以下のコードを実行します。

    tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE"
    
    task1 = Task(
        name="task_python_api_trunc",
        definition=StoredProcedureCall(
          func=trunc,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH",
        schedule=timedelta(minutes=1)
    )
    
    task2 = Task(
        name="task_python_api_filter",
        definition=StoredProcedureCall(
          func=filter_by_shipmode,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH"
    )
    
    Copy

    このコードでは、以下のタスク・パラメーターを指定します。

    • 各タスクについて、以下の属性を含む StoredProcedureCall オブジェクトで表される定義:

      • 実行する呼び出し可能な関数

      • Python関数のコンテンツとその依存関係がアップロードされるステージの場所。

      • ストアドプロシージャのパッケージ依存関係

    • ストアドプロシージャを実行するウェアハウス(StoredProcedureCall オブジェクトでタスクを作成する場合に必要)。このチュートリアルでは、トライアルアカウントに含まれる COMPUTE_WH ウェアハウスを使用します。

    • ルートタスクの実行スケジュール、 task1 。スケジュールは、タスクを定期的に実行する間隔を指定します。

    ストアドプロシージャの詳細については、 Pythonでのストアドプロシージャの記述 をご参照ください。

  2. 2つのタスクを作成するには、データベーススキーマから TaskCollection オブジェクト(tasks)を取り出し、タスクコレクションで .create() を呼び出します。

    # create the task in the Snowflake database
    tasks = schema.tasks
    trunc_task = tasks.create(task1, mode=CreateMode.or_replace)
    
    task2.predecessors = [trunc_task.name]
    filter_task = tasks.create(task2, mode=CreateMode.or_replace)
    
    Copy

    このコード例では、 task1task2 の前任者に設定することで、タスクをリンクし、最小のタスクグラフを作成しています。

  3. 2つのタスクが存在することを確認するために、次のセルで次のコードを実行します。

    taskiter = tasks.iter()
    for t in taskiter:
        print(t.name)
    
    Copy
  4. タスクを作成すると、そのタスクはデフォルトで一時停止されます。

    タスクを開始するには、タスク・リソース・オブジェクトで .resume() を呼び出します。

    trunc_task.resume()
    
    Copy
  5. trunc_task タスクが開始されたことを確認するために、次のセルで以下のコードを実行します。

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    出力はこのようになるはずです。

    Name:  TASK_PYTHON_API_FILTER | State:  suspended
    Name:  TASK_PYTHON_API_TRUNC | State:  started
    

    タスクのステータスを確認したいときは、いつでもこのステップを繰り返すことができます。

  6. タスク・リソースをクリーンアップするには、まずタスクを一時停止してからドロップします。

    次のセルで次のコードを実行します。

    trunc_task.suspend()
    
    Copy
  7. タスクが一時停止されたことを確認するには、ステップ5を繰り返します。

  8. 任意: 両方のタスクを削除するには、次のセルで以下のコードを実行します。

    trunc_task.drop()
    filter_task.drop()
    
    Copy

タスクグラフの作成と管理

多数のタスクの実行を調整する場合、各タスクを個別に管理するのは難しいです。 Snowflake Python APIs は、より高いレベルのタスクグラフ API でタスクをオーケストレートする機能を提供します。

タスクグラフ、または有向非巡回グラフ(DAG)は、ルートタスクと子タスクで構成され、依存関係によって編成された一連のタスクです。詳細については、 タスクグラフでタスクの依存関係を管理する をご参照ください。

  1. タスク・グラフを作成してデプロイするには、以下のコードを実行します。

    dag_name = "python_api_dag"
    dag = DAG(name=dag_name, schedule=timedelta(days=1))
    with dag:
        dag_task1 = DAGTask(
            name="task_python_api_trunc",
            definition=StoredProcedureCall(
                func=trunc,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task2 = DAGTask(
            name="task_python_api_filter",
            definition=StoredProcedureCall(
                func=filter_by_shipmode,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task1 >> dag_task2
    dag_op = DAGOperation(schema)
    dag_op.deploy(dag, mode=CreateMode.or_replace)
    
    Copy

    このコードでは次のように行います。

    • DAG コンストラクターを呼び出し、名前とスケジュールを指定してタスクグラフオブジェクトを作成します。

    • DAGTask コンストラクターを使って、タスクグラフ指定のタスクを定義します。コンストラクターは、前のステップで StoredProcedureCall クラスに指定したのと同じ引数を受け付けることに注意してください。

    • dag_task1 をルート・タスクとして指定し、より便利な構文で dag_task2 の前任者とします。

    • PYTHON_API_DB データベースの PYTHON_API_SCHEMA スキーマにタスクグラフをデプロイします。

  2. タスク・グラフの作成を確認するために、次のセルで次のコードを実行します。

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    タスクのステータスを確認したいときは、いつでもこのステップを繰り返すことができます。

  3. ルート・タスクを開始してタスク・グラフを開始するには、次のセルで次のコードを実行します。

    dag_op.run(dag)
    
    Copy
  4. PYTHON_API_DAG$TASK_PYTHON_API_TRUNC タスクの開始を確認するには、ステップ2を繰り返します。

    注釈

    タスクグラフによって呼び出される関数呼び出しは、必要な引数とともに呼び出していないため、成功しません。この手順の目的は、タスクグラフをプログラムで開始する方法を示すことのみです。

  5. タスク・グラフをドロップするには、次のセルで次のコードを実行します。

    dag_op.drop(dag)
    
    Copy
  6. これらのチュートリアルで作成したデータベースオブジェクトをクリーンアップします。

    database.drop()
    
    Copy

次の内容

おめでとうございます。このチュートリアルでは、 Snowflake Python APIs を使ってタスクとタスクグラフを作成、管理する方法を学びました。

概要

その過程で、あなたは以下のタスクを完了しました。

  • ストアドプロシージャとその依存関係を保持できるステージを作成します。

  • タスクを作成して管理します。

  • タスクグラフを作成して管理します。

  • Snowflakeのリソースオブジェクトを削除してクリーンアップします。

次のチュートリアル

次に、 チュートリアル3:Snowparkコンテナサービスの作成と管理 に進み、 Snowpark Container Services でコンポーネントを作成し管理する方法を説明します。

追加のリソース

API を使用して Snowflake で他のタイプのオブジェクトを管理する例については、以下の開発者ガイドを参照してください。

ガイド

説明

PythonによるSnowflakeデータベース、スキーマ、テーブル、ビューの管理

データベース、スキーマ、テーブルの作成と管理には API を使用します。

PythonでSnowflakeのユーザー、ロール、付与を管理する

API を使用して、ユーザー、ロール、およびグラントを作成および管理します。

Pythonでのデータのロードとアンロードのリソースの管理

API を使用して、外部ボリューム、パイプ、ステージなど、データのロードとアンロードのリソースを作成および管理します。

PythonによるSnowpark Container Services(サービス関数を含む)の管理

API を使用して、コンピューティングプール、イメージリポジトリ、サービスなど、Snowpark Container Servicesのコンポーネントを管理します。