Python UDF ハンドラーの例¶
このトピックには、Pythonで記述された UDF ハンドラーコードの簡単な例が含まれています。
Pythonを使用して UDF ハンドラーを作成する方法については、 Python UDFsの作成 をご参照ください。
runtime_version
をコードに必要なPythonランタイムのバージョンに設定します。サポートされているPythonのバージョンは次のとおりです。
3.8
3.9
3.10
3.11
インラインハンドラーでのパッケージのインポート¶
Anacondaのサードパーティパッケージの厳選されたリストが利用可能です。詳細については、 サードパーティパッケージの使用 をご参照ください。
注釈
Anacondaが提供するパッケージを使用する前に、Snowflake組織管理者はSnowflakeサードパーティの規約に同意する必要があります。詳細については、 Anacondaのサードパーティパッケージの使用 をご参照ください。
次のコードは、パッケージをインポートしてそのバージョンを返す方法を示しています。
UDF を作成します。
CREATE OR REPLACE FUNCTION py_udf()
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('numpy','pandas','xgboost==1.5.0')
HANDLER = 'udf'
AS $$
import numpy as np
import pandas as pd
import xgboost as xgb
def udf():
return [np.__version__, pd.__version__, xgb.__version__]
$$;
UDF を呼び出します。
SELECT py_udf();
+-------------+
| PY_UDF() |
|-------------|
| [ |
| "1.19.2", |
| "1.4.0", |
| "1.5.0" |
| ] |
+-------------+
PACKAGES キーワードは、以下のようにパッケージのバージョンを指定するために使用できます。
バージョンなし(例:
numpy
)正確なバージョンをピン留め(例:
numpy==1.25.2
)ワイルドカードを使用したバージョンプレフィックスへの制約(例:
numpy==1.*
)バージョン範囲への制約(例:
numpy>=1.25
)複数のバージョン指定子(例:
numpy>=1.25,<2
)によって制約され、すべてのバージョン指定子を満たすパッケージが選択されるようにします。
注釈
複数の範囲演算子(例: numpy>=1.25,<2
)の使用はパッケージポリシーではサポートされていませんが、Python UDF、 UDTF、およびストアドプロシージャを作成する際には使用できます。
以下は、ワイルドカード *
を使用してパッケージをバージョンプレフィックスに制約する例です。
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES=('numpy==1.*')
RUNTIME_VERSION=3.10
HANDLER='echo'
AS $$
def echo():
return 'hi'
$$;
この例では、パッケージが指定されたバージョン以上であることを制約する方法を示します。
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES=('numpy>=1.2')
RUNTIME_VERSION=3.10
HANDLER='echo'
AS $$
def echo():
return 'hi'
$$;
次の例は、複数のパッケージバージョン指定子の使用方法を示しています。
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES=('numpy>=1.2,<2')
RUNTIME_VERSION=3.10
HANDLER='echo'
AS $$
def echo():
return 'hi'
$$;
ファイルの読み取り¶
Python UDF ハンドラーコードを使用してファイルのコンテンツを読み取ることができます。たとえば、ファイルを読み取って、非構造化データを処理する場合があります。
ファイルのコンテンツを読み取るには、
IMPORTS 句でファイルのパスと名前を静的に指定 し、ファイルを UDF のホームディレクトリから読み取ります。これは、ファイル名が静的で、関数内で一貫性があり、あらかじめファイル名がわかっている場合に有効です。
SnowflakeFile でファイルを動的に指定し、そのコンテンツを読み取ります。計算中にファイルにアクセスする必要があるときは、このような操作を実行できる場合があります。
IMPORTS を使用した静的に指定されたファイルの読み取り¶
CREATE FUNCTION コマンドの IMPORTS 句にファイル名とステージ名を指定することで、ファイルを読み込めます。
IMPORTS 句でファイルを指定すると、Snowflakeはそのファイルをステージから UDF の ホームディレクトリ (別称 インポートディレクトリ)にコピーします。UDF は、ホームディレクトリからファイルを読み取ります。
Snowflakeは、インポートしたファイルを単一のディレクトリにコピーします。そのディレクトリにあるすべてのファイルは一意な名前でなければならないため、 IMPORTS 句の各ファイルも個別の名前にする必要があります。これは、ステージングされたファイルが異なるステージや、ステージ内の異なるサブディレクトリで開始された場合でも同様です。
注釈
ファイルはステージのトップレベルディレクトリからのみインポートでき、サブフォルダからはインポートできません。
次の例では、 my_stage
という名前のステージから file.txt
と呼ばれるファイルを読み取るインラインPythonハンドラーを使用しています。ハンドラーは、Python sys._xoptions
メソッドと snowflake_import_directory
システムオプションを使用して、 UDF のホームディレクトリの位置を取得することができます。
Snowflakeは UDF の作成中に1回だけファイルを読み取り、ファイルの読み取りがターゲットハンドラーの外部で発生した場合は、 UDF の実行中にファイルを再度読み取ることはありません。
インラインハンドラーを使用して UDF を作成します。
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
IMPORTS=('@my_stage/file.txt')
HANDLER='compute'
AS
$$
import sys
import os
with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
s = f.read()
def compute():
return s
$$;
SnowflakeFile
を使用した動的に指定されたファイルの読み取り¶
Snowpark snowflake.snowpark.files
モジュールの SnowflakeFile
クラスを使用すると、ステージからファイルを読み取ることができます。 SnowflakeFile
クラスは、動的なファイルアクセスを提供し、あらゆるサイズのファイルをストリーミングすることができます。動的なファイルアクセスは、複数のファイルを反復処理する場合にも有効です。例については、 複数ファイルの処理 をご参照ください。
SnowflakeFile
クラスには、ファイルを開くためのメソッド、 open
があります。 open
メソッドは、Pythonの IOBase
ファイルオブジェクトを拡張する SnowflakeFile
オブジェクトを返します。
SnowflakeFile
オブジェクトは、以下の IOBase
、 BufferedIOBase
、 RawIOBase
のメソッドをサポートしています。
IOBase.fileno
IOBase.isatty
IOBase.readable
IOBase.readinto
IOBase.readline
IOBase.readlines
IOBase.seek
IOBase.seekable
IOBase.tell
BufferedIOBase.readinto1
RawIOBase.read
RawIOBase.readall
詳細については、 IOBase でのPython 3.8ドキュメント をご参照ください。メソッド fileno
など、Snowflakeサーバーでサポートされていないメソッドを呼び出すと、エラーが返されます。
注釈
ファイルインジェクション攻撃に対して耐久性のあるコードにするため、 SnowflakeFile
を使用したファイルアクセスには、デフォルトでスコープ付き URLs が必要です。組み込み関数 BUILD_SCOPED_FILE_URL を使用して、スコープ URL を SQL に作成できます。スコープ付き URLs の詳細については、 ファイルにアクセスできる URLs の型 をご参照ください。ファイルにアクセスできるユーザーのみが、スコープ付き URL を作成できます。
前提条件¶
ファイルをコードで使用できるようにするには、Pythonハンドラーコードがステージ上のファイルを読み取る前に、次を実行する必要があります。
ハンドラーが使用可能なステージを作成します。
外部ステージまたは内部ステージを使用できます。内部ステージを使用する場合、呼び出し元権限のストアドプロシージャを作成する予定であれば、これをユーザーステージにすることができます。それ以外の場合は、名前付きステージを使用する必要があります。Snowflakeは現在、UDF 依存関係のためにテーブルステージを使用してハンドラーコードを格納することはサポートしていません。
ステージの作成の詳細については、 CREATE STAGE をご参照ください。内部ステージ型の選択の詳細については、 ローカルファイルに対する内部ステージの選択 をご参照ください。
ユースケースに応じて、ステージに対する適切な権限を以下のロールに割り当てる必要があります。
ユースケース
ロール
UDF または所有者権限ストアドプロシージャ
実行中の UDF またはストアドプロシージャを所有するロール。
呼び出し元権限ストアドプロシージャ
ユーザーロール。
詳細については、 Granting Privileges for User-Defined Functions をご参照ください。
コードが読み取るファイルをステージにコピーします。
PUT コマンドを使用して、ローカルドライブから内部ステージにファイルをコピーできます。PUT を使用したファイルのステージングについては、 ローカルファイルシステムからのデータファイルのステージング をご参照ください。
クラウドストレージサービスが提供するツールを使用して、ローカルドライブから外部ステージの場所にファイルをコピーできます。ヘルプについては、ご使用のクラウドストレージサービスのドキュメントをご参照ください。
インラインPythonハンドラーを使用した画像の知覚ハッシュの算出¶
この例では、 SnowflakeFile
を使用して、一対のステージングされた画像ファイルを読み取り、 知覚ハッシュ (pHash)を使用して、各ファイルの画像の類似性を判定します。
mode
の引数に rb
を渡して入力モードをバイナリに指定し、画像のphash値を返す UDF を作成します。
CREATE OR REPLACE FUNCTION calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile
def run(file_path):
with SnowflakeFile.open(file_path, 'rb') as f:
return imagehash.average_hash(Image.open(f))
$$;
2つの画像のphash値間における距離を計算する2番目の UDF を作成します。
CREATE OR REPLACE FUNCTION calc_phash_distance(h1 string, h2 string)
RETURNS INT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('imagehash')
HANDLER = 'run'
as
$$
import imagehash
def run(h1, h2):
return imagehash.hex_to_hash(h1) - imagehash.hex_to_hash(h2)
$$;
画像ファイルをステージし、ディレクトリテーブルを更新します。
PUT file:///tmp/image1.jpg @images AUTO_COMPRESS=FALSE;
PUT file:///tmp/image2.jpg @images AUTO_COMPRESS=FALSE;
ALTER STAGE images REFRESH;
UDFs を呼び出します。
SELECT
calc_phash_distance(
calc_phash(build_scoped_file_url(@images, 'image1.jpg')),
calc_phash(build_scoped_file_url(@images, 'image2.jpg'))
) ;
UDTF を使用した CSV ファイルの処理¶
この例では、 SnowflakeFile
を使って、 CSV ファイルの内容を抽出し、その行をテーブルにして返す UDTF を作成します。
インラインハンドラーを使用して UDTF を作成します。
CREATE FUNCTION parse_csv(file_path string)
RETURNS TABLE (col1 string, col2 string, col3 string)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'csvparser'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
class csvparser:
def process(self, stagefile):
with SnowflakeFile.open(stagefile) as f:
for line in f.readlines():
lineStr = line.strip()
row = lineStr.split(",")
try:
# Read the columns from the line.
yield (row[1], row[0], row[2], )
except:
pass
$$;
CSV ファイルをステージし、ディレクトリテーブルを更新します。
PUT file:///tmp/sample.csv @data_stage AUTO_COMPRESS=FALSE;
ALTER STAGE data_stage REFRESH;
UDTF を呼び出し、ファイル URL を渡します。
SELECT * FROM TABLE(PARSE_CSV(build_scoped_file_url(@data_stage, 'sample.csv')));
複数ファイルの処理¶
ディレクトリテーブルの RELATIVE_PATH 列をハンドラーに渡すと、複数のファイルを読み取って処理することができます。RELATIVE_PATH 列の詳細については、 ディレクトリテーブルのクエリからの出力 をご参照ください。
注釈
ファイルサイズや計算の必要性に応じて、複数のファイルを読み取って処理するステートメントを実行する前に、 ALTER WAREHOUSE を使用してウェアハウスをスケールアップするとよいでしょう。
- UDF を呼び出して、複数のファイルを処理します。
次の例では、 CREATE TABLE ステートメント内で UDF を呼び出し、ステージ上の各ファイルを処理し、その結果を新しいテーブルに格納します。
デモのために、例では以下のように想定しています。
my_stage
という名前のステージには、複数のテキストファイルがあります。非構造化テキストに対して感情分析を行う、
get_sentiment
という名前の既存の UDF があります。UDF は、テキストファイルへのパスを入力とし、感情を表す値を返します。
CREATE OR REPLACE TABLE sentiment_results AS SELECT relative_path , get_sentiment(build_scoped_file_url(@my_stage, relative_path)) AS sentiment FROM directory(@my_stage);
- UDTF を呼び出して、複数のファイルを処理します。
次の例では、 UDTF という名前の
parse_excel_udtf
を呼び出します。この例では、ディレクトリテーブルからrelative_path
をmy_excel_stage
というステージに渡します。SELECT t.* FROM directory(@my_stage) d, table(parse_excel_udtf(build_scoped_file_url(@my_excel_stage, relative_path)) t;
ステージ URLs および URIs ハンドラーを使用したファイルの読み取り¶
SnowflakeFile
を使用したファイルアクセスには、デフォルトでスコープ付き URLs が必要です。これにより、ファイルインジェクション攻撃に対して耐久性のあるコードになります。ただし、代わりにステージ URI、またはステージ URL を使用してファイルの場所を参照することはできます。そのためには、キーワード引数 require_scoped_url = False
を使用して、 SnowflakeFile.open
メソッドを呼び出す必要があります。
このオプションは、 UDF 所有者のみがアクセスできる URI を呼び出し元が提供できるようにする場合に便利です。たとえば、 UDF を所有していて、構成ファイルや機械学習モデルを読み取る場合、ファイルアクセスにステージ URI を使用することがあります。ユーザーの入力に基づいて作成されるファイルなど、予測不可能な名前を持つファイルを扱う場合は、このオプションを推奨しません。
この例では、機械学習モデルをファイルから読み取り、そのモデルを関数で使用して、感情分析のための自然言語処理を実行します。この例では、 open
を require_scoped_url = False
で呼び出します。ファイルの場所の形式両方(ステージ URI とステージ URL)で、 UDF 所有者はモデルファイルにアクセスできる必要があります。
インラインハンドラーを使用して UDF を作成します。
CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle
def run(input_data):
model_file = '@models/NLP_model.pickle'
# Specify 'mode = rb' to open the file in binary mode.
with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
model = pickle.load(f)
return model.predict([input_data])[0]
$$;
モデルファイルをステージし、ディレクトリテーブルを更新します。
PUT file:///tmp/NLP_model.pickle @models AUTO_COMPRESS=FALSE;
ALTER STAGE models REFRESH;
また、 UDF にモデルのステージ URL を指定して、感情をを抽出することもできます。
たとえば、ステージ URL を使用して、ファイルを指定するインラインハンドラー付き UDF を作成します。
CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle
def run(input_data):
model_file = 'https://my_account/api/files/my_db/my_schema/models/NLP_model.pickle'
# Specify 'rb' to open the file in binary mode.
with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
model = pickle.load(f)
return model.predict([input_data])[0]
$$;
入力されたデータで UDF を呼び出します。
SELECT extract_sentiment('I am writing to express my interest in a recent posting made.');
ファイルの記述¶
UDF ハンドラーは、 UDF を呼び出すクエリ用に作成された /tmp
ディレクトリにファイルを書き込むことができます。
/tmp
ディレクトリは単一の呼び出しクエリ用に確保されていますが、複数のPythonワーカープロセスが同時に実行されている可能性があることに注意してください。競合を防ぐには、/tmpディレクトリへのアクセスが他のPythonワーカープロセスと同期されていること、または/tmpに書き込まれるファイルの名前が一意であることを確認する必要があります。
コード例については、このトピックの ステージングされたファイルの解凍 をご参照ください。
次の例のコードは、入力された text
を /tmp
ディレクトリに書き込みます。また、ファイルの場所の一意性を確保するために、関数のプロセス ID を追加します。
def func(text):
# Append the function's process ID to ensure the file name's uniqueness.
file_path = '/tmp/content' + str(os.getpid())
with open(file_path, "w") as file:
file.write(text)
ステージングされたファイルの解凍¶
.zipファイルをステージに保存し、Python zipfileモジュールを使用してUDFで解凍できます。
たとえば、.zipファイルをステージにアップロードし、UDFを作成するときに、IMPORTS句のステージングされた場所で.zipファイルを参照できます。実行時に、Snowflakeはステージングされたファイルを、コードがアクセスできるインポートディレクトリにコピーします。
ファイルの読み取りと書き込みの詳細については、 ファイルの読み取り および ファイルの記述 をご参照ください。
次の例では、UDFコードはNLPモデルを使用してテキスト内のエンティティを検出します。コードはこれらのエンティティの配列を返します。テキストを処理するためのNLPモデルを設定するには、コードは最初にzipfileモジュールを使用して、.zipファイルからモデルのファイル(en_core_web_sm-2.3.1)を抽出します。次に、コードはspaCyモジュールを使用して、ファイルからモデルをロードします。
このコードは、抽出されたファイルの内容を、この関数を呼び出すクエリ用に作成された/tmpディレクトリに書き込むことに注意してください。このコードはファイルロックを使用して、Pythonワーカープロセス間で抽出が同期されるようにします。このようにして、コンテンツは1回だけ解凍されます。ファイルの書き込みの詳細については、 ファイルの記述 をご参照ください。
zipfileモジュールの詳細については、 zipfileリファレンス をご参照ください。spaCyモジュールの詳細については、 spaCyAPIのドキュメント をご参照ください。
インラインハンドラーを使用して UDF を作成します。
CREATE OR REPLACE FUNCTION py_spacy(str string)
RETURNS ARRAY
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'func'
PACKAGES = ('spacy')
IMPORTS = ('@spacy_stage/spacy_en_core_web_sm.zip')
AS
$$
import fcntl
import os
import spacy
import sys
import threading
import zipfile
# File lock class for synchronizing write access to /tmp.
class FileLock:
def __enter__(self):
self._lock = threading.Lock()
self._lock.acquire()
self._fd = open('/tmp/lockfile.LOCK', 'w+')
fcntl.lockf(self._fd, fcntl.LOCK_EX)
def __exit__(self, type, value, traceback):
self._fd.close()
self._lock.release()
# Get the location of the import directory. Snowflake sets the import
# directory location so code can retrieve the location via sys._xoptions.
IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
# Get the path to the ZIP file and set the location to extract to.
zip_file_path = import_dir + "spacy_en_core_web_sm.zip"
extracted = '/tmp/en_core_web_sm'
# Extract the contents of the ZIP. This is done under the file lock
# to ensure that only one worker process unzips the contents.
with FileLock():
if not os.path.isdir(extracted + '/en_core_web_sm/en_core_web_sm-2.3.1'):
with zipfile.ZipFile(zip_file_path, 'r') as myzip:
myzip.extractall(extracted)
# Load the model from the extracted file.
nlp = spacy.load(extracted + "/en_core_web_sm/en_core_web_sm-2.3.1")
def func(text):
doc = nlp(text)
result = []
for ent in doc.ents:
result.append((ent.text, ent.start_char, ent.end_char, ent.label_))
return result
$$;
NULL 値の取り扱い¶
次のコードは、NULL値の処理方法を示しています。詳細については、 NULL 値 をご参照ください。
UDF を作成します。
CREATE OR REPLACE FUNCTION py_udf_null(a variant)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'udf'
AS $$
def udf(a):
if not a:
return 'JSON null'
elif getattr(a, "is_sql_null", False):
return 'SQL null'
else:
return 'not null'
$$;
UDF を呼び出します。
SELECT py_udf_null(null);
SELECT py_udf_null(parse_json('null'));
SELECT py_udf_null(10);
+-------------------+
| PY_UDF_NULL(NULL) |
|-------------------|
| SQL null |
+-------------------+
+---------------------------------+
| PY_UDF_NULL(PARSE_JSON('NULL')) |
|---------------------------------|
| JSON null |
+---------------------------------+
+-----------------+
| PY_UDF_NULL(10) |
|-----------------|
| not null |
+-----------------+