Aufrufen von Funktionen und gespeicherten Prozeduren in Snowpark Scala

Zur Verarbeitung von Daten in einem DataFrame können Sie systemdefinierte SQL-Funktionen, benutzerdefinierte Funktionen und gespeicherte Prozeduren aufrufen. Unter diesem Thema wird erklärt, wie Sie diese Funktionen in Snowpark aufrufen können.

Unter diesem Thema:

Aufrufen von systemdefinierten Funktionen

Wenn Sie systemdefinierte SQL-Funktionen aufrufen müssen, verwenden Sie die entsprechenden Funktionen im com.snowflake.snowpark.functions object.

Im folgenden Beispiel wird die Funktion upper im Objekt functions (das Äquivalent zur systemdefinierten Funktion UPPER) aufgerufen, um die Werte in der Namensspalte mit Großbuchstaben zurückzugeben:

// Import the upper function from the functions object.
import com.snowflake.snowpark.functions._
...
session.table("products").select(upper(col("name"))).show()
Copy

Wenn im Funktionsobjekt keine systemdefinierte SQL-Funktion verfügbar ist, können Sie einen der folgenden Ansätze verwenden:

  • Verwenden Sie die Funktion callBuiltin, um die systemdefinierte Funktion aufzurufen.

  • Verwenden Sie die Funktion builtin, um ein Funktionsobjekt zu erstellen, das Sie zum Aufrufen der systemdefinierten Funktion verwenden können.

callBuiltin und builtin sind im Objekt com.snowflake.snowpark.functions definiert.

Für callBuiltin übergeben Sie den Namen der systemdefinierten Funktion als erstes Argument. Wenn Sie die Werte von Spalten an die systemdefinierte Funktion übergeben müssen, definieren Sie Spalten-Objekte und übergeben diese als zusätzliche Argumente an die callBuiltin-Funktion.

Im folgenden Beispiel wird die systemdefinierte Funktion RADIANS aufgerufen, die den Wert aus der Spalte col1 übergibt:

// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Call the system-defined function RADIANS() on col1.
val result = df.select(callBuiltin("radians", col("col1"))).collect()
Copy

Die Funktion callBuiltin gibt einen Column-Wert zurück, den Sie an die DataFrame-Transformationsmethoden (z. B. an Filtern oder Auswählen) übergeben können.

Für builtin übergeben Sie den Namen der systemdefinierten Funktion, und Sie verwenden das zurückgegebene Funktionsobjekt, um die systemdefinierte Funktion aufzurufen. Beispiel:

// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Create a function object for the system-defined function RADIANS().
val radians = builtin("radians")
// Call the system-defined function RADIANS() on col1.
val result = df.select(radians(col("col1"))).collect()
Copy

Aufrufen von skalaren benutzerdefinierten Funktionen (UDFs)

Die Methode zum Aufrufen einer UDF hängt davon ab, wie die UDF erstellt wurde:

  • Um eine anonyme UDF aufzurufen, rufen Sie die apply-Methode des UserDefinedFunction-Objekts auf, das beim Erstellen der UDF zurückgegeben wurde.

    Die Argumente, die Sie an eine UDF übergeben, müssen Column-Objekte sein. Wenn Sie ein Literal übergeben müssen, verwenden Sie lit(), wie unter Verwenden von Literalen als Spaltenobjekte erklärt.

  • Zum Aufrufen von UDFs, die Sie mit Namen registriert haben, und von UDFs, die Sie durch Ausführen von CREATE FUNCTION erstellt haben, verwenden Sie die Funktion callUDF im Objekt com.snowflake.snowpark.functions.

    Übergeben Sie den Namen der UDF als erstes Argument und alle UDF-Parameter als weitere Argumente.

Der Aufruf einer UDF gibt ein Column-Objekt zurück, das den Rückgabewert der UDF enthält.

Im folgenden Beispiel wird die UDF-Funktion myFunction aufgerufen, wobei die Werte aus den Spalten col1 und col2 übergeben werden. Im Beispiel wird der Rückgabewert von myFunction an die Methode select des DataFrame übergeben.

// Import the callUDF function from the functions object.
import com.snowflake.snowpark.functions._
...
// Runs the scalar function 'myFunction' on col1 and col2 of df.
val result =
    df.select(
        callUDF("myDB.schema.myFunction", col("col1"), col("col2"))
    ).collect()
Copy

Aufrufen von Tabellenfunktionen (Systemfunktionen und UDTFs)

So rufen Sie eine Tabellenfunktion oder eine benutzerdefinierte Tabellenfunktion (UDTF) auf:

  1. Erstellen Sie ein TableFunction-Objekt, und übergeben Sie den Namen der Tabellenfunktion.

    Wenn Sie eine UDTF in Snowpark erstellen, können Sie einfach das TableFunction-Objekt verwenden, das von der UDTFRegistration.registerTemporary- oder UDTFRegistration.registerPermanent-Methode zurückgegeben wird. Siehe Erstellen von benutzerdefinierten Tabellenfunktionen (UDTFs).

  2. Rufen Sie session.tableFunction auf, und übergeben Sie dabei das TableFunction-Objekt und eine Map der Namen und Werte von Eingabeargumenten.

table?Function gibt einen DataFrame zurück, der die Ausgabe der Tabellenfunktion enthält.

Angenommen, Sie haben den folgenden Befehl ausgeführt, um einen SQL-UDTF zu erstellen:

CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
  RETURNS TABLE(id INT, name VARCHAR)
  AS
  $$
    SELECT id, name
      FROM sample_product_data
      WHERE category_id = cat_id
  $$
  ;
Copy

Der folgende Code ruft diese UDTF auf und erstellt einen DataFrame für die Ausgabe der UDTF. Im Beispiel werden die ersten 10 Zeilen der Ausgabe auf der Konsole ausgegeben.

val dfTableFunctionOutput = session.tableFunction(TableFunction("product_by_category_id"), Map("cat_id" -> lit(10)))
dfTableFunctionOutput.show()
Copy

Wenn Sie die Ausgabe einer Tabellenfunktion mit einem DataFrame verbinden müssen, rufen Sie die DataFrame.join-Methode auf, die ein TableFunction-Objekt übergibt.

Aufrufen von gespeicherten Prozeduren

Sie können eine Prozedur entweder auf der Serverseite (in der Snowflake-Umgebung) oder lokal ausführen. Denken Sie daran, dass die beiden Umgebungen unterschiedlich sind und die Bedingungen und Ergebnisse der Ausführung von Prozeduren voneinander abweichen können.

Sie können eine Prozedur mit der Snowpark-API auf eine der folgenden Arten aufrufen:

  • Führen Sie eine Funktion zum Testen und Debuggen lokal aus, indem Sie die Methode SProcRegistration.runLocally verwenden.

  • Führen Sie eine Prozedur in der serverseitigen Snowflake-Umgebung mit der Methode Session.storedProcedure aus. Dazu gehört eine Prozedur, die auf die aktuelle Sitzung beschränkt ist, oder eine permanente gespeicherte Prozedur, die auf Snowflake gespeichert ist.

Sie können eine permanente gespeicherte Prozedur, die Sie mit der Snowpark-API erstellt haben, auch aus einem Snowflake-Arbeitsblatt aufrufen. Weitere Informationen dazu finden Sie unter Aufrufen einer gespeicherten Prozedur.

Weitere Informationen zum Erstellen von Prozeduren mit der Snowpark-API finden Sie unter Erstellen von gespeicherten Prozeduren für DataFrames in Scala.

Lokales Ausführen der Prozedurlogik

Sie können die Lambda-Funktion für Ihre Prozedur in Ihrer lokalen Umgebung mit der Methode SProcRegistration.runLocally ausführen. Die Methode führt die Funktion aus und gibt das Ergebnis mit dem von der Funktion zurückgegebenen Typ zurück.

So können Sie beispielsweise eine Lambda-Funktion, die Sie in einer Prozedur verwenden möchten, lokal (auf der Client-Seite) aufrufen, bevor Sie eine Prozedur daraus in Snowflake registrieren. Sie beginnen damit, dass Sie den Lambda-Code als Wert einer Variablen zuweisen. Sie übergeben diese Variable an die Methode SProcRegistration.runLocally, um sie auf der Client-Seite auszuführen. Sie können die Variable auch verwenden, um die Funktion zu repräsentieren, wenn Sie die Prozedur registrieren.

Der Code im folgenden Beispiel weist die Funktion der Variablen func zu. Anschließend wird die Funktion lokal getestet, indem die Variable mit dem Argumentwert der Funktion an die Methode SProcRegistration.runLocally übergeben wird. Die Variable wird auch verwendet, um die Prozedur zu registrieren.

val session = Session.builder.configFile("my_config.properties").create

// Assign the lambda function.
val func = (session: Session, num: Int) => num + 1

// Execute the function locally.
val result = session.sproc.runLocally(func, 1)
print("\nResult: " + result)
Copy

Ausführen einer Prozedur auf dem Server

Um eine Prozedur in der Snowflake-Umgebung auf dem Server auszuführen, verwenden Sie die Methode Session.storedProcedure. Diese Methode gibt ein DataFrame-Objekt zurück.

Sie können beispielsweise Folgendes ausführen:

Der Code im folgenden Beispiel erstellt eine temporäre Prozedur, die auf dem Server ausgeführt wird, aber nur so lange verfügbar ist wie die aktuelle Snowpark-Sitzung. Anschließend wird die Prozedur ausgeführt, wobei sowohl der Name der Prozedur als auch die StoredProcedure-Variable verwendet werden, die die Prozedur repräsentiert.

val session = Session.builder.configFile("my_config.properties").create

val name: String = "add_two"

val tempSP: StoredProcedure =
  session.sproc.registerTemporary(
    name,
    (session: Session, num: Int) => num + 2
  )

session.storedProcedure(name, 1).show()

// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();

// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();
Copy