Aufrufen von Funktionen und gespeicherten Prozeduren in Snowpark-Java

Zur Verarbeitung von Daten in einem DataFrame können Sie systemdefinierte SQL-Funktionen, benutzerdefinierte Funktionen und gespeicherte Prozeduren aufrufen. Unter diesem Thema wird erklärt, wie Sie diese Funktionen in Snowpark aufrufen können.

Unter diesem Thema:

Aufrufen von systemdefinierten Funktionen

Wenn Sie systemdefinierte SQL-Funktionen aufrufen möchten, müssen Sie die entsprechenden statischen Methoden der Functions-Klasse verwenden.

Im folgenden Beispiel wird die statische Methode upper der Functions-Klasse (das Äquivalent zur systemdefinierten Funktion UPPER) aufgerufen, um die Werte in der Namensspalte mit Großbuchstaben zurückzugeben:

DataFrame df = session.table("sample_product_data");
df.select(Functions.upper(Functions.col("name"))).show();
Copy

Wenn in der Klasse Functions keine systemdefinierte SQL-Funktion verfügbar ist, können Sie die statische Methode Functions.callUDF verwenden, um die systemdefinierte Funktion aufzurufen.

Für callUDF übergeben Sie den Namen der systemdefinierten Funktion als erstes Argument. Wenn Sie die Werte von Spalten an die systemdefinierte Funktion übergeben müssen, definieren Sie Column-Objekte und übergeben diese als zusätzliche Argumente an die callUDF-Funktion.

Im folgenden Beispiel wird die systemdefinierte Funktion RADIANS aufgerufen, die den Wert aus der Spalte degrees übergibt:

// Call the system-defined function RADIANS() on degrees.
DataFrame dfDegrees = session.range(0, 360, 45).rename("degrees", Functions.col("id"));
dfDegrees.select(Functions.col("degrees"), Functions.callUDF("radians", Functions.col("degrees"))).show();
Copy

Die Funktion callUDF gibt einen Column-Wert zurück, den Sie an die DataFrame-Transformationsmethoden (z. B. Filtern oder Auswählen) übergeben können.

Aufrufen von skalaren benutzerdefinierten Funktionen (UDFs)

Die Methode zum Aufrufen einer UDF hängt davon ab, wie die UDF erstellt wurde:

  • Um eine anonyme UDF aufzurufen, rufen Sie die apply-Methode des UserDefinedFunction-Objekts auf, das beim Erstellen der UDF zurückgegeben wurde.

    Die Argumente, die Sie an eine UDF übergeben, müssen Column-Objekte sein. Wenn Sie ein Literal übergeben müssen, verwenden Sie Functions.lit(), wie unter Verwenden von Literalen als Spaltenobjekte erklärt.

  • Zum Aufrufen von UDFs, die Sie mit Namen registriert haben, und von UDFs, die Sie durch Ausführen von CREATE FUNCTION erstellt haben, verwenden Sie die statische Methode Functions.callUDF.

    Übergeben Sie den Namen der UDF als erstes Argument und alle UDF-Parameter als weitere Argumente.

Der Aufruf einer UDF gibt ein Column-Objekt zurück, das den Rückgabewert der UDF enthält.

Im folgenden Beispiel wird die UDF-Funktion doubleUdf aufgerufen, wobei der Wert aus der Spalte quantity übergeben werden. Im Beispiel wird der Rückgabewert von doubleUdf an die Methode select des DataFrame übergeben.

import com.snowflake.snowpark_java.types.*;
...
// Create and register a temporary named UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerTemporary(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType);
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

Aufrufen von Tabellenfunktionen (Systemfunktionen und UDTFs)

So rufen Sie eine Tabellenfunktion oder eine benutzerdefinierte Tabellenfunktion (UDTF) auf:

  1. Erstellen Sie ein TableFunction-Objekt, und übergeben Sie den Namen der Tabellenfunktion.

  2. Rufen Sie die tableFunction-Methode des Session-Objekts auf, und übergeben Sie das TableFunction-Objekt und eine Zuordnung (Map) der Namen und Werte der Eingabeargumente.

table?Function gibt einen DataFrame zurück, der die Ausgabe der Tabellenfunktion enthält.

Angenommen, Sie haben den folgenden Befehl ausgeführt, um einen SQL-UDTF zu erstellen:

CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
  RETURNS TABLE(id INT, name VARCHAR)
  AS
  $$
    SELECT id, name
      FROM sample_product_data
      WHERE category_id = cat_id
  $$
  ;
Copy

Der folgende Code ruft diese UDTF auf und erstellt einen DataFrame für die Ausgabe der UDTF. Im Beispiel werden die ersten 10 Zeilen der Ausgabe auf der Konsole ausgegeben.

import java.util.HashMap;
import java.util.Map;
...

Map<String, Column> arguments = new HashMap<>();
arguments.put("cat_id", Functions.lit(10));
DataFrame dfTableFunctionOutput = session.tableFunction(new TableFunction("product_by_category_id"), arguments);
dfTableFunctionOutput.show();
Copy

Wenn Sie die Ausgabe einer Tabellenfunktion mit einem DataFrame verbinden müssen, rufen Sie die join-Methode auf, die ein TableFunction-Objekt übergibt.

Aufrufen von gespeicherten Prozeduren

Sie können eine Prozedur entweder auf der Serverseite (in der Snowflake-Umgebung) oder lokal ausführen. Denken Sie daran, dass die beiden Umgebungen unterschiedlich sind und die Bedingungen und Ergebnisse der Ausführung von Prozeduren voneinander abweichen können.

Sie können eine Prozedur mit der Snowpark-API auf eine der folgenden Arten aufrufen:

  • Führen Sie eine Funktion zum Testen und Debuggen lokal aus, indem Sie die Methode SProcRegistration.runLocally verwenden.

  • Führen Sie eine Prozedur in der serverseitigen Snowflake-Umgebung mit einer der Methoden Session.storedProcedure aus. Dazu gehört eine Prozedur, die auf die aktuelle Sitzung beschränkt ist, oder eine permanente gespeicherte Prozedur, die auf Snowflake gespeichert ist.

Sie können eine permanente gespeicherte Prozedur, die Sie mit der Snowpark-API erstellt haben, auch aus SQL-Code aufrufen. Weitere Informationen dazu finden Sie unter Aufrufen einer gespeicherten Prozedur.

Weitere Informationen zum Erstellen von Prozeduren mit der Snowpark-API finden Sie unter Erstellen von gespeicherten Prozeduren für DataFrames in Java.

Lokales Ausführen der Prozedurlogik

Sie können die Lambda-Funktion für Ihre Prozedur in Ihrer lokalen Umgebung mit der Methode SProcRegistration.runLocally ausführen. Die Methode führt die Funktion aus und gibt das Ergebnis mit dem von der Funktion zurückgegebenen Typ zurück.

Sie können zum Beispiel eine Lambda-Funktion, die Sie in einer Prozedur verwenden möchten, aufrufen, bevor Sie eine Prozedur daraus in Snowflake registrieren. Sie beginnen damit, dass Sie den Lambda-Code als Wert einer Variablen zuweisen, deren Typ eine der com.snowflake.snowpark_java.sproc.JavaSProc-Schnittstellen ist. Bei Verwendung dieser Variablen können Sie die Funktion mit der Methode SProcRegistration.runLocally testen. Sie können die Variable auch verwenden, um die Funktion zu repräsentieren, wenn Sie die Prozedur registrieren.

Der Code im folgenden Beispiel initialisiert eine JavaSProc-Variable aus der Lambda-Funktion, die die Logik der Prozedur sein wird. Dann wird die Funktion getestet, indem die Variable mit dem Argument der Funktion an die Methode SProcRegistration.runLocally übergeben wird. Die Variable wird auch verwendet, um die Funktion zu registrieren.

Session session = Session.builder().configFile("my_config.properties").create();

// Assign the lambda function to a variable.
JavaSProc1<Integer, Integer> func =
  (Session session, Integer num) -> num + 1;

// Execute the function locally.
int result = (Integer)session.sproc().runLocally(func, 1);
System.out.println("\nResult: " + result);

// Register the procedure.
StoredProcedure sp =
  session.sproc().registerTemporary(
    func,
    DataTypes.IntegerType,
    DataTypes.IntegerType
  );

// Execute the procedure on the server.
session.storedProcedure(sp, 1).show();
Copy

Ausführen einer Prozedur auf dem Server

Um eine Prozedur in der Snowflake-Umgebung auf dem Server auszuführen, verwenden Sie die Methode Session.storedProcedure. Diese Methode gibt ein DataFrame-Objekt zurück.

Sie können beispielsweise Folgendes ausführen:

Der Code im folgenden Beispiel erstellt eine temporäre Prozedur, die auf dem Server ausgeführt wird, aber nur so lange verfügbar ist wie die aktuelle Snowpark-Sitzung. Anschließend wird die Prozedur ausgeführt, wobei sowohl der Name der Prozedur als auch die Variable com.snowflake.snowpark_java.StoredProcedure verwendet werden, die die Prozedur repräsentiert.

Session session = Session.builder().configFile("my_config.properties").create();

String incrementProc = "increment";

// Register the procedure.
StoredProcedure tempSP =
  session.sproc().registerTemporary(
    incrementProc,
    (Session session, Integer num) -> num + 1,
    DataTypes.IntegerType,
    DataTypes.IntegerType
  );

// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();

// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();
Copy