PythonでDataFramesのストアドプロシージャを作成¶
Snowpark APIは、Pythonでストアドプロシージャを作成するために使用できるメソッドを提供します。このトピックでは、ストアドプロシージャを作成する方法について説明します。
このトピックの内容:
概要¶
Snowparkを使用すると、カスタムラムダと関数のストアドプロシージャを作成でき、これらのストアドプロシージャを呼び出してDataFrameのデータを処理できます。
現在のセッション内にのみ存在するストアドプロシージャ(一時的なストアドプロシージャ)と、他のセッションで使用できるストアドプロシージャ(永続的なストアドプロシージャ)を作成できます。
ストアドプロシージャでのAnacondaのサードパーティパッケージの使用¶
Pythonストアドプロシージャを作成するときにインストールするAnacondaパッケージを指定できます。Snowflakeウェアハウス内でPythonストアドプロシージャを呼び出すと、ユーザーに代わってAnacondaパッケージをシームレスにインストールし、仮想ウェアハウスにキャッシュします。ベストプラクティス、利用可能なパッケージの表示方法、ローカル開発環境のセットアップ方法の詳細については、 サードパーティパッケージの使用 をご参照ください。
session.add_packages
を使用して、セッションレベルでパッケージを追加します。
このコード例は、パッケージをインポートしてそのバージョンを返す方法を示しています。
>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc
>>> session.add_packages("snowflake-snowpark-python", "pandas", "xgboost==1.5.0")
>>> @sproc
... def compute(session: snowflake.snowpark.Session) -> list:
... return [pd.__version__, xgb.__version__]
session.add_requirements
を使用して、 要件ファイル でパッケージを指定することもできます。
>>> session.add_requirements("mydir/requirements.txt")
ストアドプロシージャレベルのパッケージを追加して、以前に追加した可能性のあるセッションレベルのパッケージを上書きできます。
>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc
>>> @sproc(packages=["snowflake-snowpark-python", "pandas", "xgboost==1.5.0"])
... def compute(session: snowflake.snowpark.Session) -> list:
... return [pd.__version__, xgb.__version__]
重要
パッケージバージョンを指定しない場合、Snowflakeは依存関係を解決するときに最新バージョンを使用します。ただし、ストアドプロシージャを本番環境にデプロイする場合は、コードが常に同じ依存関係バージョンを使用するようにする必要があります。これは、永続的なストアドプロシージャと一時的なストアドプロシージャの両方で実行できます。
永続的なストアドプロシージャを作成すると、ストアドプロシージャが作成され、登録されるのは1回のみです。これにより、依存関係が一度解決され、選択したバージョンが本番ワークロードに使用されます。ストアドプロシージャが実行されると、常に同じ依存関係バージョンが使用されます。
一時ストアドプロシージャを作成するときは、バージョン仕様の一部として依存関係のバージョンを指定します。このように、ストアドプロシージャが登録されると、パッケージ解決では指定されたバージョンが使用されます。バージョンを指定しないと、新しいバージョンが利用可能になったときに依存関係が更新される可能性があります。
匿名ストアドプロシージャの作成¶
匿名ストアドプロシージャを作成するには、次のいずれかを実行できます。
snowflake.snowpark.functions
モジュールでsproc
関数を呼び出し、匿名関数の定義を渡します。StoredProcedureRegistration
クラスでregister
メソッドを呼び出し、匿名関数の定義を渡します。StoredProcedureRegistration
クラスの属性またはメソッドにアクセスするには、Session
クラスのsproc
プロパティを呼び出します。
匿名ストアドプロシージャの例を次に示します。
>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType
>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0], return_type=IntegerType(), input_types=[IntegerType()], packages=["snowflake-snowpark-python"])
注釈
複数のセッションで実行される可能性のあるコードを作成する場合は、 sproc
関数を使用するのではなく、 register
メソッドを使用してストアドプロシージャを登録してください。これにより、デフォルトのSnowflake Session
オブジェクトが見つからないエラーを防ぐことができます。
名前付きストアドプロシージャの作成と登録¶
ストアドプロシージャを名前で呼び出す場合(たとえば、 Session
オブジェクトの call
関数を使用)、名前付きストアドプロシージャを作成して登録できます。これを行うには、次のいずれかを実行できます。
snowflake.snowpark.functions
モジュールでsproc
関数を呼び出し、name
引数と匿名関数の定義を渡します。StoredProcedureRegistration
クラスでregister
メソッドを呼び出し、name
引数と匿名関数の定義を渡します。StoredProcedureRegistration
クラスの属性またはメソッドにアクセスするには、Session
クラスのsproc
プロパティを呼び出します。
register
を呼び出すか、または sproc
が現在のセッションで使用できる一時ストアドプロシージャを作成します。
永続的なストアドプロシージャを作成するには、 register
メソッドまたは sproc
関数を呼び出し、 is_permanent
引数を True
に設定します。永続的なストアドプロシージャを作成するときは、 stage_location
引数を、Snowparkが使用するPythonコネクタがストアドプロシージャとその依存関係のPythonファイルをアップロードするステージの場所にも設定する必要があります。
名前付きの一時ストアドプロシージャを登録する方法の例を次に示します。
>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType
>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0],
return_type=IntegerType(), input_types=[IntegerType()], name="my_sproc", replace=True,
packages=["snowflake-snowpark-python"])
is_permanent
引数を True
に設定して、名前付きの永続ストアドプロシージャを登録する方法の例を次に示します。
>>> import snowflake.snowpark
>>> from snowflake.snowpark.functions import sproc
>>> @sproc(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True, packages=["snowflake-snowpark-python"])
... def minus_one(session: snowflake.snowpark.Session, x: int) -> int:
... return session.sql(f"select {x} - 1").collect()[0][0]
これらのストアドプロシージャが呼び出される例を次に示します。
>>> add_one(1)
2
>>> session.call("minus_one", 1)
0
>>> session.sql("call minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
ストアドプロシージャを使用したファイルの読み取り¶
ストアドプロシージャでファイルのコンテンツを読み取るには、
ファイルをインポートしてストアドプロシージャのホームディレクトリから読み取ることで、 静的に指定されたファイルを読み取ります。
SnowflakeFile を使用して、動的に指定されたファイルを読み取ります。計算中にファイルにアクセスする必要があるときは、このような操作を実行できる場合があります。
静的に指定されたファイルの読み取り¶
ファイルが依存関係であることを指定します。これにより、ファイルがサーバーにアップロードされます。これは、UDFsの場合と同じ方法で行われます。詳細については、 UDF の依存関係の指定 をご参照ください。
例:
>>> # Import a file from your local machine as a dependency. >>> session.add_import("/<path>/my_file.txt") >>> # Or import a file that you uploaded to a stage as a dependency. >>> session.add_import("@my_stage/<path>/my_file.txt")
ストアドプロシージャで、ファイルを読み取ります。
>>> def read_file(name: str) -> str: ... import sys ... IMPORT_DIRECTORY_NAME = "snowflake_import_directory" ... import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME] ... ... with open(import_dir + 'my_file.txt', 'r') as file: ... return file.read()
SnowflakeFile
で動的に指定されたファイルの読み取り¶
Snowpark snowflake.snowpark.files
モジュールの SnowflakeFile
クラスを使用すると、ステージからファイルを読み取ることができます。 SnowflakeFile
クラスは、動的なファイルアクセスを提供し、あらゆるサイズのファイルをストリーミングすることができます。動的なファイルアクセスは、複数のファイルを反復処理する場合にも有効です。例については、 複数ファイルの処理 をご参照ください。
SnowflakeFile
を使用したファイルの読み取りの詳細および例については、 Python UDF ハンドラーで SnowflakeFile クラスを使用したファイルの読み取り をご参照ください。
次の例では、 SnowflakeFile
を使用してステージからファイルを読み取り、ファイルの長さを返す永続ストアドプロシージャを作成します。
ストアドプロシージャを作成します。
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType
@sproc(name="calc_size", is_permanent=True, stage_location="@my_procedures", replace=True, packages=["snowflake-snowpark-python"])
def calc_size(ignored_session: snowpark.Session, file_path: str) -> int:
with SnowflakeFile.open(file_path) as f:
s = f.read()
return len(s);
ストアドプロシージャを呼び出します。
file_size = session.sql("call calc_size(build_scoped_file_url('@my_stage', 'my_file.csv'))")