Tutorial 2: Aufgaben und Diagramme (DAGs) erstellen und verwalten¶
Einführung¶
In diesem Tutorial erstellen und verwenden Sie Snowflake-Aufgaben, um einige grundlegende gespeicherte Prozeduren zu verwalten. Sie erstellen auch einen Task-Graph, auch Directed Acyclic Graph genannt (DAG), um Aufgaben (Tasks) mit einer übergeordneten Task-Graph-API zu orchestrieren.
Voraussetzungen¶
Bemerkung
Wenn Sie sowohl Gemeinsame Einrichtung für Snowflake Python APIs-Tutorials als auch Tutorial 1: Eine Datenbank, ein Schema, eine Tabelle und ein Warehouse erstellen bereits abgeschlossen haben, können Sie diese Voraussetzungen überspringen und mit dem ersten Schritt dieses Tutorials fortfahren.
Bevor Sie mit diesem Tutorial beginnen, müssen Sie die folgenden Schritte ausführen:
Befolgen Sie die Anweisungen von common setup, die die folgenden Schritte umfassen:
Richten Sie Ihre Entwicklungsumgebung ein.
Installieren Sie das Snowflake Python APIs-Paket.
Konfigurieren Sie Ihre Snowflake-Verbindung.
Importieren Sie alle Module, die für die Python API-Tutorials erforderlich sind.
Erstellen Sie ein API
Root
-Objekt.
Führen Sie den folgenden Code aus, um eine Datenbank namens
PYTHON_API_DB
und ein Schema namensPYTHON_API_SCHEMA
in dieser Datenbank zu erstellen.database = root.databases.create( Database( name="PYTHON_API_DB"), mode=CreateMode.or_replace ) schema = database.schemas.create( Schema( name="PYTHON_API_SCHEMA"), mode=CreateMode.or_replace, )
Dies sind dieselben Datenbank- und Schema-Objekte, die Sie in Tutorial 1 erstellen.
Sobald Sie diese Voraussetzungen erfüllen, können Sie API für die Verwaltung von Aufgaben verwenden.
Einrichten von Snowflake-Objekten¶
Richten Sie die gespeicherten Prozeduren ein, die Ihre Aufgaben aufrufen werden, sowie den Stagingbereich, in dem die gespeicherten Prozeduren gespeichert werden sollen. Sie können Ihr Snowflake Python APIs root
-Objekt verwenden, um einen Stagingbereich in der PYTHON_API_DB
-Datenbank und dem PYTHON_API_SCHEMA
-Schema zu erstellen, die Sie zuvor erstellt haben.
Um einen Stagingbereich namens
TASKS_STAGE
zu erstellen, führen Sie in der nächsten Zelle Ihres Notebooks den folgenden Code aus:stages = root.databases[database.name].schemas[schema.name].stages stages.create(Stage(name="TASKS_STAGE"))
Dieser Stagingbereich enthält die gespeicherten Prozeduren und alle Abhängigkeiten, die diese Prozeduren benötigen.
Um zwei grundlegende Funktionen in Python zu erstellen, die die Aufgaben als gespeicherte Prozeduren ausführen, führen Sie in Ihrer nächsten Zelle den folgenden Code aus:
def trunc(session: Session, from_table: str, to_table: str, count: int) -> str: ( session .table(from_table) .limit(count) .write.save_as_table(to_table) ) return "Truncated table successfully created!" def filter_by_shipmode(session: Session, mode: str) -> str: ( session .table("snowflake_sample_data.tpch_sf100.lineitem") .filter(col("L_SHIPMODE") == mode) .limit(10) .write.save_as_table("filter_table") ) return "Filter table successfully created!"
Diese Funktionen haben die folgenden Aufgaben:
trunc()
: Erzeugt eine verkürzte Version einer Eingabetabelle.filter_by_shipmode()
: Filtert dieSNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM
-Tabelle nach dem Liefermodus, schränkt die Ergebnisse auf 10 Zeilen ein und schreibt die Ergebnisse in eine neue Tabelle.Bemerkung
Diese Funktion fragt die TPC-H Beispieldaten in der SNOWFLAKE_SAMPLE_DATA-Datenbank ab. Snowflake erstellt die Beispieldatenbank standardmäßig in neuen Konten. Wenn die Datenbank nicht in Ihrem Konto erstellt wurde, finden Sie weitere Informationen unter Verwenden der Beispieldatenbank.
Die Funktionen sind absichtlich einfach gehalten und dienen zu Demonstrationszwecken.
Aufgaben erstellen und verwalten¶
Definieren, erstellen und verwalten Sie zwei Aufgaben, die Ihre zuvor erstellten Python-Funktionen als gespeicherte Prozeduren ausführen.
Um die beiden Aufgaben
task1
undtask2
in der nächsten Zelle Ihres Notebooks zu definieren, führen Sie den folgenden Code aus:tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE" task1 = Task( name="task_python_api_trunc", definition=StoredProcedureCall( func=trunc, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"], ), warehouse="COMPUTE_WH", schedule=timedelta(minutes=1) ) task2 = Task( name="task_python_api_filter", definition=StoredProcedureCall( func=filter_by_shipmode, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"], ), warehouse="COMPUTE_WH" )
In diesem Code geben Sie die folgenden Aufgabenparameter an:
Für jede Aufgabe exisitiert eine Definition, die durch ein StoredProcedureCall-Objekt dargestellt wird, das die folgenden Attribute enthält:
Die aufrufbare Funktion zur Ausführung
Den Stagingbereich, in den der Inhalt Ihrer Python-Funktion und ihrer Abhängigkeiten hochgeladen wird
Die Paketabhängigkeiten der gespeicherten Prozedur
Ein Warehouse zur Ausführung der gespeicherten Prozedur (erforderlich bei der Erstellung einer Aufgabe mit einem
StoredProcedureCall
-Objekt). In diesem Tutorial wird das WarehouseCOMPUTE_WH
verwendet, das für Ihr Testkonto zur Verfügung gestellt wird.Einen Zeitplan für die Stammaufgabe
task1
. Der Zeitplan gibt das Intervall an, in dem die Aufgabe periodisch ausgeführt werden soll.
Weitere Informationen zu gespeicherten Prozeduren finden Sie unter Schreiben von gespeicherten Prozeduren in Python.
Um die beiden Aufgaben zu erstellen, rufen Sie ein
TaskCollection
-Objekt (tasks
) aus Ihrem Datenbankschema ab und rufen.create()
in Ihrer Aufgabensammlung auf:# create the task in the Snowflake database tasks = schema.tasks trunc_task = tasks.create(task1, mode=CreateMode.or_replace) task2.predecessors = [trunc_task.name] filter_task = tasks.create(task2, mode=CreateMode.or_replace)
In diesem Codebeispiel verknüpfen Sie die Aufgaben auch, indem Sie
task1
als Vorgänger vontask2
einstellen, wodurch ein minimaler Task-Graph erstellt wird.Um zu bestätigen, dass die beiden Aufgaben nun existieren, führen Sie in Ihrer nächsten Zelle den folgenden Code aus:
taskiter = tasks.iter() for t in taskiter: print(t.name)
Wenn Sie Aufgaben erstellen, werden diese standardmäßig ausgesetzt.
Um eine Aufgabe zu starten, rufen Sie
.resume()
für das Aufgabenressourcenobjekt auf:trunc_task.resume()
Um zu bestätigen, dass die Aufgabe
trunc_task
gestartet wurde, führen Sie in Ihrer nächsten Zelle den folgenden Code aus:taskiter = tasks.iter() for t in taskiter: print("Name: ", t.name, "| State: ", t.state)
Die Ausgabe sollte in etwa so aussehen:
Name: TASK_PYTHON_API_FILTER | State: suspended Name: TASK_PYTHON_API_TRUNC | State: started
Sie können diesen Schritt jederzeit wiederholen, wenn Sie den Status der Aufgaben bestätigen möchten.
Um die Aufgabenressourcen zu bereinigen, setzen Sie die Aufgabe zunächst aus, bevor Sie sie löschen.
Führen Sie in Ihrer nächsten Zelle den folgenden Code aus:
trunc_task.suspend()
Um zu bestätigen, dass die Aufgabe ausgesetzt ist, wiederholen Sie Schritt 5.
Optional: Um beide Aufgaben zu löschen, führen Sie in Ihrer nächsten Zelle den folgenden Code aus:
trunc_task.drop() filter_task.drop()
Erstellen und Verwalten eines Task-Graphs¶
Wenn Sie die Ausführung einer großen Anzahl von Aufgaben koordinieren, kann die individuelle Verwaltung jeder einzelnen Aufgabe eine Herausforderung sein. Snowflake Python APIs bietet Funktionen zur Orchestrierung von Aufgaben mit einer übergeordneten Task-Graph-API.
Ein Task-Graph, auch als Directed Acyclic Graph (DAG) genannt, besteht aus einer Reihe von Aufgaben, die aus einer Stammaufgabe (Root Task) und untergeordneten Aufgaben (Child Tasks) besteht, die nach ihren Abhängigkeiten organisiert sind. Weitere Informationen dazu finden Sie unter Aufgabenabhängigkeiten mit Task-Graphen verwalten.
Um ein Task-Graphs zu erstellen und bereitzustellen, führen Sie den folgenden Code aus:
dag_name = "python_api_dag" dag = DAG(name=dag_name, schedule=timedelta(days=1)) with dag: dag_task1 = DAGTask( name="task_python_api_trunc", definition=StoredProcedureCall( func=trunc, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"]), warehouse="COMPUTE_WH", ) dag_task2 = DAGTask( name="task_python_api_filter", definition=StoredProcedureCall( func=filter_by_shipmode, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"]), warehouse="COMPUTE_WH", ) dag_task1 >> dag_task2 dag_op = DAGOperation(schema) dag_op.deploy(dag, mode=CreateMode.or_replace)
In diesem Code gehen Sie wie folgt vor:
Erstellen Sie ein Task-Graph-Objekt, indem Sie den
DAG
-Konstruktor aufrufen und einen Namen und einen Zeitplan angeben.Definieren Sie einen Task-Graph, d. h. spezifische Aufgaben, und verwenden dazu den
DAGTask
-Konstruktor. Beachten Sie, dass der Konstruktor dieselben Arguments akzeptiert, die Sie in einem früheren Schritt für dieStoredProcedureCall
-Klasse angegeben haben.Geben Sie
dag_task1
als Stammaufgabe und Vorgänger vondag_task2
mit einer praktischeren Syntax an.Stellen Sie den Task-Graph im
PYTHON_API_SCHEMA
-Schema derPYTHON_API_DB
-Datenbank bereit.
Um die Erstellung des Task-Graphs zu bestätigen, führen Sie in Ihrer nächsten Zelle den folgenden Code aus:
taskiter = tasks.iter() for t in taskiter: print("Name: ", t.name, "| State: ", t.state)
Sie können diesen Schritt jederzeit wiederholen, wenn Sie den Status der Aufgaben bestätigen möchten.
Um den Task-Graph durch den Start der Stammaufgabe zu starten, führen Sie in der nächsten Zelle den folgenden Code aus:
dag_op.run(dag)
Um zu bestätigen, dass die Aufgabe
PYTHON_API_DAG$TASK_PYTHON_API_TRUNC
gestartet wurde, wiederholen Sie Schritt 2.Bemerkung
Der Funktionsaufruf, der durch den Task-Graph erfolgt, wird nicht erfolgreich sein, wenn Sie ihn nicht mit den erforderlichen Argumente aufrufen. Dieser Schritt dient nur dazu, Ihnen zu zeigen, wie Sie den Task-Graph programmgesteuert starten können.
Um das Task-Graph zu löschen, führen Sie in der nächsten Zelle den folgenden Code aus:
dag_op.drop(dag)
Bereinigen Sie das Datenbankobjekt, das Sie in diesen Tutorials erstellt haben:
database.drop()
Nächste Schritte¶
Herzlichen Glückwunsch! In diesem Tutorial haben Sie gelernt, wie Sie Aufgaben und Task-Graphs mit Snowflake Python APIs erstellen und verwalten.
Zusammenfassung¶
Dabei haben Sie die folgenden Schritte durchgeführt:
Erstellen Sie einen Stagingbereich, der gespeicherte Prozeduren und ihre Abhängigkeiten aufnehmen kann.
Aufgaben erstellen und verwalten.
Erstellen und verwalten Sie einen Task-Graph.
Bereinigen Sie Ihre Snowflake-Ressourcenobjekte, indem Sie sie löschen.
Nächstes Tutorial¶
Sie können nun mit Tutorial 3: Snowpark Container Services erstellen und verwalten fortfahren, wo Sie erfahren, wie Sie Komponenten in Snowpark Container Services erstellen und verwalten können.
Zusätzliche Ressourcen¶
Weitere Beispiele für die Verwendung von API zur Verwaltung anderer Arten von Objekten in Snowflake finden Sie in den folgenden Entwicklerhandbüchern:
Benutzerhandbuch |
Beschreibung |
---|---|
Verwalten von Snowflake-Datenbanken, Schemas, Tabellen und Ansichten mit Python |
Verwenden der API, um Datenbanken, Schemas und Tabellen zu erstellen und zu verwalten. |
Verwalten von Snowflake-Benutzern, Rollen und Berechtigungen mit Python |
Verwenden der API, um Benutzer, Rollen und Berechtigungen zu erstellen und zu verwalten. |
Verwalten des Ladens von Daten und des Entladens von Ressourcen mit Python |
Verwenden der API, um Ressourcen zum Laden und Entladen von Daten zu erstellen und zu verwalten, einschließlich externer Volumes, Pipes und Stagingbereiche. |
Verwalten von Snowpark Container Services (einschließlich Dienstfunktionen) mit Python |
Verwenden der API, um die Komponenten der Snowpark Container Services zu verwalten, einschließlich Computepools, Image-Repositorys, Diensten und Dienstfunktionen. |