Pythonでの DataFrames 用ユーザー定義関数(UDFs)の作成¶
Snowpark API は、Pythonのラムダまたは関数からユーザー定義関数を作成するために使用できるメソッドを提供します。このトピックでは、これらのタイプの関数を作成する方法について説明します。
このトピックの内容:
概要¶
Snowparkを使用すると、カスタムラムダと関数のユーザー定義関数(UDFs)を作成でき、これらの UDFs を呼び出して DataFrame のデータを処理できます。
Snowpark API を使用して UDF を作成すると、Snowparkライブラリは関数のコードを内部ステージにアップロードします。UDF を呼び出すと、Snowparkライブラリはデータがあるサーバー上で関数を実行します。その結果、関数でデータを処理するためにデータをクライアントに転送する必要はありません。
カスタムコードでは、Pythonファイルまたはサードパーティパッケージからモジュールをインポートすることもできます。
次に挙げる2つの方法のいずれかで、カスタムコードの UDF を作成できます。
匿名の UDF を作成し、関数を変数に割り当てることができます。この変数がスコープ内にある限り、この変数を使用して UDF を呼び出すことができます。
名前付き UDF を作成し、名前で UDF を呼び出すことができます。これは、たとえば、名前で UDF を呼び出す必要がある場合や、後続のセッションで UDF を使用する必要がある場合に使用できます。
次のセクションでは、ローカル開発環境または Python ワークシート を使用してこれらの UDFs を作成する方法について説明します。
CREATE FUNCTION
コマンドを実行して UDF を定義した場合は、Snowparkでその UDF を呼び出すことができます。詳細については、 ユーザー定義関数(UDFs)の呼び出し をご参照ください。
注釈
ベクトル化されたPython UDFs を使用すると、入力行のバッチをPandas DataFrames として受け取るPython関数を定義できます。これにより、機械学習の推論シナリオでのパフォーマンスが大幅に向上します。詳細については、 ベクトル化された UDFs の使用 をご参照ください。
注釈
Pythonワークシートを使用している場合は、ハンドラー関数内でこれらの例を使用します。
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
def main(session: snowpark.Session):
df_table = session.table("sample_product_data")
例が Row
オブジェクトの list
など、 DataFrame 以外のものを返す場合は、例の戻り型と一致するように 戻り型を変更 します。
コード例を実行した後、 Results タブを使用して、返された出力を表示します。詳細については、 Pythonワークシートの実行 をご参照ください。
UDF の依存関係の指定¶
Snowpark API を使用して UDF を定義するには、Pythonファイル、zipファイル、リソースファイルなど、UDF が依存するモジュールを含むファイルをインポートする必要があります。
Pythonワークシートを使用してこれを行うには、 ステージからワークシートにPythonファイルを追加する を参照してください。
ローカル開発環境を使用してこれを行うには、コードで
Session.add_import()
を呼び出す必要があります。
ディレクトリを指定することもでき、Snowparkライブラリはディレクトリを自動的に圧縮してzipファイルとしてアップロードします。(UDF からの読み取りリソースの詳細については、 UDF を使用したファイルの読み取り を参照。)
Session.add_import()
を呼び出す場合、Snowparkライブラリは、指定したファイルを内部ステージにアップロードし、UDF の実行時にファイルをインポートします。
次の例は、コードへの依存関係としてステージにzipファイルを追加する方法を示しています。
>>> # Add a zip file that you uploaded to a stage.
>>> session.add_import("@my_stage/<path>/my_library.zip")
次の例は、ローカルマシンからPythonファイルを追加する方法を示しています。
>>> # Import a Python file from your local machine.
>>> session.add_import("/<path>/my_module.py")
>>> # Import a Python file from your local machine and specify a relative Python import path.
>>> session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")
次の例は、他のタイプの依存関係を追加する方法を示しています。
>>> # Add a directory of resource files.
>>> session.add_import("/<path>/my-resource-dir/")
>>> # Add a resource file.
>>> session.add_import("/<path>/my-resource.xml")
注釈
Python Snowparkライブラリは自動的にアップロードされません。
次の依存関係は、指定する必要がありません。
Python組み込みライブラリ。
これらのライブラリは、 UDFs が実行されるサーバーのランタイム環境ですでに使用可能です。
UDFでのAnacondaのサードパーティパッケージの使用¶
UDF でSnowflake Anacondaチャネルのサードパーティパッケージを使用できます。
PythonワークシートでPython UDF を作成する場合、Anacondaパッケージは既にワークシートで使用できます。 ステージからワークシートにPythonファイルを追加する をご参照ください。
ローカル開発環境でPython UDF を作成する場合、インストールするAnacondaパッケージを指定できます。
Python UDFsを呼び出すクエリがSnowflakeウェアハウス内で実行されると、ユーザーに代わってAnacondaパッケージをシームレスにインストールし、仮想ウェアハウスにキャッシュします。
ベストプラクティス、利用可能なパッケージの表示方法、ローカル開発環境のセットアップ方法の詳細については、 サードパーティパッケージの使用 をご参照ください。
ローカル開発環境でPython UDF を作成する場合、 session.add_packages
を使用してセッションレベルでパッケージを追加します。
このコード例は、パッケージをインポートしてそのバージョンを返す方法を示しています。
>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf
>>> session.add_packages("numpy", "pandas", "xgboost==1.5.0")
>>> @udf
... def compute() -> list:
... return [np.__version__, pd.__version__, xgb.__version__]
session.add_requirements
を使用して、 要件ファイル でパッケージを指定することもできます。
>>> session.add_requirements("mydir/requirements.txt")
UDFレベルのパッケージを追加して、以前に追加した可能性のあるセッションレベルのパッケージを上書きできます。
>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf
>>> @udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
... def compute() -> list:
... return [np.__version__, pd.__version__, xgb.__version__]
重要
パッケージバージョンを指定しない場合、Snowflakeは依存関係を解決するときに最新バージョンを使用します。ただし、UDF を本番環境にデプロイする場合は、コードが常に同じ依存関係バージョンを使用するようにする必要があります。これは、永続および仮UDFsの両方に対して実行できます。
パーマネントUDFを作成すると、UDFは1回だけ作成および登録されます。これにより、依存関係が一度解決され、選択したバージョンが本番ワークロードに使用されます。UDF が実行されると、常に同じ依存バージョンが使用されます。
仮UDFを作成するときは、バージョン仕様の一部として依存関係のバージョンを指定します。そうすると、UDF が登録されると、パッケージ解決は指定されたバージョンを使用します。バージョンを指定しないと、新しいバージョンが利用可能になったときに依存関係が更新される可能性があります。
匿名 UDF の作成¶
匿名 UDF を作成するには、次のいずれかを実行できます。
snowflake.snowpark.functions
モジュールでudf
関数を呼び出し、匿名関数の定義を渡します。UDFRegistration
クラスでregister
メソッドを呼び出し、匿名関数の定義を渡します。
匿名のUDFの例を次に示します。
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
注釈
複数のセッションで実行される可能性のあるコードを作成する場合は、 udf
関数を使用するのではなく、 register
メソッドを使用してUDFsを登録してください。これにより、デフォルトのSnowflake Session
オブジェクトが見つからないエラーを防ぐことができます。
名前付き UDF の作成と登録¶
UDFを名前で呼び出す場合(例: functions
モジュールの call_udf
関数を使用する場合)、名前付きUDFを作成して登録できます。これを実行するには、次のいずれかを使用します。
name
が引数のUDFRegistration
クラスのregister
メソッド。name
が引数のsnowflake.snowpark.functions
モジュールのudf
関数。
UDFRegistration
クラスの属性またはメソッドにアクセスするには、 Session
クラスの udf
プロパティを呼び出します。
register
の呼び出し、または udf
は、現在のセッションで使用できる仮UDFを作成します。
永続的なUDFを作成するには、 register
メソッドまたは udf
関数を呼び出し、 is_permanent
引数を True
に設定します。永続的なUDFを作成するときは、 stage_location
引数を、UDFのPythonファイルとその依存関係がアップロードされるステージの場所に設定する必要もあります。
名前付き仮UDFを登録する方法の例を次に示します。
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
is_permanent
引数を True
に設定して、名前付き永続UDFを登録する方法の例を次に示します。
>>> @udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
... def minus_one(x: int) -> int:
... return x-1
これらのUDFsが呼び出される例を次に示します。
>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
SQL を使用して UDF を呼び出すこともできます。
>>> session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Pythonソースファイルからの UDF の作成¶
ローカル開発環境で UDF を作成する場合、Pythonファイルで UDF ハンドラーを定義し、 UDFRegistration
クラスの register_from_file
メソッドを使用して UDF を作成することもできます。
注釈
このメソッドはPythonワークシートでは使用できません。
次は register_from_file
の使用例です。
以下を含むPythonファイル test_udf_file.py
があるとします。
def mod5(x: int) -> int:
return x % 5
次に、ファイル test_udf_file.py
のこの関数から UDF を作成できます。
>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
... file_path="tests/resources/test_udf_dir/test_udf_file.py",
... func_name="mod5",
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
ファイルをステージの場所にアップロードし、それを使用して UDF を作成することもできます。
>>> from snowflake.snowpark.types import IntegerType
>>> # suppose you have uploaded test_udf_file.py to stage location @mystage.
>>> mod5_udf = session.udf.register_from_file(
... file_path="@mystage/test_udf_file.py",
... func_name="mod5",
... return_type=IntegerType(),
... input_types=[IntegerType()],
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
UDF を使用したファイルの読み取り¶
ファイルのコンテンツを読み取るには、Pythonコードで、
ファイルをインポートして UDF のホームディレクトリから読み取ることで、 静的に指定されたファイルを読み取ります。
SnowflakeFile を使用して、動的に指定されたファイルを読み取ります。計算中にファイルにアクセスする必要があるときは、このような操作を実行できる場合があります。
静的に指定されたファイルの読み取り¶
Snowparkライブラリは、サーバーに UDFs をアップロードして実行します。UDF がファイルからデータを読み取る必要がある場合は、ファイルが UDF とともにアップロードされていることを確認する必要があります。
注釈
UDF をPythonワークシートに記述した場合、UDF はステージからのみファイルを読み取ることができます。
ファイルを読み取るように UDF を設定するには、
ファイルが依存関係であることを指定します。これにより、ファイルがサーバーにアップロードされます。詳細については、 UDF の依存関係の指定 をご参照ください。
例:
>>> # Import a file from your local machine as a dependency. >>> session.add_import("/<path>/my_file.txt") >>> # Or import a file that you uploaded to a stage as a dependency. >>> session.add_import("@my_stage/<path>/my_file.txt")
UDFで、ファイルを読み取ります。次の例では、ファイルは UDF の作成中に一度だけ読み取られ、 UDF の実行中に再度読み取られることはありません。これは、サードパーティライブラリ cachetools を使用して実現されます。
>>> import sys >>> import os >>> import cachetools >>> from snowflake.snowpark.types import StringType >>> @cachetools.cached(cache={}) ... def read_file(filename): ... import_dir = sys._xoptions.get("snowflake_import_directory") ... if import_dir: ... with open(os.path.join(import_dir, filename), "r") as f: ... return f.read() >>> >>> # create a temporary text file for test >>> temp_file_name = "/tmp/temp.txt" >>> with open(temp_file_name, "w") as t: ... _ = t.write("snowpark") >>> session.add_import(temp_file_name) >>> session.add_packages("cachetools") >>> >>> def add_suffix(s): ... return f"{read_file(os.path.basename(temp_file_name))}-{s}" >>> >>> concat_file_content_with_str_udf = session.udf.register( ... add_suffix, ... return_type=StringType(), ... input_types=[StringType()] ... ) >>> >>> df = session.create_dataframe(["snowflake", "python"], schema=["a"]) >>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect() [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')] >>> os.remove(temp_file_name) >>> session.clear_imports()
SnowflakeFile
で動的に指定されたファイルの読み取り¶
Snowpark snowflake.snowpark.files
モジュールの SnowflakeFile
クラスを使用すると、ステージからファイルを読み取ることができます。 SnowflakeFile
クラスは、動的なファイルアクセスを提供し、あらゆるサイズのファイルをストリーミングすることができます。動的なファイルアクセスは、複数のファイルを反復処理する場合にも有効です。例については、 複数ファイルの処理 をご参照ください。
SnowflakeFile
を使用したファイルの読み取りの詳細および例については、 Python UDF ハンドラーで SnowflakeFile クラスを使用したファイルの読み取り をご参照ください。
次の例では、 SnowflakeFile
を使用してステージからテキストファイルを読み取り、ファイルの長さを返す仮 UDF を登録します。
UDF を登録します。
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType
@udf(name="get_file_length", replace=True, input_types=[StringType()], return_type=IntegerType(), packages=['snowflake-snowpark-python'])
def get_file_length(file_path):
with SnowflakeFile.open(file_path) as f:
s = f.read()
return len(s);
UDF を呼び出します。
session.sql("select get_file_length(build_scoped_file_url(@my_stage, 'example-file.txt'));")
ベクトル化された UDFs の使用¶
ベクトル化されたPython UDFs を使用すると、入力行のバッチを Pandas DataFrames として受け取り、結果のバッチを Pandas配列 または Series として返すPython関数を定義します。Snowpark dataframe
の列は、UDF内のPandasシリーズとしてベクトル化されます。
バッチインターフェイスの使用方法の例を次に示します。
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)
@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
# The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
df.columns = ["col1", "col2", "col3", "col4"]
return model.predict(df)
他のPython UDFs を呼び出すのと同じ方法で、ベクトル化されたPython UDFs を呼び出します。詳細については、 ベクトル化されたPython UDFs をご参照ください。ここでは、 SQL ステートメントを使用してベクトル化された UDF を作成する方法が説明されています。たとえば、 SQL ステートメントでPythonコードを指定すると、 vectorized
デコレーターを使用できます。このドキュメントで説明されているSnowpark Python API を使用すると、 SQL ステートメントを使用してベクトル化された UDF を作成する必要がなくなります。したがって、 vectorized
デコレーターは使用しません。
バッチあたりの行数を制限することができます。詳細については、 ターゲットバッチサイズの設定 をご参照ください。
Snowpark Python API を使用してベクトル化された UDFs を作成する詳細な説明と例については、 Snowpark API リファレンスの UDFs セクション をご参照ください。