Python UDF ハンドラーの例

このトピックには、Pythonで記述された UDF ハンドラーコードの簡単な例が含まれています。

Pythonを使用して UDF ハンドラーを作成する方法については、 Python UDFsの作成 をご参照ください。

runtime_version をコードに必要なPythonランタイムのバージョンに設定します。サポートされているPythonのバージョンは次のとおりです。

  • 3.8

  • 3.9

  • 3.10

  • 3.11

インラインハンドラーでのパッケージのインポート

Anacondaのサードパーティパッケージの厳選されたリストが利用可能です。詳細については、 サードパーティパッケージの使用 をご参照ください。

注釈

Anacondaが提供するパッケージを使用する前に、Snowflake組織管理者はSnowflakeサードパーティの規約に同意する必要があります。詳細については、 Anacondaのサードパーティパッケージの使用 をご参照ください。

次のコードは、パッケージをインポートしてそのバージョンを返す方法を示しています。

UDF を作成します。

CREATE OR REPLACE FUNCTION py_udf()
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('numpy','pandas','xgboost==1.5.0')
HANDLER = 'udf'
AS $$
import numpy as np
import pandas as pd
import xgboost as xgb
def udf():
  return [np.__version__, pd.__version__, xgb.__version__]
$$;
Copy

UDF を呼び出します。

SELECT py_udf();
+-------------+
| PY_UDF()    |
|-------------|
| [           |
|   "1.19.2", |
|   "1.4.0",  |
|   "1.5.0"   |
| ]           |
+-------------+
Copy

ファイルの読み取り

Python UDF ハンドラーコードを使用してファイルのコンテンツを読み取ることができます。たとえば、ファイルを読み取って、非構造化データを処理する場合があります。

ファイルのコンテンツを読み取るには、

IMPORTS を使用した静的に指定されたファイルの読み取り

CREATE FUNCTION コマンドの IMPORTS 句にファイル名とステージ名を指定することで、ファイルを読み込めます。

IMPORTS 句でファイルを指定すると、Snowflakeはそのファイルをステージから UDF の ホームディレクトリ (別称 インポートディレクトリ)にコピーします。UDF は、ホームディレクトリからファイルを読み取ります。

Snowflakeは、インポートしたファイルを単一のディレクトリにコピーします。そのディレクトリにあるすべてのファイルは一意な名前でなければならないため、 IMPORTS 句の各ファイルも個別の名前にする必要があります。これは、ステージングされたファイルが異なるステージや、ステージ内の異なるサブディレクトリで開始された場合でも同様です。

注釈

ファイルはステージのトップレベルディレクトリからのみインポートでき、サブフォルダからはインポートできません。

次の例では、 my_stage という名前のステージから file.txt と呼ばれるファイルを読み取るインラインPythonハンドラーを使用しています。ハンドラーは、Python sys._xoptions メソッドと snowflake_import_directory システムオプションを使用して、 UDF のホームディレクトリの位置を取得することができます。

Snowflakeは UDF の作成中に1回だけファイルを読み取り、ファイルの読み取りがターゲットハンドラーの外部で発生した場合は、 UDF の実行中にファイルを再度読み取ることはありません。

インラインハンドラーを使用して UDF を作成します。

CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
IMPORTS=('@my_stage/file.txt')
HANDLER='compute'
AS
$$
import sys
import os

with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
  s = f.read()

def compute():
  return s
$$;
Copy

SnowflakeFile を使用した動的に指定されたファイルの読み取り

Snowpark snowflake.snowpark.files モジュールの SnowflakeFile クラスを使用すると、ステージからファイルを読み取ることができます。 SnowflakeFile クラスは、動的なファイルアクセスを提供し、あらゆるサイズのファイルをストリーミングすることができます。動的なファイルアクセスは、複数のファイルを反復処理する場合にも有効です。例については、 複数ファイルの処理 をご参照ください。

SnowflakeFile クラスには、ファイルを開くためのメソッド、 open があります。 open メソッドは、Pythonの IOBase ファイルオブジェクトを拡張する SnowflakeFile オブジェクトを返します。

SnowflakeFile オブジェクトは、以下の IOBaseBufferedIOBaseRawIOBase のメソッドをサポートしています。

  • IOBase.fileno

  • IOBase.isatty

  • IOBase.readable

  • IOBase.readinto

  • IOBase.readline

  • IOBase.readlines

  • IOBase.seek

  • IOBase.seekable

  • IOBase.tell

  • BufferedIOBase.readinto1

  • RawIOBase.read

  • RawIOBase.readall

詳細については、 IOBase でのPython 3.8ドキュメント をご参照ください。メソッド fileno など、Snowflakeサーバーでサポートされていないメソッドを呼び出すと、エラーが返されます。

注釈

ファイルインジェクション攻撃に対して耐久性のあるコードにするため、 SnowflakeFile を使用したファイルアクセスには、デフォルトでスコープ付き URLs が必要です。組み込み関数 BUILD_SCOPED_FILE_URL を使用して、スコープ URL を SQL に作成できます。スコープ付き URLs の詳細については、 ファイルにアクセスできる URLs の型 をご参照ください。ファイルにアクセスできるユーザーのみが、スコープ付き URL を作成できます。

前提条件

ファイルをコードで使用できるようにするには、Pythonハンドラーコードがステージ上のファイルを読み取る前に、次を実行する必要があります。

  1. ハンドラーが使用可能なステージを作成します。

    外部ステージまたは内部ステージを使用できます。内部ステージを使用する場合、呼び出し元権限のストアドプロシージャを作成する予定であれば、これをユーザーステージにすることができます。それ以外の場合は、名前付きステージを使用する必要があります。Snowflakeは現在、UDF 依存関係のためにテーブルステージを使用してハンドラーコードを格納することはサポートしていません。

    ステージの作成の詳細については、 CREATE STAGE をご参照ください。内部ステージ型の選択の詳細については、 ローカルファイルに対する内部ステージの選択 をご参照ください。

    ユースケースに応じて、ステージに対する適切な権限を以下のロールに割り当てる必要があります。

    ユースケース

    ロール

    UDF または所有者権限ストアドプロシージャ

    実行中の UDF またはストアドプロシージャを所有するロール。

    呼び出し元権限ストアドプロシージャ

    ユーザーロール。

    詳細については、 Granting Privileges for User-Defined Functions をご参照ください。

  2. コードが読み取るファイルをステージにコピーします。

    PUT コマンドを使用して、ローカルドライブから内部ステージにファイルをコピーできます。PUT を使用したファイルのステージングについては、 ローカルファイルシステムからのデータファイルのステージング をご参照ください。

    クラウドストレージサービスが提供するツールを使用して、ローカルドライブから外部ステージの場所にファイルをコピーできます。ヘルプについては、ご使用のクラウドストレージサービスのドキュメントをご参照ください。

インラインPythonハンドラーを使用した画像の知覚ハッシュの算出

この例では、 SnowflakeFile を使用して、一対のステージングされた画像ファイルを読み取り、 知覚ハッシュ (pHash)を使用して、各ファイルの画像の類似性を判定します。

mode の引数に rb を渡して入力モードをバイナリに指定し、画像のphash値を返す UDF を作成します。

CREATE OR REPLACE FUNCTION calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile

def run(file_path):
  with SnowflakeFile.open(file_path, 'rb') as f:
  return imagehash.average_hash(Image.open(f))
$$;
Copy

2つの画像のphash値間における距離を計算する2番目の UDF を作成します。

CREATE OR REPLACE FUNCTION calc_phash_distance(h1 string, h2 string)
RETURNS INT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('imagehash')
HANDLER = 'run'
as
$$
import imagehash

def run(h1, h2):
  return imagehash.hex_to_hash(h1) - imagehash.hex_to_hash(h2)
$$;
Copy

画像ファイルをステージし、ディレクトリテーブルを更新します。

PUT file:///tmp/image1.jpg @images AUTO_COMPRESS=FALSE;
PUT file:///tmp/image2.jpg @images AUTO_COMPRESS=FALSE;

ALTER STAGE images REFRESH;
Copy

UDFs を呼び出します。

SELECT
  calc_phash_distance(
    calc_phash(build_scoped_file_url(@images, 'image1.jpg')),
    calc_phash(build_scoped_file_url(@images, 'image2.jpg'))
  ) ;
Copy

UDTF を使用した CSV ファイルの処理

この例では、 SnowflakeFile を使って、 CSV ファイルの内容を抽出し、その行をテーブルにして返す UDTF を作成します。

インラインハンドラーを使用して UDTF を作成します。

CREATE FUNCTION parse_csv(file_path string)
RETURNS TABLE (col1 string, col2 string, col3 string)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'csvparser'
AS
$$
from snowflake.snowpark.files import SnowflakeFile

class csvparser:
  def process(self, stagefile):
    with SnowflakeFile.open(stagefile) as f:
      for line in f.readlines():
        lineStr = line.strip()
        row = lineStr.split(",")
        try:
          # Read the columns from the line.
          yield (row[1], row[0], row[2], )
        except:
          pass
$$;
Copy

CSV ファイルをステージし、ディレクトリテーブルを更新します。

PUT file:///tmp/sample.csv @data_stage AUTO_COMPRESS=FALSE;

ALTER STAGE data_stage REFRESH;
Copy

UDTF を呼び出し、ファイル URL を渡します。

SELECT * FROM TABLE(PARSE_CSV(build_scoped_file_url(@data_stage, 'sample.csv')));
Copy

複数ファイルの処理

ディレクトリテーブルの RELATIVE_PATH 列をハンドラーに渡すと、複数のファイルを読み取って処理することができます。RELATIVE_PATH 列の詳細については、 ディレクトリテーブルのクエリからの出力 をご参照ください。

注釈

ファイルサイズや計算の必要性に応じて、複数のファイルを読み取って処理するステートメントを実行する前に、 ALTER WAREHOUSE を使用してウェアハウスをスケールアップするとよいでしょう。

UDF を呼び出して、複数のファイルを処理します。

次の例では、 CREATE TABLE ステートメント内で UDF を呼び出し、ステージ上の各ファイルを処理し、その結果を新しいテーブルに格納します。

デモのために、例では以下のように想定しています。

  • my_stage という名前のステージには、複数のテキストファイルがあります。

  • 非構造化テキストに対して感情分析を行う、 get_sentiment という名前の既存の UDF があります。UDF は、テキストファイルへのパスを入力とし、感情を表す値を返します。

CREATE OR REPLACE TABLE sentiment_results AS
SELECT
  relative_path
  , get_sentiment(build_scoped_file_url(@my_stage, relative_path)) AS sentiment
FROM directory(@my_stage);
Copy
UDTF を呼び出して、複数のファイルを処理します。

次の例では、 UDTF という名前の parse_excel_udtf を呼び出します。この例では、ディレクトリテーブルから relative_pathmy_excel_stage というステージに渡します。

SELECT t.*
FROM directory(@my_stage) d,
table(parse_excel_udtf(build_scoped_file_url(@my_excel_stage, relative_path)) t;
Copy

ステージ URLs および URIs ハンドラーを使用したファイルの読み取り

SnowflakeFile を使用したファイルアクセスには、デフォルトでスコープ付き URLs が必要です。これにより、ファイルインジェクション攻撃に対して耐久性のあるコードになります。ただし、代わりにステージ URI、またはステージ URL を使用してファイルの場所を参照することはできます。そのためには、キーワード引数 require_scoped_url = False を使用して、 SnowflakeFile.open メソッドを呼び出す必要があります。

このオプションは、 UDF 所有者のみがアクセスできる URI を呼び出し元が提供できるようにする場合に便利です。たとえば、 UDF を所有していて、構成ファイルや機械学習モデルを読み取る場合、ファイルアクセスにステージ URI を使用することがあります。ユーザーの入力に基づいて作成されるファイルなど、予測不可能な名前を持つファイルを扱う場合は、このオプションを推奨しません。

この例では、機械学習モデルをファイルから読み取り、そのモデルを関数で使用して、感情分析のための自然言語処理を実行します。この例では、 openrequire_scoped_url = False で呼び出します。ファイルの場所の形式両方(ステージ URI とステージ URL)で、 UDF 所有者はモデルファイルにアクセスできる必要があります。

インラインハンドラーを使用して UDF を作成します。

CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle

def run(input_data):
  model_file = '@models/NLP_model.pickle'
  # Specify 'mode = rb' to open the file in binary mode.
  with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
    model = pickle.load(f)
    return model.predict([input_data])[0]
$$;
Copy

モデルファイルをステージし、ディレクトリテーブルを更新します。

PUT file:///tmp/NLP_model.pickle @models AUTO_COMPRESS=FALSE;

ALTER STAGE models REFRESH;
Copy

また、 UDF にモデルのステージ URL を指定して、感情をを抽出することもできます。

たとえば、ステージ URL を使用して、ファイルを指定するインラインハンドラー付き UDF を作成します。

CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle

def run(input_data):
  model_file = 'https://my_account/api/files/my_db/my_schema/models/NLP_model.pickle'
  # Specify 'rb' to open the file in binary mode.
  with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
    model = pickle.load(f)
    return model.predict([input_data])[0]
$$;
Copy

入力されたデータで UDF を呼び出します。

SELECT extract_sentiment('I am writing to express my interest in a recent posting made.');
Copy

ファイルの記述

UDF ハンドラーは、 UDF を呼び出すクエリ用に作成された /tmp ディレクトリにファイルを書き込むことができます。

/tmp ディレクトリは単一の呼び出しクエリ用に確保されていますが、複数のPythonワーカープロセスが同時に実行されている可能性があることに注意してください。競合を防ぐには、/tmpディレクトリへのアクセスが他のPythonワーカープロセスと同期されていること、または/tmpに書き込まれるファイルの名前が一意であることを確認する必要があります。

コード例については、このトピックの ステージングされたファイルの解凍 をご参照ください。

次の例のコードは、入力された text/tmp ディレクトリに書き込みます。また、ファイルの場所の一意性を確保するために、関数のプロセス ID を追加します。

def func(text):
   # Append the function's process ID to ensure the file name's uniqueness.
   file_path = '/tmp/content' + str(os.getpid())
   with open(file_path, "w") as file:
      file.write(text)
Copy

ステージングされたファイルの解凍

.zipファイルをステージに保存し、Python zipfileモジュールを使用してUDFで解凍できます。

たとえば、.zipファイルをステージにアップロードし、UDFを作成するときに、IMPORTS句のステージングされた場所で.zipファイルを参照できます。実行時に、Snowflakeはステージングされたファイルを、コードがアクセスできるインポートディレクトリにコピーします。

ファイルの読み取りと書き込みの詳細については、 ファイルの読み取り および ファイルの記述 をご参照ください。

次の例では、UDFコードはNLPモデルを使用してテキスト内のエンティティを検出します。コードはこれらのエンティティの配列を返します。テキストを処理するためのNLPモデルを設定するには、コードは最初にzipfileモジュールを使用して、.zipファイルからモデルのファイル(en_core_web_sm-2.3.1)を抽出します。次に、コードはspaCyモジュールを使用して、ファイルからモデルをロードします。

このコードは、抽出されたファイルの内容を、この関数を呼び出すクエリ用に作成された/tmpディレクトリに書き込むことに注意してください。このコードはファイルロックを使用して、Pythonワーカープロセス間で抽出が同期されるようにします。このようにして、コンテンツは1回だけ解凍されます。ファイルの書き込みの詳細については、 ファイルの記述 をご参照ください。

zipfileモジュールの詳細については、 zipfileリファレンス をご参照ください。spaCyモジュールの詳細については、 spaCyAPIのドキュメント をご参照ください。

インラインハンドラーを使用して UDF を作成します。

CREATE OR REPLACE FUNCTION py_spacy(str string)
RETURNS ARRAY
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'func'
PACKAGES = ('spacy')
IMPORTS = ('@spacy_stage/spacy_en_core_web_sm.zip')
AS
$$
import fcntl
import os
import spacy
import sys
import threading
import zipfile

 # File lock class for synchronizing write access to /tmp.
 class FileLock:
   def __enter__(self):
       self._lock = threading.Lock()
       self._lock.acquire()
       self._fd = open('/tmp/lockfile.LOCK', 'w+')
       fcntl.lockf(self._fd, fcntl.LOCK_EX)

    def __exit__(self, type, value, traceback):
       self._fd.close()
       self._lock.release()

 # Get the location of the import directory. Snowflake sets the import
 # directory location so code can retrieve the location via sys._xoptions.
 IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
 import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

 # Get the path to the ZIP file and set the location to extract to.
 zip_file_path = import_dir + "spacy_en_core_web_sm.zip"
 extracted = '/tmp/en_core_web_sm'

 # Extract the contents of the ZIP. This is done under the file lock
 # to ensure that only one worker process unzips the contents.
 with FileLock():
    if not os.path.isdir(extracted + '/en_core_web_sm/en_core_web_sm-2.3.1'):
       with zipfile.ZipFile(zip_file_path, 'r') as myzip:
          myzip.extractall(extracted)

 # Load the model from the extracted file.
 nlp = spacy.load(extracted + "/en_core_web_sm/en_core_web_sm-2.3.1")

 def func(text):
    doc = nlp(text)
    result = []

    for ent in doc.ents:
       result.append((ent.text, ent.start_char, ent.end_char, ent.label_))
    return result
 $$;
Copy

NULL 値の取り扱い

次のコードは、NULL値の処理方法を示しています。詳細については、 NULL 値 をご参照ください。

UDF を作成します。

CREATE OR REPLACE FUNCTION py_udf_null(a variant)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'udf'
AS $$

def udf(a):
    if not a:
        return 'JSON null'
    elif getattr(a, "is_sql_null", False):
        return 'SQL null'
    else:
        return 'not null'
$$;
Copy

UDF を呼び出します。

SELECT py_udf_null(null);
SELECT py_udf_null(parse_json('null'));
SELECT py_udf_null(10);
+-------------------+
| PY_UDF_NULL(NULL) |
|-------------------|
| SQL null          |
+-------------------+

+---------------------------------+
| PY_UDF_NULL(PARSE_JSON('NULL')) |
|---------------------------------|
| JSON null                       |
+---------------------------------+

+-----------------+
| PY_UDF_NULL(10) |
|-----------------|
| not null        |
+-----------------+
Copy