Snowpark Scalaでの関数の呼び出しとストアドプロシージャ

DataFrame でデータを処理するには、システム定義の SQL 関数、ユーザー定義関数、およびストアドプロシージャを呼び出すことができます。このトピックでは、Snowparkでこれらを呼び出す方法について説明します。

このトピックの内容:

システム定義関数の呼び出し

システム定義の SQL 関数 を呼び出す必要がある場合は、 com.snowflake.snowpark.functionsオブジェクト で同等の関数を使用します。

次の例では、 functions オブジェクトの upper 関数(システム定義の UPPER 関数と同等)を呼び出して、名前列の値を大文字で返します。

// Import the upper function from the functions object.
import com.snowflake.snowpark.functions._
...
session.table("products").select(upper(col("name"))).show()
Copy

システム定義の SQL 関数が関数オブジェクトで使用できない場合は、次のいずれかの方法を使用できます。

  • callBuiltin 関数を使用して、システム定義関数を呼び出します。

  • builtin 関数を使用して、システム定義関数の呼び出しに使用できる関数オブジェクトを作成します。

callBuiltin および builtin は、 com.snowflake.snowpark.functions オブジェクトで定義されています。

callBuiltin の場合は、最初の引数としてシステム定義関数の名前を渡します。列の値をシステム定義関数に渡す必要がある場合は、 オブジェクトを callBuiltin 関数への追加の引数として定義して渡します。

次の例では、システム定義関数 RADIANS を呼び出し、列 col1 から値を渡します。

// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Call the system-defined function RADIANS() on col1.
val result = df.select(callBuiltin("radians", col("col1"))).collect()
Copy

callBuiltin 関数は、 Column を返します。これは、 DataFrame 変換メソッド (例: フィルター、選択など)に渡すことができます。

builtin の場合は、システム定義関数の名前を渡し、返された関数オブジェクトを使用してシステム定義関数を呼び出します。例:

// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Create a function object for the system-defined function RADIANS().
val radians = builtin("radians")
// Call the system-defined function RADIANS() on col1.
val result = df.select(radians(col("col1"))).collect()
Copy

スカラーユーザー定義関数(UDFs)の呼び出し

UDF を呼び出すメソッドは、 UDF がどのように作成されたかによって異なります。

  • 匿名の UDF を呼び出すには、 UDF の作成時に返された UserDefinedFunction オブジェクトの apply メソッドを呼び出します。

    UDF に渡す引数は、 オブジェクトである必要があります。リテラルを渡す必要がある場合は、 列オブジェクトとしてのリテラルの使用 で説明されているように、 lit() を使用します。

  • 名前で登録 した UDFs と、 CREATE FUNCTION を実行して作成した UDFs を呼び出すには、 com.snowflake.snowpark.functions オブジェクトで callUDF 関数を使用します。

    UDF の名前を最初の引数として渡し、 UDF パラメーターを追加の引数として渡します。

UDF を呼び出すと、 UDF の戻り値を含む Column オブジェクトが返されます。

次の例では、 UDF 関数 myFunction を呼び出し、列 col1col2 から値を渡します。この例では、戻り値を myFunction から DataFrame の select メソッドに渡します。

// Import the callUDF function from the functions object.
import com.snowflake.snowpark.functions._
...
// Runs the scalar function 'myFunction' on col1 and col2 of df.
val result =
    df.select(
        callUDF("myDB.schema.myFunction", col("col1"), col("col2"))
    ).collect()
Copy

テーブル関数の呼び出し(システム関数および UDTFs)

テーブル関数 または ユーザー定義テーブル関数(UDTF) を呼び出すには、

  1. TableFunction オブジェクトを作成し、テーブル関数の名前を渡します。

    Snowparkで UDTF を作成する場合は、 UDTFRegistration.registerTemporary または UDTFRegistration.registerPermanent メソッドによって返される TableFunction オブジェクトを使用できます。 ユーザー定義のテーブル関数(UDTFs)の作成 をご参照ください。

  2. session.tableFunction を呼び出し、 TableFunction オブジェクト、そして入力引数の名前と値の Map を渡します。

table?Function は、テーブル関数の出力を含む DataFrame を返します。

たとえば、次のコマンドを実行して SQL UDTF を作成したとします。

CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
  RETURNS TABLE(id INT, name VARCHAR)
  AS
  $$
    SELECT id, name
      FROM sample_product_data
      WHERE category_id = cat_id
  $$
  ;
Copy

次のコードではこれを UDTF と呼び、 UDTF の出力用に DataFrame を作成します。この例では、出力の最初の10行をコンソールに出力します。

val dfTableFunctionOutput = session.tableFunction(TableFunction("product_by_category_id"), Map("cat_id" -> lit(10)))
dfTableFunctionOutput.show()
Copy

テーブル関数の出力を DataFrame と結合する必要がある場合は、 TableFunction で渡す DataFrame.joinメソッド を呼び出します。

ストアドプロシージャの呼び出し

プロシージャは、サーバー側(Snowflake環境内)またはローカルで実行できます。この2つの環境は異なるため、プロシージャの実行条件や結果が異なる可能性があることに留意してください。

Snowpark API を使用してプロシージャを呼び出すには、次のいずれかの方法があります。

  • SProcRegistration.runLocally メソッドを使用して、テストやデバッグのためにローカルで関数を実行します。

  • Session.storedProcedure メソッドを使用して、サーバー側のSnowflake環境でプロシージャを実行します。これには、現在のセッションにスコープされたプロシージャや、Snowflakeに保存された永続プロシージャが含まれます。

SnowflakeワークシートからSnowpark API で作成した永続ストアドプロシージャを呼び出すこともできます。詳細については、 ストアドプロシージャの呼び出し をご参照ください。

Snowpark API を使用したプロシージャの作成については、 Scalaにおける DataFrames のストアドプロシージャの作成 をご参照ください。

ローカルにおけるプロシージャのロジックの実行

SProcRegistration.runLocally メソッドを使用して、ローカル環境でプロシージャのラムダ関数を実行できます。メソッドは関数を実行し、その結果を関数が返す型として返します。

たとえば、Snowflakeにプロシージャを登録する前に、プロシージャで使用する予定のラムダ関数をローカルで(クライアント側で)呼び出すことができます。まず、ラムダコードを変数の値として代入します。その変数を SProcRegistration.runLocally メソッドに渡し、クライアント側で実行します。また、プロシージャを登録する際に、関数を表す変数を使用することもできます。

次の例のコードは、 func 変数に関数を代入しています。そして、 SProcRegistration.runLocally メソッドに関数の引数値とともに変数を渡して、ローカルで関数をテストします。この変数は、プロシージャの登録にも使用されます。

val session = Session.builder.configFile("my_config.properties").create

// Assign the lambda function.
val func = (session: Session, num: Int) => num + 1

// Execute the function locally.
val result = session.sproc.runLocally(func, 1)
print("\nResult: " + result)
Copy

サーバー上でのプロシージャの実行

サーバー上のSnowflake環境でプロシージャを実行するには、 Session.storedProcedure メソッドを使用します。メソッドは DataFrame オブジェクトを返します。

たとえば、次を実行できます。

次の例のコードは、サーバー上で実行される仮のプロシージャを作成します。そして、プロシージャの名前とそれを表す StoredProcedure 変数の両方を使用してプロシージャを実行します。

val session = Session.builder.configFile("my_config.properties").create

val name: String = "add_two"

val tempSP: StoredProcedure =
  session.sproc.registerTemporary(
    name,
    (session: Session, num: Int) => num + 2
  )

session.storedProcedure(name, 1).show()

// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();

// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();
Copy