Pythonでの DataFrames 用ユーザー定義関数(UDFs)の作成

Snowpark API は、Pythonのラムダまたは関数からユーザー定義関数を作成するために使用できるメソッドを提供します。このトピックでは、これらのタイプの関数を作成する方法について説明します。

このトピックの内容:

概要

Snowparkを使用すると、カスタムラムダと関数のユーザー定義関数(UDFs)を作成でき、これらの UDFs を呼び出して DataFrame のデータを処理できます。

Snowpark API を使用して UDF を作成すると、Snowparkライブラリは関数のコードを内部ステージにアップロードします。UDF を呼び出すと、Snowparkライブラリはデータがあるサーバー上で関数を実行します。その結果、関数でデータを処理するためにデータをクライアントに転送する必要はありません。

カスタムコードでは、Pythonファイルまたはサードパーティパッケージからモジュールをインポートすることもできます。

次に挙げる2つの方法のいずれかで、カスタムコードの UDF を作成できます。

  • 匿名の UDF を作成し、関数を変数に割り当てることができます。この変数がスコープ内にある限り、この変数を使用して UDF を呼び出すことができます。

  • 名前付き UDF を作成し、名前で UDF を呼び出すことができます。これは、たとえば、名前で UDF を呼び出す必要がある場合や、後続のセッションで UDF を使用する必要がある場合に使用できます。

次のセクションでは、ローカル開発環境または Python ワークシート を使用してこれらの UDFs を作成する方法について説明します。

CREATE FUNCTION コマンドを実行して UDF を定義した場合は、Snowparkでその UDF を呼び出すことができます。詳細については、 ユーザー定義関数(UDFs)の呼び出し をご参照ください。

注釈

ベクトル化されたPython UDFs を使用すると、入力行のバッチをPandas DataFrames として受け取るPython関数を定義できます。これにより、機械学習の推論シナリオでのパフォーマンスが大幅に向上します。詳細については、 ベクトル化された UDFs の使用 をご参照ください。

注釈

Pythonワークシートを使用している場合は、ハンドラー関数内でこれらの例を使用します。

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col

def main(session: snowpark.Session):
   df_table = session.table("sample_product_data")
Copy

例が Row オブジェクトの list など、 DataFrame 以外のものを返す場合は、例の戻り型と一致するように 戻り型を変更 します。

コード例を実行した後、 Results タブを使用して、返された出力を表示します。詳細については、 Pythonワークシートの実行 をご参照ください。

UDF の依存関係の指定

Snowpark API を使用して UDF を定義するには、Pythonファイル、zipファイル、リソースファイルなど、UDF が依存するモジュールを含むファイルをインポートする必要があります。

ディレクトリを指定することもでき、Snowparkライブラリはディレクトリを自動的に圧縮してzipファイルとしてアップロードします。(UDF からの読み取りリソースの詳細については、 UDF を使用したファイルの読み取り を参照。)

Session.add_import() を呼び出す場合、Snowparkライブラリは、指定したファイルを内部ステージにアップロードし、UDF の実行時にファイルをインポートします。

次の例は、コードへの依存関係としてステージにzipファイルを追加する方法を示しています。

>>> # Add a zip file that you uploaded to a stage.
>>> session.add_import("@my_stage/<path>/my_library.zip")  
Copy

次の例は、ローカルマシンからPythonファイルを追加する方法を示しています。

>>> # Import a Python file from your local machine.
>>> session.add_import("/<path>/my_module.py")  

>>> # Import a Python file from your local machine and specify a relative Python import path.
>>> session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")  
Copy

次の例は、他のタイプの依存関係を追加する方法を示しています。

>>> # Add a directory of resource files.
>>> session.add_import("/<path>/my-resource-dir/")  

>>> # Add a resource file.
>>> session.add_import("/<path>/my-resource.xml")  
Copy

注釈

Python Snowparkライブラリは自動的にアップロードされません。

次の依存関係は、指定する必要がありません。

  • Python組み込みライブラリ。

    これらのライブラリは、 UDFs が実行されるサーバーのランタイム環境ですでに使用可能です。

UDFでのAnacondaのサードパーティパッケージの使用

UDF でSnowflake Anacondaチャネルのサードパーティパッケージを使用できます。

  • PythonワークシートでPython UDF を作成する場合、Anacondaパッケージは既にワークシートで使用できます。 ステージからワークシートにPythonファイルを追加する をご参照ください。

  • ローカル開発環境でPython UDF を作成する場合、インストールするAnacondaパッケージを指定できます。

Python UDFsを呼び出すクエリがSnowflakeウェアハウス内で実行されると、ユーザーに代わってAnacondaパッケージをシームレスにインストールし、仮想ウェアハウスにキャッシュします。

ベストプラクティス、利用可能なパッケージの表示方法、ローカル開発環境のセットアップ方法の詳細については、 サードパーティパッケージの使用 をご参照ください。

ローカル開発環境でPython UDF を作成する場合、 session.add_packages を使用してセッションレベルでパッケージを追加します。

このコード例は、パッケージをインポートしてそのバージョンを返す方法を示しています。

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> session.add_packages("numpy", "pandas", "xgboost==1.5.0")

>>> @udf
... def compute() -> list:
...    return [np.__version__, pd.__version__, xgb.__version__]
Copy

session.add_requirements を使用して、 要件ファイル でパッケージを指定することもできます。

>>> session.add_requirements("mydir/requirements.txt")  
Copy

UDFレベルのパッケージを追加して、以前に追加した可能性のあるセッションレベルのパッケージを上書きできます。

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> @udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
... def compute() -> list:
...     return [np.__version__, pd.__version__, xgb.__version__]
Copy

重要

パッケージバージョンを指定しない場合、Snowflakeは依存関係を解決するときに最新バージョンを使用します。ただし、UDF を本番環境にデプロイする場合は、コードが常に同じ依存関係バージョンを使用するようにする必要があります。これは、永続および仮UDFsの両方に対して実行できます。

  • パーマネントUDFを作成すると、UDFは1回だけ作成および登録されます。これにより、依存関係が一度解決され、選択したバージョンが本番ワークロードに使用されます。UDF が実行されると、常に同じ依存バージョンが使用されます。

  • 仮UDFを作成するときは、バージョン仕様の一部として依存関係のバージョンを指定します。そうすると、UDF が登録されると、パッケージ解決は指定されたバージョンを使用します。バージョンを指定しないと、新しいバージョンが利用可能になったときに依存関係が更新される可能性があります。

匿名 UDF の作成

匿名 UDF を作成するには、次のいずれかを実行できます。

  • snowflake.snowpark.functions モジュールで udf 関数を呼び出し、匿名関数の定義を渡します。

  • UDFRegistration クラスで register メソッドを呼び出し、匿名関数の定義を渡します。

匿名のUDFの例を次に示します。

>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf

>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
Copy

注釈

複数のセッションで実行される可能性のあるコードを作成する場合は、 udf 関数を使用するのではなく、 register メソッドを使用してUDFsを登録してください。これにより、デフォルトのSnowflake Session オブジェクトが見つからないエラーを防ぐことができます。

名前付き UDF の作成と登録

UDFを名前で呼び出す場合(例: functions モジュールの call_udf 関数を使用する場合)、名前付きUDFを作成して登録できます。これを実行するには、次のいずれかを使用します。

  • name が引数の UDFRegistration クラスの register メソッド。

  • name が引数の snowflake.snowpark.functions モジュールの udf 関数。

UDFRegistration クラスの属性またはメソッドにアクセスするには、 Session クラスの udf プロパティを呼び出します。

register の呼び出し、または udf は、現在のセッションで使用できる仮UDFを作成します。

永続的なUDFを作成するには、 register メソッドまたは udf 関数を呼び出し、 is_permanent 引数を True に設定します。永続的なUDFを作成するときは、 stage_location 引数を、UDFのPythonファイルとその依存関係がアップロードされるステージの場所に設定する必要もあります。

名前付き仮UDFを登録する方法の例を次に示します。

>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf

>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
Copy

is_permanent 引数を True に設定して、名前付き永続UDFを登録する方法の例を次に示します。

>>> @udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
... def minus_one(x: int) -> int:
...   return x-1
Copy

これらのUDFsが呼び出される例を次に示します。

>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
Copy

SQL を使用して UDF を呼び出すこともできます。

>>> session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Copy

Pythonソースファイルからの UDF の作成

ローカル開発環境で UDF を作成する場合、Pythonファイルで UDF ハンドラーを定義し、 UDFRegistration クラスの register_from_file メソッドを使用して UDF を作成することもできます。

注釈

このメソッドはPythonワークシートでは使用できません。

次は register_from_file の使用例です。

以下を含むPythonファイル test_udf_file.py があるとします。

def mod5(x: int) -> int:
    return x % 5
Copy

次に、ファイル test_udf_file.py のこの関数から UDF を作成できます。

>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
...     file_path="tests/resources/test_udf_dir/test_udf_file.py",
...     func_name="mod5",
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Copy

ファイルをステージの場所にアップロードし、それを使用して UDF を作成することもできます。

>>> from snowflake.snowpark.types import IntegerType
>>> # suppose you have uploaded test_udf_file.py to stage location @mystage.
>>> mod5_udf = session.udf.register_from_file(
...     file_path="@mystage/test_udf_file.py",
...     func_name="mod5",
...     return_type=IntegerType(),
...     input_types=[IntegerType()],
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Copy

UDF を使用したファイルの読み取り

ファイルのコンテンツを読み取るには、Pythonコードで、

静的に指定されたファイルの読み取り

Snowparkライブラリは、サーバーに UDFs をアップロードして実行します。UDF がファイルからデータを読み取る必要がある場合は、ファイルが UDF とともにアップロードされていることを確認する必要があります。

注釈

UDF をPythonワークシートに記述した場合、UDF はステージからのみファイルを読み取ることができます。

ファイルを読み取るように UDF を設定するには、

  1. ファイルが依存関係であることを指定します。これにより、ファイルがサーバーにアップロードされます。詳細については、 UDF の依存関係の指定 をご参照ください。

    例:

    >>> # Import a file from your local machine as a dependency.
    >>> session.add_import("/<path>/my_file.txt")  
    
    >>> # Or import a file that you uploaded to a stage as a dependency.
    >>> session.add_import("@my_stage/<path>/my_file.txt")  
    
    Copy
  2. UDFで、ファイルを読み取ります。次の例では、ファイルは UDF の作成中に一度だけ読み取られ、 UDF の実行中に再度読み取られることはありません。これは、サードパーティライブラリ cachetools を使用して実現されます。

    >>> import sys
    >>> import os
    >>> import cachetools
    >>> from snowflake.snowpark.types import StringType
    >>> @cachetools.cached(cache={})
    ... def read_file(filename):
    ...     import_dir = sys._xoptions.get("snowflake_import_directory")
    ...     if import_dir:
    ...         with open(os.path.join(import_dir, filename), "r") as f:
    ...             return f.read()
    >>>
    >>> # create a temporary text file for test
    >>> temp_file_name = "/tmp/temp.txt"
    >>> with open(temp_file_name, "w") as t:
    ...     _ = t.write("snowpark")
    >>> session.add_import(temp_file_name)
    >>> session.add_packages("cachetools")
    >>>
    >>> def add_suffix(s):
    ...     return f"{read_file(os.path.basename(temp_file_name))}-{s}"
    >>>
    >>> concat_file_content_with_str_udf = session.udf.register(
    ...     add_suffix,
    ...     return_type=StringType(),
    ...     input_types=[StringType()]
    ... )
    >>>
    >>> df = session.create_dataframe(["snowflake", "python"], schema=["a"])
    >>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
    [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
    >>> os.remove(temp_file_name)
    >>> session.clear_imports()
    
    Copy

SnowflakeFile で動的に指定されたファイルの読み取り

Snowpark snowflake.snowpark.files モジュールの SnowflakeFile クラスを使用すると、ステージからファイルを読み取ることができます。 SnowflakeFile クラスは、動的なファイルアクセスを提供し、あらゆるサイズのファイルをストリーミングすることができます。動的なファイルアクセスは、複数のファイルを反復処理する場合にも有効です。例については、 複数ファイルの処理 をご参照ください。

SnowflakeFile を使用したファイルの読み取りの詳細および例については、 Python UDF ハンドラーで SnowflakeFile クラスを使用したファイルの読み取り をご参照ください。

次の例では、 SnowflakeFile を使用してステージからテキストファイルを読み取り、ファイルの長さを返す仮 UDF を登録します。

UDF を登録します。

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType

@udf(name="get_file_length", replace=True, input_types=[StringType()], return_type=IntegerType(), packages=['snowflake-snowpark-python'])
def get_file_length(file_path):
  with SnowflakeFile.open(file_path) as f:
    s = f.read()
  return len(s);
Copy

UDF を呼び出します。

session.sql("select get_file_length(build_scoped_file_url(@my_stage, 'example-file.txt'));")
Copy

ベクトル化された UDFs の使用

ベクトル化されたPython UDFs を使用すると、入力行のバッチを Pandas DataFrames として受け取り、結果のバッチを Pandas配列 または Series として返すPython関数を定義します。Snowpark dataframe の列は、UDF内のPandasシリーズとしてベクトル化されます。

バッチインターフェイスの使用方法の例を次に示します。

from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)

@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
    # The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
    df.columns = ["col1", "col2", "col3", "col4"]
    return model.predict(df)
Copy

他のPython UDFs を呼び出すのと同じ方法で、ベクトル化されたPython UDFs を呼び出します。詳細については、 ベクトル化されたPython UDFs をご参照ください。ここでは、 SQL ステートメントを使用してベクトル化された UDF を作成する方法が説明されています。たとえば、 SQL ステートメントでPythonコードを指定すると、 vectorized デコレーターを使用できます。このドキュメントで説明されているSnowpark Python API を使用すると、 SQL ステートメントを使用してベクトル化された UDF を作成する必要がなくなります。したがって、 vectorized デコレーターは使用しません。

バッチあたりの行数を制限することができます。詳細については、 ターゲットバッチサイズの設定 をご参照ください。

Snowpark Python API を使用してベクトル化された UDFs を作成する詳細な説明と例については、 Snowpark API リファレンスの UDFs セクション をご参照ください。