Snowpark Javaでの関数およびストアドプロシージャの呼び出し

DataFrame でデータを処理するには、システム定義の SQL 関数、ユーザー定義関数、およびストアドプロシージャを呼び出すことができます。このトピックでは、Snowparkでこれらを呼び出す方法について説明します。

このトピックの内容:

システム定義関数の呼び出し

システム定義の SQL 関数 を呼び出す必要がある場合は、 関数クラス で同等の静的メソッドを使用します。

次の例では、 Functions クラスの upper 静的メソッド(システム定義の UPPER 関数と同等)を呼び出して、名前列の値を大文字で返します。

DataFrame df = session.table("sample_product_data");
df.select(Functions.upper(Functions.col("name"))).show();
Copy

システム定義の SQL 関数が Functions クラスで使用できない場合は、 Functions.callUDF 静的メソッドを使用してシステム定義関数を呼び出すことができます。

callUDF の場合は、最初の引数としてシステム定義関数の名前を渡します。列の値をシステム定義関数に渡す必要がある場合は、 オブジェクトを callUDF メソッドへの追加の引数として定義して渡します。

次の例では、システム定義関数 RADIANS を呼び出し、列 degrees から値を渡します。

// Call the system-defined function RADIANS() on degrees.
DataFrame dfDegrees = session.range(0, 360, 45).rename("degrees", Functions.col("id"));
dfDegrees.select(Functions.col("degrees"), Functions.callUDF("radians", Functions.col("degrees"))).show();
Copy

callUDF メソッドは、 Column を返します。これは、 DataFrame 変換メソッド (例: フィルター、選択など)に渡すことができます。

スカラーユーザー定義関数(UDFs)の呼び出し

UDF を呼び出すメソッドは、 UDF がどのように作成されたかによって異なります。

  • 匿名の UDF を呼び出すには、 UDF の作成時に返された UserDefinedFunction オブジェクトの apply メソッドを呼び出します。

    UDF に渡す引数は、 オブジェクトである必要があります。リテラルを渡す必要がある場合は、 列オブジェクトとしてのリテラルの使用 で説明されているように、 Functions.lit() を使用します。

  • 名前で登録 した UDFs と、 CREATE FUNCTION を実行して作成した UDFs を呼び出すには、 Functions.callUDF 静的メソッドを使用します。

    UDF の名前を最初の引数として渡し、 UDF パラメーターを追加の引数として渡します。

UDF を呼び出すと、 UDF の戻り値を含む Column オブジェクトが返されます。

次の例では、 UDF 関数 doubleUdf を呼び出し、列 quantity から値を渡します。この例では、戻り値を doubleUdf から DataFrame の select メソッドに渡します。

import com.snowflake.snowpark_java.types.*;
...
// Create and register a temporary named UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerTemporary(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType);
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

テーブル関数の呼び出し(システム関数および UDTFs)

テーブル関数 または ユーザー定義テーブル関数(UDTF) を呼び出すには、

  1. TableFunction オブジェクトを作成し、テーブル関数の名前を渡します。

  2. SessionオブジェクトのtableFunctionメソッド を呼び出し、 TableFunction オブジェクトと入力引数の名前と値の Map を渡します。

table?Function は、テーブル関数の出力を含む DataFrame を返します。

たとえば、次のコマンドを実行して SQL UDTF を作成したとします。

CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
  RETURNS TABLE(id INT, name VARCHAR)
  AS
  $$
    SELECT id, name
      FROM sample_product_data
      WHERE category_id = cat_id
  $$
  ;
Copy

次のコードではこれを UDTF と呼び、 UDTF の出力用に DataFrame を作成します。この例では、出力の最初の10行をコンソールに出力します。

import java.util.HashMap;
import java.util.Map;
...

Map<String, Column> arguments = new HashMap<>();
arguments.put("cat_id", Functions.lit(10));
DataFrame dfTableFunctionOutput = session.tableFunction(new TableFunction("product_by_category_id"), arguments);
dfTableFunctionOutput.show();
Copy

テーブル関数の出力を DataFrame と結合する必要がある場合は、 TableFunction で渡す結合メソッド を呼び出します。

ストアドプロシージャの呼び出し

プロシージャは、サーバー側(Snowflake環境内)またはローカルで実行できます。この2つの環境は異なるため、プロシージャの実行条件や結果が異なる可能性があることに留意してください。

Snowpark API を使用してプロシージャを呼び出すには、次のいずれかの方法があります。

  • SProcRegistration.runLocally メソッドを使用して、テストやデバッグ用に関数をローカルで実行する。

  • Session.storedProcedure メソッドの1つを使用して、サーバー側のSnowflake環境でプロシージャを実行する。これには、現在のセッションにスコープされたプロシージャや、Snowflakeに保存された永続プロシージャが含まれます。

Snowpark API で作成した永続ストアドプロシージャを SQL コードから呼び出すこともできます。詳細については、 ストアドプロシージャの呼び出し をご参照ください。

Snowpark API を使用したプロシージャの作成については、 Javaにおける DataFrames のストアドプロシージャの作成 をご参照ください。

ローカルにおけるプロシージャのロジックの実行

SProcRegistration.runLocally メソッドを使用して、ローカル環境でプロシージャのラムダ関数を実行できます。メソッドは関数を実行し、その結果を関数が返す型として返します。

たとえば、Snowflakeでプロシージャを登録する前に、プロシージャで使用する予定のラムダ関数を呼び出すことができます。まず、ラムダコードを com.snowflake.snowpark_java.sproc.JavaSProc インターフェイスのいずれかを型とする変数に値として代入します。その変数を使用して、 SProcRegistration.runLocally メソッドで関数をテストで呼び出すことができます。また、プロシージャを登録する際に、関数を表す変数を使用することもできます。

次の例のコードは、プロシージャのロジックとなるラムダ関数から JavaSProc 変数を初期化します。そして、その変数を関数の引数とともに SProcRegistration.runLocally メソッドに渡すことで、関数をテストします。この変数は関数の登録にも使われます。

Session session = Session.builder().configFile("my_config.properties").create();

// Assign the lambda function to a variable.
JavaSProc1<Integer, Integer> func =
  (Session session, Integer num) -> num + 1;

// Execute the function locally.
int result = (Integer)session.sproc().runLocally(func, 1);
System.out.println("\nResult: " + result);

// Register the procedure.
StoredProcedure sp =
  session.sproc().registerTemporary(
    func,
    DataTypes.IntegerType,
    DataTypes.IntegerType
  );

// Execute the procedure on the server.
session.storedProcedure(sp, 1).show();
Copy

サーバー上でのプロシージャの実行

サーバー上のSnowflake環境でプロシージャを実行するには、 Session.storedProcedure メソッドを使用します。メソッドは DataFrame オブジェクトを返します。

たとえば、次を実行できます。

次の例のコードは、サーバー上で実行される仮のプロシージャを作成します。そして、プロシージャの名前とそれを表す com.snowflake.snowpark_java.StoredProcedure 変数の両方を使用してプロシージャを実行します。

Session session = Session.builder().configFile("my_config.properties").create();

String incrementProc = "increment";

// Register the procedure.
StoredProcedure tempSP =
  session.sproc().registerTemporary(
    incrementProc,
    (Session session, Integer num) -> num + 1,
    DataTypes.IntegerType,
    DataTypes.IntegerType
  );

// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();

// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();
Copy