Python-Handler-Beispiele für gespeicherte Prozeduren¶
Ausführen von gleichzeitigen Aufgaben mit Worker-Prozessen¶
Sie können gleichzeitige Aufgaben mithilfe von Python-Worker-Prozessen ausführen. Dies kann nützlich sein, wenn Sie parallele Aufgaben ausführen müssen, die die Vorteile mehrerer CPU-Kerne auf den Warehouse-Knoten ausnutzen.
Bemerkung
Snowflake empfiehlt, das integrierte Python-Multiprocessing-Modul nicht zu verwenden.
Um Fälle zu umgehen, in denen die Python Global Interpreter Lock verhindert, dass ein Multitasking-Ansatz über alle CPU-Kerne skaliert, können Sie gleichzeitige Aufgaben über separate Worker-Prozesse statt über Threads ausführen.
Sie können dies bei Snowflake-Warehouses tun, indem Sie die Klasse Parallel der Bibliothek joblib verwenden, wie in dem folgenden Beispiel.
Bemerkung
Das für joblib.Parallel verwendete Standard-Backend unterscheidet zwischen Standard- und Snowpark-optimierten Snowflake-Warehouses.
Standardeinstellung für Standard-Warehouses:
threadingStandardeinstellung für Snowpark-optimierte Warehouses:
loky(Multiprocessing)
Sie können die Standardeinstellung des Backends außer Kraft setzen, indem Sie die Funktion joblib.parallel_backend aufrufen, wie im folgenden Beispiel.
Verwenden von Snowpark-APIs zur asynchronen Verarbeitung¶
Die folgenden Beispiele zeigen, wie Sie Snowpark-APIs verwenden können, um asynchrone untergeordnete Jobs zu starten, und wie sich diese Jobs unter verschiedenen Bedingungen verhalten.
Prüfen des Status eines asynchronen untergeordneten Jobs¶
Im folgenden Beispiel führt die Prozedur checkStatus einen asynchronen untergeordneten Job aus, der 60 Sekunden wartet. Die Prozedur prüft dann den Status des Jobs, bevor dieser beendet sein kann, sodass die Prüfung False zurückgibt.
Der folgende Code ruft die Prozedur auf.
Abbruch eines asynchronen untergeordneten Jobs¶
Im folgenden Beispiel verwendet die Prozedur cancelJob SQL, um Daten mit einem asynchronen untergeordneten Job in die Tabelle test_tb einzufügen, der 10 Sekunden bis zur Fertigstellung benötigen würde. Der untergeordnete Job wird dann abgebrochen, bevor er beendet ist und die Daten eingefügt wurden.
Der folgende Code fragt die Tabelle test_tb ab, liefert aber keine Ergebnisse, da keine Daten eingefügt wurden.
Warten und Blockieren, während ein asynchroner untergeordneter Job ausgeführt wird¶
Im folgenden Beispiel führt die Prozedur blockUntilDone einen asynchronen untergeordneten Job aus, der 5 Sekunden bis zur Fertigstellung benötigt. Mit der Methode snowflake.snowpark.AsyncJob.result wartet die Prozedur und kehrt zurück, wenn der Job beendet ist.
Der folgende Code ruft die Prozedur blockUntilDone auf, die nach 5 Sekunden Wartezeit zurückkehrt.
Zurückgeben eines Fehlers nach Abfrage von Ergebnissen aus einem nicht beendeten asynchronen untergeordneten Job¶
Im folgenden Beispiel führt die Prozedur earlyReturn einen asynchronen untergeordneten Job aus, der 60 Sekunden bis zur Fertigstellung benötigt. Die Prozedur versucht dann, einen DataFrame aus dem Ergebnis des Jobs zurückzugeben, bevor dieser beendet sein kann. Das Ergebnis ist ein Fehler.
Der folgende Code ruft die Prozedur earlyReturn auf und gibt den Fehler zurück.
Beenden eines übergeordneten Jobs vor Beendigung eines untergeordneten Jobs, Abbruch des untergeordneten Jobs¶
Im folgenden Beispiel führt die Prozedur earlyCancelJob einen asynchronen untergeordneten Job zum Einfügen von Daten in eine Tabelle aus und benötigt 10 Sekunden bis zur Fertigstellung. Der übergeordnete Job async_handler kehrt jedoch zurück, bevor der untergeordnete Job beendet ist, wodurch der untergeordnete Job abgebrochen wird.
Der folgende Code ruft die Prozedur earlyCancelJob auf. Er fragt dann die Tabelle test_tb ab, die kein Ergebnis liefert, da der abgebrochene untergeordnete Job keine Daten eingefügt hat.
Lesen von Dateien und Assets¶
Lesen einer statisch spezifizierten Datei mithilfe von IMPORTS¶
Sie können eine Datei lesen, indem Sie den Dateinamen und den Namen des Stagingbereichs in der IMPORTS-Klausel des Befehls CREATE PROCEDURE angeben.
Wenn in der IMPORTS-Klausel eine Datei angegeben wird, kopiert Snowflake diese Datei vom Stagingbereich in das Basisverzeichnis der gespeicherten Prozedur (auch Importverzeichnis genannt), welches das Verzeichnis ist, aus dem die gespeicherte Prozedur die Datei ausliest.
Snowflake kopiert importierte Dateien in ein einziges Verzeichnis. Alle Dateien in diesem Verzeichnis müssen eindeutige Namen haben, d. h. jede Datei in Ihrer IMPORTS-Klausel muss einen eigenen Namen haben. Dies gilt auch dann, wenn sich die Dateien in verschiedenen Stagingbereichen oder in verschiedenen Unterverzeichnissen innerhalb eines Stagingbereichs befinden.
Das folgende Beispiel verwendet einen Inline-Python-Handler, der eine Datei namens file.txt aus einem Stagingbereich namens my_stage liest. Der Handler ermittelt den Speicherort des Basisverzeichnisses der gespeicherten Prozedur mithilfe der Python-Methode sys._xoptions und der Systemoption snowflake_import_directory.
Snowflake liest die Datei nur einmal beim Erstellen der gespeicherten Prozedur und wird die Datei bei Ausführung der gespeicherten Prozedur nicht noch einmal lesen, wenn das Lesen der Datei außerhalb des Ziel-Handlers erfolgt.
Erstellen Sie die gespeicherte Prozedur mit einem Inline-Handler:
Importieren eines Verzeichnisses mit IMPORTS¶
Sie können ein Verzeichnis mit der IMPORTS-Klausel des CREATE PROCEDURE-Befehls importieren.
Bemerkung
Der Importpfad für ein Verzeichnis muss mit einem abschließenden Schrägstrich (
/) enden. Beispiel:IMPORTS = ('@my_stage/my_dir/').Um ein Verzeichnis beim Import umzubenennen, hängen Sie
/=custom_name/an den Stagingbereichspfad an. Der kundenspezifische Name muss ein einzelner Verzeichnisname sein, kein Pfad. Beispiel:IMPORTS = ('@my_stage/my_dir/=custom_name/').Verzeichnis-Importe werden in Native Apps nicht unterstützt.
Im folgenden Beispiel wird ein Verzeichnis namens my_dir von einem Stagingbereich namens my_stage importiert und die darin enthaltenen Dateien aufgelistet.
