Python-Handler-Beispiele für gespeicherte Prozeduren

Ausführen von gleichzeitigen Aufgaben mit Worker-Prozessen

Sie können gleichzeitige Aufgaben mithilfe von Python-Worker-Prozessen ausführen. Dies kann nützlich sein, wenn Sie parallele Aufgaben ausführen müssen, die die Vorteile mehrerer CPU-Kerne auf den Warehouse-Knoten ausnutzen.

Bemerkung

Snowflake empfiehlt, das integrierte Python-Multiprocessing-Modul nicht zu verwenden.

Um Fälle zu umgehen, in denen die Python Global Interpreter Lock verhindert, dass ein Multitasking-Ansatz über alle CPU-Kerne skaliert, können Sie gleichzeitige Aufgaben über separate Worker-Prozesse statt über Threads ausführen.

Sie können dies bei Snowflake-Warehouses tun, indem Sie die Klasse Parallel der Bibliothek joblib verwenden, wie in dem folgenden Beispiel.

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.12
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;

Bemerkung

Das für joblib.Parallel verwendete Standard-Backend unterscheidet zwischen Standard- und Snowpark-optimierten Snowflake-Warehouses.

  • Standardeinstellung für Standard-Warehouses: threading

  • Standardeinstellung für Snowpark-optimierte Warehouses: loky (Multiprocessing)

Sie können die Standardeinstellung des Backends außer Kraft setzen, indem Sie die Funktion joblib.parallel_backend aufrufen, wie im folgenden Beispiel.

import joblib
joblib.parallel_backend('loky')

Verwenden von Snowpark-APIs zur asynchronen Verarbeitung

Die folgenden Beispiele zeigen, wie Sie Snowpark-APIs verwenden können, um asynchrone untergeordnete Jobs zu starten, und wie sich diese Jobs unter verschiedenen Bedingungen verhalten.

Prüfen des Status eines asynchronen untergeordneten Jobs

Im folgenden Beispiel führt die Prozedur checkStatus einen asynchronen untergeordneten Job aus, der 60 Sekunden wartet. Die Prozedur prüft dann den Status des Jobs, bevor dieser beendet sein kann, sodass die Prüfung False zurückgibt.

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    return async_job.is_done()
$$;

Der folgende Code ruft die Prozedur auf.

CALL checkStatus();
+-------------+
| checkStatus |
|-------------|
| False       |
+-------------+

Abbruch eines asynchronen untergeordneten Jobs

Im folgenden Beispiel verwendet die Prozedur cancelJob SQL, um Daten mit einem asynchronen untergeordneten Job in die Tabelle test_tb einzufügen, der 10 Sekunden bis zur Fertigstellung benötigen würde. Der untergeordnete Job wird dann abgebrochen, bevor er beendet ist und die Daten eingefügt wurden.

CREATE OR REPLACE TABLE test_tb(c1 STRING);
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
    return async_job.cancel()
$$;

CALL cancelJob();

Der folgende Code fragt die Tabelle test_tb ab, liefert aber keine Ergebnisse, da keine Daten eingefügt wurden.

SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+

Warten und Blockieren, während ein asynchroner untergeordneter Job ausgeführt wird

Im folgenden Beispiel führt die Prozedur blockUntilDone einen asynchronen untergeordneten Job aus, der 5 Sekunden bis zur Fertigstellung benötigt. Mit der Methode snowflake.snowpark.AsyncJob.result wartet die Prozedur und kehrt zurück, wenn der Job beendet ist.

CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(5)").collect_nowait()
    return async_job.result()
$$;

Der folgende Code ruft die Prozedur blockUntilDone auf, die nach 5 Sekunden Wartezeit zurückkehrt.

CALL blockUntilDone();
+------------------------------------------+
| blockUntilDone                               |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+

Zurückgeben eines Fehlers nach Abfrage von Ergebnissen aus einem nicht beendeten asynchronen untergeordneten Job

Im folgenden Beispiel führt die Prozedur earlyReturn einen asynchronen untergeordneten Job aus, der 60 Sekunden bis zur Fertigstellung benötigt. Die Prozedur versucht dann, einen DataFrame aus dem Ergebnis des Jobs zurückzugeben, bevor dieser beendet sein kann. Das Ergebnis ist ein Fehler.

CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    df = async_job.to_df()
    try:
        df.collect()
    except Exception as ex:
        return 'Error: (02000): Result for query <UUID> has expired'
$$;

Der folgende Code ruft die Prozedur earlyReturn auf und gibt den Fehler zurück.

CALL earlyReturn();
+------------------------------------------------------------+
| earlyReturn                                                 |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired        |
+------------------------------------------------------------+

Beenden eines übergeordneten Jobs vor Beendigung eines untergeordneten Jobs, Abbruch des untergeordneten Jobs

Im folgenden Beispiel führt die Prozedur earlyCancelJob einen asynchronen untergeordneten Job zum Einfügen von Daten in eine Tabelle aus und benötigt 10 Sekunden bis zur Fertigstellung. Der übergeordnete Job async_handler kehrt jedoch zurück, bevor der untergeordnete Job beendet ist, wodurch der untergeordnete Job abgebrochen wird.

CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;

Der folgende Code ruft die Prozedur earlyCancelJob auf. Er fragt dann die Tabelle test_tb ab, die kein Ergebnis liefert, da der abgebrochene untergeordnete Job keine Daten eingefügt hat.

CALL earlyCancelJob();
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+

Lesen von Dateien und Assets

Lesen einer statisch spezifizierten Datei mithilfe von IMPORTS

Sie können eine Datei lesen, indem Sie den Dateinamen und den Namen des Stagingbereichs in der IMPORTS-Klausel des Befehls CREATE PROCEDURE angeben.

Wenn in der IMPORTS-Klausel eine Datei angegeben wird, kopiert Snowflake diese Datei vom Stagingbereich in das Basisverzeichnis der gespeicherten Prozedur (auch Importverzeichnis genannt), welches das Verzeichnis ist, aus dem die gespeicherte Prozedur die Datei ausliest.

Snowflake kopiert importierte Dateien in ein einziges Verzeichnis. Alle Dateien in diesem Verzeichnis müssen eindeutige Namen haben, d. h. jede Datei in Ihrer IMPORTS-Klausel muss einen eigenen Namen haben. Dies gilt auch dann, wenn sich die Dateien in verschiedenen Stagingbereichen oder in verschiedenen Unterverzeichnissen innerhalb eines Stagingbereichs befinden.

Das folgende Beispiel verwendet einen Inline-Python-Handler, der eine Datei namens file.txt aus einem Stagingbereich namens my_stage liest. Der Handler ermittelt den Speicherort des Basisverzeichnisses der gespeicherten Prozedur mithilfe der Python-Methode sys._xoptions und der Systemoption snowflake_import_directory.

Snowflake liest die Datei nur einmal beim Erstellen der gespeicherten Prozedur und wird die Datei bei Ausführung der gespeicherten Prozedur nicht noch einmal lesen, wenn das Lesen der Datei außerhalb des Ziel-Handlers erfolgt.

Erstellen Sie die gespeicherte Prozedur mit einem Inline-Handler:

CREATE OR REPLACE PROCEDURE test_file_import_sp()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES = ('snowflake-snowpark-python')
IMPORTS = ('@my_stage/dir/file.txt')
HANDLER = 'run'
RUNTIME_VERSION = 3.12
EXECUTE AS CALLER
AS $$
import os
import sys

def run(session):
  with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
    return f.read()
$$;
CALL test_file_import_sp();
// return file content

Importieren eines Verzeichnisses mit IMPORTS

Sie können ein Verzeichnis mit der IMPORTS-Klausel des CREATE PROCEDURE-Befehls importieren.

Bemerkung

  • Der Importpfad für ein Verzeichnis muss mit einem abschließenden Schrägstrich (/) enden. Beispiel: IMPORTS = ('@my_stage/my_dir/').

  • Um ein Verzeichnis beim Import umzubenennen, hängen Sie /=custom_name/ an den Stagingbereichspfad an. Der kundenspezifische Name muss ein einzelner Verzeichnisname sein, kein Pfad. Beispiel: IMPORTS = ('@my_stage/my_dir/=custom_name/').

  • Verzeichnis-Importe werden in Native Apps nicht unterstützt.

Im folgenden Beispiel wird ein Verzeichnis namens my_dir von einem Stagingbereich namens my_stage importiert und die darin enthaltenen Dateien aufgelistet.

CREATE OR REPLACE PROCEDURE my_directory_import_list_sp()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES = ('snowflake-snowpark-python')
IMPORTS = ('@my_stage/my_dir/')
HANDLER = 'run'
RUNTIME_VERSION = 3.12
EXECUTE AS CALLER
AS $$
import os
import sys
def list_files(directory):
  files = []
  # Walk through the directory and its subdirectories
  for dirpath, _, filenames in os.walk(directory):
    for filename in filenames:
      # Append the relative path to each file to the list
      full_path = os.path.join(dirpath, filename)
      files.append(os.path.relpath(full_path, directory))
  return files
def run(session):
  directory_path = sys._xoptions["snowflake_import_directory"]
  file_list = list_files(directory_path)
  file_list_str = ' '.join(file_list)
  return file_list_str
$$;
CALL my_directory_import_list_sp();