Aufrufen von Funktionen und gespeicherten Prozeduren in Snowpark Scala¶
Zur Verarbeitung von Daten in einem DataFrame können Sie systemdefinierte SQL-Funktionen, benutzerdefinierte Funktionen und gespeicherte Prozeduren aufrufen. Unter diesem Thema wird erklärt, wie Sie diese Funktionen in Snowpark aufrufen können.
Unter diesem Thema:
Aufrufen von systemdefinierten Funktionen¶
Wenn Sie systemdefinierte SQL-Funktionen aufrufen müssen, verwenden Sie die entsprechenden Funktionen im com.snowflake.snowpark.functions object.
Im folgenden Beispiel wird die Funktion upper
im Objekt functions
(das Äquivalent zur systemdefinierten Funktion UPPER) aufgerufen, um die Werte in der Namensspalte mit Großbuchstaben zurückzugeben:
// Import the upper function from the functions object.
import com.snowflake.snowpark.functions._
...
session.table("products").select(upper(col("name"))).show()
Wenn im Funktionsobjekt keine systemdefinierte SQL-Funktion verfügbar ist, können Sie einen der folgenden Ansätze verwenden:
Verwenden Sie die Funktion
callBuiltin
, um die systemdefinierte Funktion aufzurufen.Verwenden Sie die Funktion
builtin
, um ein Funktionsobjekt zu erstellen, das Sie zum Aufrufen der systemdefinierten Funktion verwenden können.
callBuiltin
und builtin
sind im Objekt com.snowflake.snowpark.functions
definiert.
Für callBuiltin
übergeben Sie den Namen der systemdefinierten Funktion als erstes Argument. Wenn Sie die Werte von Spalten an die systemdefinierte Funktion übergeben müssen, definieren Sie Spalten-Objekte und übergeben diese als zusätzliche Argumente an die callBuiltin
-Funktion.
Im folgenden Beispiel wird die systemdefinierte Funktion RADIANS aufgerufen, die den Wert aus der Spalte col1
übergibt:
// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Call the system-defined function RADIANS() on col1.
val result = df.select(callBuiltin("radians", col("col1"))).collect()
Die Funktion callBuiltin
gibt einen Column
-Wert zurück, den Sie an die DataFrame-Transformationsmethoden (z. B. an Filtern oder Auswählen) übergeben können.
Für builtin
übergeben Sie den Namen der systemdefinierten Funktion, und Sie verwenden das zurückgegebene Funktionsobjekt, um die systemdefinierte Funktion aufzurufen. Beispiel:
// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Create a function object for the system-defined function RADIANS().
val radians = builtin("radians")
// Call the system-defined function RADIANS() on col1.
val result = df.select(radians(col("col1"))).collect()
Aufrufen von skalaren benutzerdefinierten Funktionen (UDFs)¶
Die Methode zum Aufrufen einer UDF hängt davon ab, wie die UDF erstellt wurde:
Um eine anonyme UDF aufzurufen, rufen Sie die
apply
-Methode des UserDefinedFunction-Objekts auf, das beim Erstellen der UDF zurückgegeben wurde.Die Argumente, die Sie an eine UDF übergeben, müssen Column-Objekte sein. Wenn Sie ein Literal übergeben müssen, verwenden Sie
lit()
, wie unter Verwenden von Literalen als Spaltenobjekte erklärt.Zum Aufrufen von UDFs, die Sie mit Namen registriert haben, und von UDFs, die Sie durch Ausführen von CREATE FUNCTION erstellt haben, verwenden Sie die Funktion
callUDF
im Objektcom.snowflake.snowpark.functions
.Übergeben Sie den Namen der UDF als erstes Argument und alle UDF-Parameter als weitere Argumente.
Der Aufruf einer UDF gibt ein Column
-Objekt zurück, das den Rückgabewert der UDF enthält.
Im folgenden Beispiel wird die UDF-Funktion myFunction
aufgerufen, wobei die Werte aus den Spalten col1
und col2
übergeben werden. Im Beispiel wird der Rückgabewert von myFunction
an die Methode select
des DataFrame übergeben.
// Import the callUDF function from the functions object.
import com.snowflake.snowpark.functions._
...
// Runs the scalar function 'myFunction' on col1 and col2 of df.
val result =
df.select(
callUDF("myDB.schema.myFunction", col("col1"), col("col2"))
).collect()
Aufrufen von Tabellenfunktionen (Systemfunktionen und UDTFs)¶
So rufen Sie eine Tabellenfunktion oder eine benutzerdefinierte Tabellenfunktion (UDTF) auf:
Erstellen Sie ein TableFunction-Objekt, und übergeben Sie den Namen der Tabellenfunktion.
Wenn Sie eine UDTF in Snowpark erstellen, können Sie einfach das
TableFunction
-Objekt verwenden, das von derUDTFRegistration.registerTemporary
- oderUDTFRegistration.registerPermanent
-Methode zurückgegeben wird. Siehe Erstellen von benutzerdefinierten Tabellenfunktionen (UDTFs).Rufen Sie session.tableFunction auf, und übergeben Sie dabei das
TableFunction
-Objekt und eineMap
der Namen und Werte von Eingabeargumenten.
table?Function
gibt einen DataFrame zurück, der die Ausgabe der Tabellenfunktion enthält.
Angenommen, Sie haben den folgenden Befehl ausgeführt, um einen SQL-UDTF zu erstellen:
CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
RETURNS TABLE(id INT, name VARCHAR)
AS
$$
SELECT id, name
FROM sample_product_data
WHERE category_id = cat_id
$$
;
Der folgende Code ruft diese UDTF auf und erstellt einen DataFrame für die Ausgabe der UDTF. Im Beispiel werden die ersten 10 Zeilen der Ausgabe auf der Konsole ausgegeben.
val dfTableFunctionOutput = session.tableFunction(TableFunction("product_by_category_id"), Map("cat_id" -> lit(10)))
dfTableFunctionOutput.show()
Wenn Sie die Ausgabe einer Tabellenfunktion mit einem DataFrame verbinden müssen, rufen Sie die DataFrame.join-Methode auf, die ein TableFunction-Objekt übergibt.
Aufrufen von gespeicherten Prozeduren¶
Sie können eine Prozedur entweder auf der Serverseite (in der Snowflake-Umgebung) oder lokal ausführen. Denken Sie daran, dass die beiden Umgebungen unterschiedlich sind und die Bedingungen und Ergebnisse der Ausführung von Prozeduren voneinander abweichen können.
Sie können eine Prozedur mit der Snowpark-API auf eine der folgenden Arten aufrufen:
Führen Sie eine Funktion zum Testen und Debuggen lokal aus, indem Sie die Methode
SProcRegistration.runLocally
verwenden.Führen Sie eine Prozedur in der serverseitigen Snowflake-Umgebung mit der Methode
Session.storedProcedure
aus. Dazu gehört eine Prozedur, die auf die aktuelle Sitzung beschränkt ist, oder eine permanente gespeicherte Prozedur, die auf Snowflake gespeichert ist.
Sie können eine permanente gespeicherte Prozedur, die Sie mit der Snowpark-API erstellt haben, auch aus einem Snowflake-Arbeitsblatt aufrufen. Weitere Informationen dazu finden Sie unter Aufrufen einer gespeicherten Prozedur.
Weitere Informationen zum Erstellen von Prozeduren mit der Snowpark-API finden Sie unter Erstellen von gespeicherten Prozeduren für DataFrames in Scala.
Lokales Ausführen der Prozedurlogik¶
Sie können die Lambda-Funktion für Ihre Prozedur in Ihrer lokalen Umgebung mit der Methode SProcRegistration.runLocally
ausführen. Die Methode führt die Funktion aus und gibt das Ergebnis mit dem von der Funktion zurückgegebenen Typ zurück.
So können Sie beispielsweise eine Lambda-Funktion, die Sie in einer Prozedur verwenden möchten, lokal (auf der Client-Seite) aufrufen, bevor Sie eine Prozedur daraus in Snowflake registrieren. Sie beginnen damit, dass Sie den Lambda-Code als Wert einer Variablen zuweisen. Sie übergeben diese Variable an die Methode SProcRegistration.runLocally
, um sie auf der Client-Seite auszuführen. Sie können die Variable auch verwenden, um die Funktion zu repräsentieren, wenn Sie die Prozedur registrieren.
Der Code im folgenden Beispiel weist die Funktion der Variablen func
zu. Anschließend wird die Funktion lokal getestet, indem die Variable mit dem Argumentwert der Funktion an die Methode SProcRegistration.runLocally
übergeben wird. Die Variable wird auch verwendet, um die Prozedur zu registrieren.
val session = Session.builder.configFile("my_config.properties").create
// Assign the lambda function.
val func = (session: Session, num: Int) => num + 1
// Execute the function locally.
val result = session.sproc.runLocally(func, 1)
print("\nResult: " + result)
Ausführen einer Prozedur auf dem Server¶
Um eine Prozedur in der Snowflake-Umgebung auf dem Server auszuführen, verwenden Sie die Methode Session.storedProcedure
. Diese Methode gibt ein DataFrame
-Objekt zurück.
Sie können beispielsweise Folgendes ausführen:
Eine temporäre oder permanente Prozedur, die Sie mit der Snowpark-API erstellen.
Eine Prozedur, die mit einer CREATE PROCEDURE-Anweisung erstellt wurde.
Der Code im folgenden Beispiel erstellt eine temporäre Prozedur, die auf dem Server ausgeführt wird, aber nur so lange verfügbar ist wie die aktuelle Snowpark-Sitzung. Anschließend wird die Prozedur ausgeführt, wobei sowohl der Name der Prozedur als auch die StoredProcedure
-Variable verwendet werden, die die Prozedur repräsentiert.
val session = Session.builder.configFile("my_config.properties").create
val name: String = "add_two"
val tempSP: StoredProcedure =
session.sproc.registerTemporary(
name,
(session: Session, num: Int) => num + 2
)
session.storedProcedure(name, 1).show()
// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();
// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();