PythonでDataFramesのストアドプロシージャを作成

Snowpark APIは、Pythonでストアドプロシージャを作成するために使用できるメソッドを提供します。このトピックでは、ストアドプロシージャを作成する方法について説明します。

このトピックの内容:

概要

Snowparkを使用すると、カスタムラムダと関数のストアドプロシージャを作成でき、これらのストアドプロシージャを呼び出してDataFrameのデータを処理できます。

現在のセッション内にのみ存在するストアドプロシージャ(一時的なストアドプロシージャ)と、他のセッションで使用できるストアドプロシージャ(永続的なストアドプロシージャ)を作成できます。

ストアドプロシージャでのAnacondaのサードパーティパッケージの使用

Pythonストアドプロシージャを作成するときにインストールするAnacondaパッケージを指定できます。Snowflakeウェアハウス内でPythonストアドプロシージャを呼び出すと、ユーザーに代わってAnacondaパッケージをシームレスにインストールし、仮想ウェアハウスにキャッシュします。ベストプラクティス、利用可能なパッケージの表示方法、ローカル開発環境のセットアップ方法の詳細については、 サードパーティパッケージの使用 をご参照ください。

session.add_packages を使用して、セッションレベルでパッケージを追加します。

このコード例は、パッケージをインポートしてそのバージョンを返す方法を示しています。

>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc

>>> session.add_packages("snowflake-snowpark-python", "pandas", "xgboost==1.5.0")

>>> @sproc
... def compute(session: snowflake.snowpark.Session) -> list:
...   return [pd.__version__, xgb.__version__]
Copy

session.add_requirements を使用して、 要件ファイル でパッケージを指定することもできます。

>>> session.add_requirements("mydir/requirements.txt")
Copy

ストアドプロシージャレベルのパッケージを追加して、以前に追加した可能性のあるセッションレベルのパッケージを上書きできます。

>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc

>>> @sproc(packages=["snowflake-snowpark-python", "pandas", "xgboost==1.5.0"])
... def compute(session: snowflake.snowpark.Session) -> list:
...    return [pd.__version__, xgb.__version__]
Copy

重要

パッケージバージョンを指定しない場合、Snowflakeは依存関係を解決するときに最新バージョンを使用します。ただし、ストアドプロシージャを本番環境にデプロイする場合は、コードが常に同じ依存関係バージョンを使用するようにする必要があります。これは、永続的なストアドプロシージャと一時的なストアドプロシージャの両方で実行できます。

  • 永続的なストアドプロシージャを作成すると、ストアドプロシージャが作成され、登録されるのは1回のみです。これにより、依存関係が一度解決され、選択したバージョンが本番ワークロードに使用されます。ストアドプロシージャが実行されると、常に同じ依存関係バージョンが使用されます。

  • 一時ストアドプロシージャを作成するときは、バージョン仕様の一部として依存関係のバージョンを指定します。このように、ストアドプロシージャが登録されると、パッケージ解決では指定されたバージョンが使用されます。バージョンを指定しないと、新しいバージョンが利用可能になったときに依存関係が更新される可能性があります。

匿名ストアドプロシージャの作成

匿名ストアドプロシージャを作成するには、次のいずれかを実行できます。

  • snowflake.snowpark.functions モジュールで sproc 関数を呼び出し、匿名関数の定義を渡します。

  • StoredProcedureRegistration クラスで register メソッドを呼び出し、匿名関数の定義を渡します。 StoredProcedureRegistration クラスの属性またはメソッドにアクセスするには、 Session クラスの sproc プロパティを呼び出します。

匿名ストアドプロシージャの例を次に示します。

>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType

>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0], return_type=IntegerType(), input_types=[IntegerType()], packages=["snowflake-snowpark-python"])
Copy

注釈

複数のセッションで実行される可能性のあるコードを作成する場合は、 sproc 関数を使用するのではなく、 register メソッドを使用してストアドプロシージャを登録してください。これにより、デフォルトのSnowflake Session オブジェクトが見つからないエラーを防ぐことができます。

名前付きストアドプロシージャの作成と登録

ストアドプロシージャを名前で呼び出す場合(たとえば、 Session オブジェクトの call 関数を使用)、名前付きストアドプロシージャを作成して登録できます。これを行うには、次のいずれかを実行できます。

  • snowflake.snowpark.functions モジュールで sproc 関数を呼び出し、 name 引数と匿名関数の定義を渡します。

  • StoredProcedureRegistration クラスで register メソッドを呼び出し、 name 引数と匿名関数の定義を渡します。 StoredProcedureRegistration クラスの属性またはメソッドにアクセスするには、 Session クラスの sproc プロパティを呼び出します。

register を呼び出すか、または sproc が現在のセッションで使用できる一時ストアドプロシージャを作成します。

永続的なストアドプロシージャを作成するには、 register メソッドまたは sproc 関数を呼び出し、 is_permanent 引数を True に設定します。永続的なストアドプロシージャを作成するときは、 stage_location 引数を、Snowparkが使用するPythonコネクタがストアドプロシージャとその依存関係のPythonファイルをアップロードするステージの場所にも設定する必要があります。

名前付きの一時ストアドプロシージャを登録する方法の例を次に示します。

>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType

>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0],
return_type=IntegerType(), input_types=[IntegerType()], name="my_sproc", replace=True,
packages=["snowflake-snowpark-python"])
Copy

is_permanent 引数を True に設定して、名前付きの永続ストアドプロシージャを登録する方法の例を次に示します。

>>> import snowflake.snowpark
>>> from snowflake.snowpark.functions import sproc

>>> @sproc(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True, packages=["snowflake-snowpark-python"])
... def minus_one(session: snowflake.snowpark.Session, x: int) -> int:
...  return session.sql(f"select {x} - 1").collect()[0][0]
Copy

これらのストアドプロシージャが呼び出される例を次に示します。

>>> add_one(1)
2
>>> session.call("minus_one", 1)
0
>>> session.sql("call minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Copy

ストアドプロシージャを使用したファイルの読み取り

ストアドプロシージャでファイルのコンテンツを読み取るには、

静的に指定されたファイルの読み取り

  1. ファイルが依存関係であることを指定します。これにより、ファイルがサーバーにアップロードされます。これは、UDFsの場合と同じ方法で行われます。詳細については、 UDF の依存関係の指定 をご参照ください。

    例:

    >>> # Import a file from your local machine as a dependency.
    >>> session.add_import("/<path>/my_file.txt")
    
    >>> # Or import a file that you uploaded to a stage as a dependency.
    >>> session.add_import("@my_stage/<path>/my_file.txt")
    
    Copy
  2. ストアドプロシージャで、ファイルを読み取ります。

    >>> def read_file(name: str) -> str:
    ...    import sys
    ...    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    ...    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    ...
    ...    with open(import_dir + 'my_file.txt', 'r') as file:
    ...        return file.read()
    
    Copy

SnowflakeFile で動的に指定されたファイルの読み取り

Snowpark snowflake.snowpark.files モジュールの SnowflakeFile クラスを使用すると、ステージからファイルを読み取ることができます。 SnowflakeFile クラスは、動的なファイルアクセスを提供し、あらゆるサイズのファイルをストリーミングすることができます。動的なファイルアクセスは、複数のファイルを反復処理する場合にも有効です。例については、 複数ファイルの処理 をご参照ください。

SnowflakeFile を使用したファイルの読み取りの詳細および例については、 Python UDF ハンドラーで SnowflakeFile クラスを使用したファイルの読み取り をご参照ください。

次の例では、 SnowflakeFile を使用してステージからファイルを読み取り、ファイルの長さを返す永続ストアドプロシージャを作成します。

ストアドプロシージャを作成します。

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType

@sproc(name="calc_size", is_permanent=True, stage_location="@my_procedures", replace=True, packages=["snowflake-snowpark-python"])
def calc_size(ignored_session: snowpark.Session, file_path: str) -> int:
  with SnowflakeFile.open(file_path) as f:
    s = f.read()
  return len(s);
Copy

ストアドプロシージャを呼び出します。

file_size = session.sql("call calc_size(build_scoped_file_url('@my_stage', 'my_file.csv'))")
Copy