Python UDFsの作成

このトピックでは、Python UDF(ユーザー定義関数)を作成してインストールする方法を示します。

このトピックの内容:

Pythonコードの記述

Pythonモジュールと関数の記述

以下の仕様に従うモジュールを作成します。

  • モジュールを定義します。モジュールは、Pythonの定義とステートメントを含むファイルです。

  • モジュール内で関数を定義します。

  • 関数が引数を受け入れる場合、各引数は SQL-Python型マッピングテーブルPython Data Type 列で指定されたデータ型の1つである必要があります。

    関数引数は、名前ではなく位置によってバインドされます。UDFに渡される最初の引数は、Python関数が最初に受け取る引数です。

  • 適切な戻り値を指定します。Python UDFはスカラー関数である必要があるため、呼び出されるたびに1つの値を返す必要があります。戻り値の型は、 SQL-Python型マッピングテーブルPython Data Type 列で指定されたデータ型の1つである必要があります。戻り値の型は、 CREATE FUNCTION ステートメントの RETURNS 句で指定されたSQLデータ型と互換性がある必要があります。

  • モジュールには複数の関数を含めることができます。Snowflakeによって呼び出される関数は、同じモジュールまたは他のモジュール内の他の関数を呼び出すことができます。

  • 関数(および関数によって呼び出される関数)は、 Python UDFs に対してSnowflakeが課す制約 に準拠する必要があります。

注釈

Python UDFのバッチAPIがあります。これにより、入力行のバッチを Pandas DataFrames として受け取り、結果のバッチを Pandas配列 または シリーズ として返すPython関数を定義できます。詳細については、 Python UDFのバッチAPI をご参照ください。

UDFハンドラーを使用したファイルの読み取りと書き込み

UDFハンドラーコードを使用してファイルの読み取りと書き込みを行うことができます。SnowflakeがUDFsを実行する制限付きエンジン内でこれを安全に行うには、ここで説明するガイドラインに従ってください。

UDFハンドラーを使用したファイルの読み取り

UDFハンドラーは、Snowflakeステージにアップロードされたファイルを読み取ることができます。ファイルをホストするステージは、UDFの所有者によって読み取り可能である必要があります。

CREATE FUNCTIONのIMPORTS句でファイルのステージ位置を指定すると、SnowflakeはステージングされたファイルをUDF専用のインポートディレクトリにコピーします。ハンドラーコードはそこからファイルを読み取ることができます。

Snowflakeは、インポートされたすべてのファイルを、場合によっては複数のステージから単一のインポートディレクトリにコピーすることに注意してください。このため、IMPORTS句で指定されたファイルの名前は、相互に区別する必要があります。

コード例については、このトピックの ステージからPython UDFへのファイルのロード をご参照ください。

UDFハンドラーコードを使用してファイルを読み取るには:

  1. ファイルをSnowflakeステージにコピーします。

    PUTコマンドを使用して、クライアントコンピューターのローカルディレクトリからファイルをアップロードできます。詳細については、 PUT をご参照ください。ステージへのファイルのロードに関するより一般的な情報については、 データのロードの概要 をご参照ください。

  2. CREATE FUNCTIONを使用してUDFを作成する場合は、IMPORTS句でファイルの場所を指定します。

    次の例のコードは、 my_stage というステージで file.txt ファイルを指定しています。

    create or replace function my_udf()
       ...
       imports=('@my_stage/file.txt')
       ...
    
    Copy
  3. ハンドラーコードで、インポートディレクトリからファイルを読み取ります。

    Snowflakeは、ステージングされたファイルをUDFのインポートディレクトリにコピーします。 snowflake_import_directory システムオプションを使用して、ディレクトリの場所を取得できます。

    Pythonコードでは、次の例のように、Python sys._xoptions メソッドを使用してディレクトリの場所を取得できます。

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    
    def compute():
       with open(import_dir + 'file.txt', 'r') as file:
          return file.read()
    
    Copy

UDFハンドラーを使用したファイルの書き込み

UDFハンドラーは、UDFを呼び出すクエリ用に作成された/tmpディレクトリにファイルを書き込むことができます。

/tmpディレクトリは単一の呼び出しクエリ用に確保されていますが、複数のPythonワーカープロセスが同時に実行されている可能性があることに注意してください。競合を防ぐには、/tmpディレクトリへのアクセスが他のPythonワーカープロセスと同期されていること、または/tmpに書き込まれるファイルの名前が一意であることを確認する必要があります。

コード例については、このトピックの ステージングされたファイルの解凍 をご参照ください。

次の例のコードは、入力 text を/tmpディレクトリに書き込み、関数のプロセスIDを追加して、ファイルの場所の一意性を確保します。

def func(text):
   # Ensure the file name's uniqueness by appending the function's process ID.
   file_path = '/tmp/content' + str(os.getpid())
   with open(file_path, "w") as file:
      file.write(text)
Copy

Snowflakeでの関数の作成

次を指定するには、 CREATE FUNCTION ステートメントを実行する必要があります。

  • 使用する SQL 関数名。

  • Python UDFが呼び出されたときに呼び出すPython関数の名前。

UDFの名前は、Pythonで記述されたハンドラー関数の名前と一致する必要はありません。CREATE FUNCTIONステートメントは、UDF名をPython関数に関連付けます。

UDF の名前を選択する場合は、

  • オブジェクト識別子 のルールに従います。

  • 一意の名前を選択するか、 オーバーロード のルールに従います。

    重要

    引数の数とデータ型の両方に基づいて関数を区別するSQL UDFsのオーバーロードとは異なり、Python UDFsは、引数の数 のみ に基づいて関数を区別します。

関数引数は、名前ではなく位置によってバインドされます。UDFに渡される最初の引数は、Python関数が最初に受け取る引数です。

引数のデータ型については、 SQL-Pythonデータ型マッピング をご参照ください。

インラインコードを使用するUDFsとステージからアップロードされたコードを使用するUDFsの対比

Python UDFのコードは、次のいずれかの方法で指定できます。

  • ステージからアップロード: CREATE FUNCTIONステートメントは、 ステージ 内の既存のPythonソースコードの場所を指定します。

  • インライン: CREATE FUNCTIONステートメントは、Pythonソースコードを指定します。

インラインPython UDFの作成

インライン UDFの場合は、CREATEFUNCTIONステートメントの一部としてPythonソースコードを提供します。

たとえば、次のステートメントは、指定された整数に1を追加するインラインPython UDFを作成します。

create or replace function addone(i int)
returns int
language python
runtime_version = '3.8'
handler = 'addone_py'
as
$$
def addone_py(i):
  return i+1
$$;
Copy

Pythonソースコードは AS 句で指定されます。ソースコードは、一重引用符またはドル記号($$)のペアで囲むことができます。ソースコードに一重引用符が埋め込まれている場合は、通常、二重ドル記号を使用する方が簡単です。

UDF を呼び出します。

select addone(10);
Copy

出力は次のとおりです。

+------------+
| ADDONE(10) |
|------------|
|         11 |
+------------+
Copy

Pythonソースコードには複数のモジュールとモジュール内の複数の関数を含めることができるため、 HANDLER 句は呼び出すモジュールと関数を指定します。

インラインPython UDFは、 IMPORTS 句に含まれるモジュールのコードを呼び出すことができます。

CREATE FUNCTION ステートメントの構文の詳細については、 CREATE FUNCTION をご参照ください。

その他の例については、 インラインPython UDFの例 をご参照ください。

ステージからアップロードされたコードを使用したPython UDFの作成

次のステートメントは、 ステージ からアップロードされたコードを使用して単純なPython UDFを作成します。ファイルをホストするステージは、UDFの 所有者 によって読み取り可能である必要があります。また、ZIPファイルは自己完結型である必要があり、実行する追加のセットアップスクリプトに依存しないようにする必要があります。

ソースコードを含む sleepy.py という名前のPythonファイルを作成します。

def snore(n):   # return a series of n snores
    result = []
    for a in range(n):
        result.append("Zzz")
    return result
Copy

SnowSQL (CLI クライアント) を起動し、 PUT コマンドを使用して、ファイルをローカルファイルシステムから @~ という名前のデフォルトのユーザーステージにコピーします。(PUT コマンドは、Snowflake GUIで実行できないことに注意してください。)

put
file:///Users/Me/sleepy.py
@~/
auto_compress = false
overwrite = true
;
Copy

ファイルを削除または名前変更すると、UDFを呼び出すことができなくなります。ファイルを更新する必要がある場合はファイルを更新できますが、更新中はUDFを呼び出すことができません。古いファイルがまだステージにある場合、 PUT コマンドには OVERWRITE=TRUE 句を含める必要があります。

UDFを作成します。ハンドラーはモジュールと関数を指定します。

create or replace function dream(i int)
returns variant
language python
runtime_version = '3.8'
handler = 'sleepy.snore'
imports = ('@~/sleepy.py')
Copy

UDF を呼び出します。

select dream(3);

+----------+
| DREAM(3) |
|----------|
| [        |
|   "Zzz", |
|   "Zzz", |
|   "Zzz"  |
| ]        |
+----------+
Copy

複数のインポートファイルの指定

複数のインポートファイルを指定する方法の例を次に示します。

create or replace function multiple_import_files(s string)
returns string
language python
runtime_version=3.8
imports=('@python_udf_dep/bar/python_imports_a.zip', '@python_udf_dep/foo/python_imports_b.zip')
handler='compute'
as
$$
def compute(s):
  return s
$$;
Copy

注釈

指定されたインポートファイル名は異なっている必要があります。例えば、これは機能しません: imports=('@python_udf_dep/bar/python_imports.zip', '@python_udf_dep/foo/python_imports.zip')

関数に対する権限の付与

関数の所有者以外のロールが関数を呼び出すには、所有者がそのロールに適切な権限を付与する必要があります。

Python UDFの GRANT ステートメントは、JavaScript UDFsなどといった、他のUDFsのGRANTステートメントと基本的に同じです。

例:

GRANT USAGE ON FUNCTION my_python_udf(number, number) TO my_role;
Copy

インラインPython UDFでのインポートされたパッケージの使用

Anacondaのサードパーティパッケージの厳選されたリストが利用可能です。詳細については、 サードパーティパッケージの使用 をご参照ください。

注釈

Anacondaが提供するパッケージを使用する前に、Snowflake組織管理者はSnowflakeサードパーティの規約に同意する必要があります。詳細については、 はじめるにあたり をご参照ください。

次のコードは、パッケージをインポートしてそのバージョンを返す方法を示しています。

UDF を作成します。

create or replace function py_udf()
returns variant
language python
runtime_version = 3.8
packages = ('numpy','pandas','xgboost==1.5.0')
handler = 'udf'
as $$
import numpy as np
import pandas as pd
import xgboost as xgb
def udf():
    return [np.__version__, pd.__version__, xgb.__version__]
$$;
Copy

UDF を呼び出します。

select py_udf();
Copy

出力は次のとおりです。

+-------------+
| PY_UDF()    |
|-------------|
| [           |
|   "1.19.2", |
|   "1.4.0",  |
|   "1.5.0"   |
| ]           |
+-------------+
Copy

ステージからPython UDFへのファイルのロード

この例は、非コードファイルをステージからPython UDFにインポートする方法を示しています。ファイルは UDF の作成中に1回だけ読み取られ、ファイルの読み取りがターゲットハンドラーの外部で発生した場合は、 UDF の実行中に再度読み取られることはありません。ファイルの読み取りの詳細については、 UDFハンドラーを使用したファイルの読み取り をご参照ください。

注釈

ファイルはステージのトップレベルディレクトリからのみインポートでき、サブフォルダからはインポートできません。

UDF を作成します。

create or replace function my_udf()
  returns string
  language python
  runtime_version=3.8
  imports=('@my_stage/file.txt')
  handler='compute'
as
$$
import sys
import os

with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
    s = f.read()

def compute():
    return s
$$;
Copy

ステージングされたファイルの解凍

.zipファイルをステージに保存し、Python zipfileモジュールを使用してUDFで解凍できます。

たとえば、.zipファイルをステージにアップロードし、UDFを作成するときに、IMPORTS句のステージングされた場所で.zipファイルを参照できます。実行時に、Snowflakeはステージングされたファイルを、コードがアクセスできるインポートディレクトリにコピーします。

ファイルの読み取りと書き込みの詳細については、 UDFハンドラーを使用したファイルの読み取りと書き込み をご参照ください。

次の例では、UDFコードはNLPモデルを使用してテキスト内のエンティティを検出します。コードはこれらのエンティティの配列を返します。テキストを処理するためのNLPモデルを設定するには、コードは最初にzipfileモジュールを使用して、.zipファイルからモデルのファイル(en_core_web_sm-2.3.1)を抽出します。次に、コードはspaCyモジュールを使用して、ファイルからモデルをロードします。

このコードは、抽出されたファイルの内容を、この関数を呼び出すクエリ用に作成された/tmpディレクトリに書き込むことに注意してください。このコードはファイルロックを使用して、Pythonワーカープロセス間で抽出が同期されるようにします。このようにして、コンテンツは1回だけ解凍されます。ファイルの書き込みの詳細については、 UDFハンドラーを使用したファイルの書き込み をご参照ください。

zipfileモジュールの詳細については、 zipfileリファレンス をご参照ください。spaCyモジュールの詳細については、 spaCyAPIのドキュメント をご参照ください。

UDF を作成します。

create or replace function py_spacy(str string)
   returns array
   language python
   runtime_version = 3.8
   handler = 'func'
   packages = ('spacy')
   imports = ('@spacy_stage/spacy_en_core_web_sm.zip')
   as
$$
import fcntl
import os
import spacy
import sys
import threading
import zipfile

# File lock class for synchronizing write access to /tmp
class FileLock:
   def __enter__(self):
      self._lock = threading.Lock()
      self._lock.acquire()
      self._fd = open('/tmp/lockfile.LOCK', 'w+')
      fcntl.lockf(self._fd, fcntl.LOCK_EX)

   def __exit__(self, type, value, traceback):
      self._fd.close()
      self._lock.release()

# Get the location of the import directory. Snowflake sets the import
# directory location so code can retrieve the location via sys._xoptions.
IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

# Get the path to the ZIP file and set the location to extract to.
zip_file_path = import_dir + "spacy_en_core_web_sm.zip"
extracted = '/tmp/en_core_web_sm'

# Extract the contents of the ZIP. This is done under the file lock
# to ensure that only one worker process unzips the contents.
with FileLock():
   if not os.path.isdir(extracted + '/en_core_web_sm/en_core_web_sm-2.3.1'):
      with zipfile.ZipFile(zip_file_path, 'r') as myzip:
         myzip.extractall(extracted)

# Load the model from the extracted file.
nlp = spacy.load(extracted + "/en_core_web_sm/en_core_web_sm-2.3.1")

def func(text):
   doc = nlp(text)
   result = []

   for ent in doc.ents:
      result.append((ent.text, ent.start_char, ent.end_char, ent.label_))
   return result
$$;
Copy

Python UDFsでのNULLの処理

次のコードは、NULL値の処理方法を示しています。詳細については、 NULL 値 をご参照ください。

UDF を作成します。

create or replace function py_udf_null(a variant)
returns string
language python
runtime_version = 3.8
handler = 'udf'
as $$

def udf(a):
    if not a:
        return 'JSON null'
    elif getattr(a, "is_sql_null", False):
        return 'SQL null'
    else:
        return 'not null'
$$;
Copy

UDF を呼び出します。

select py_udf_null(null);
select py_udf_null(parse_json('null'));
select py_udf_null(10);
Copy

出力は次のとおりです。

+-------------------+
| PY_UDF_NULL(NULL) |
|-------------------|
| SQL null          |
+-------------------+

+---------------------------------+
| PY_UDF_NULL(PARSE_JSON('NULL')) |
|---------------------------------|
| JSON null                       |
+---------------------------------+

+-----------------+
| PY_UDF_NULL(10) |
|-----------------|
| not null        |
+-----------------+
Copy