Pythonでの UDTF の記述

このトピックの内容:

Pythonでユーザー定義の テーブル関数 (UDTF)ハンドラーを実装できます。このハンドラーコードは、UDTFが呼び出されたときに実行されます。このトピックでは、Pythonでハンドラーを実装し、UDTFを作成する方法について説明します。

UDTFは、表形式の結果を返すユーザー定義関数(UDF)です。Pythonで実装されたUDFハンドラーの詳細については、 Python UDFsの作成 をご参照ください。UDFsの一般的な情報については、 ユーザー定義関数の概要 をご参照ください。

UDTFのハンドラーでは、入力行を処理できます(このトピックの 行の処理 をご参照ください)。入力パーティションごとに実行するロジックを作成することもできます(このトピックの パーティションの処理 をご参照ください)。

Python UDTFを作成するときは、次のようにします。

  1. UDTFが呼び出されたときにSnowflakeが呼び出すメソッドを使用してクラスを実装します。

    詳細については、このトピック内の ハンドラーの実装 をご参照ください。

  2. クラスをハンドラーとして指定し、CREATE FUNCTIONコマンドを使用してSQLにUDTFを作成します。UDTFを作成するときは、次のように指定します。

    • UDTF入力パラメーターのデータ型。

    • UDTF によって返される列のデータ型。

    • UDTF が呼び出されたときにハンドラーとして実行するコード。

    • ハンドラーが実装されている言語。

    構文の詳細については、このトピック内の CREATE FUNCTION を使用したUDTFの作成 をご参照ください。

UDF の呼び出し で説明されているように、UDF または UDTF を呼び出すことができます。

注釈

テーブル関数(UDTFs)には、入力引数に500個、出力列に500個の制限があります。

Snowflakeは現在、次のバージョンのPythonでJava UDTFs の記述をサポートしています。

  • 3.8

  • 3.9

  • 3.10

CREATE FUNCTION ステートメントで、 runtime_version を希望のバージョンに設定します。

ハンドラーの実装

UDTF引数値を表形式の結果に処理し、パーティション化された入力を処理するハンドラークラスを実装します。ハンドラークラスの例については、このトピックの ハンドラークラスの例 をご参照ください。

CREATE FUNCTIONでUDTFを作成するときは、このクラスをUDTFのハンドラーとして指定します。関数を作成するためのSQLの詳細については、このトピックの CREATE FUNCTION を使用したUDTFの作成 をご参照ください。

ハンドラークラスは、UDTFが呼び出されたときにSnowflakeが呼び出すメソッドを実装します。このクラスには、UDTFのロジックが含まれています。

メソッド

要件

説明

__init__ メソッド

オプション

入力パーティションのステートフル処理のために状態を初期化します。詳細については、このトピック内の ハンドラーの初期化 をご参照ください。

process メソッド

必須

各入力行を処理し、表形式の値をタプルとして返します。Snowflakeはこのメソッドを呼び出し、UDTFの引数から入力を渡します。詳細については、このトピック内の process メソッドの定義 をご参照ください。

end_partition メソッド

オプション

入力パーティションの処理を終了し、表形式の値をタプルとして返します。詳細については、このトピック内の パーティション処理の終了 をご参照ください。

ハンドラークラスの任意のメソッドから例外をスローすると、処理が停止することに注意してください。UDTF を呼び出したクエリは、エラーメッセージを表示して失敗します。

注釈

コードがここで説明されている要件を満たしていない場合、UDTFの作成または実行が失敗する可能性があります。Snowflakeは、CREATE FUNCTIONステートメントの実行時に違反を検出します。

ハンドラークラスの例

次の例のコードは、ハンドラークラスがパーティション内の行を処理するUDTFを作成します。 process メソッドは、各入力行を処理し、株式売却の合計コストを含む行を返します。パーティション内の行を処理した後、パーティションに含まれるすべての売上の合計を(end_partition メソッドから)返します。

create or replace function stock_sale_sum(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSaleSum'
as $$
class StockSaleSum:
    def __init__(self):
        self._cost_total = 0
        self._symbol = ""

    def process(self, symbol, quantity, price):
      self._symbol = symbol
      cost = quantity * price
      self._cost_total += cost
      yield (symbol, cost)

    def end_partition(self):
      yield (self._symbol, self._cost_total)
$$;
Copy

次の例のコードは、前述の UDF を呼び出し、 stocks_table テーブルの列 symbolquantity、および price から値を渡します。UDTF の呼び出しの詳細については、 UDF の呼び出し をご参照ください。

select stock_sale_sum.symbol, total
  from stocks_table, table(stock_sale_sum(symbol, quantity, price) over (partition by symbol));
Copy

ハンドラーの初期化

オプションで、ハンドラーが行の処理を開始する前にSnowflakeが呼び出す __init__ メソッドをハンドラークラスに実装できます。例えば、このメソッドを使用して、ハンドラーのパーティションスコープの状態を確立できます。 __init__ メソッドは出力行を生成しない場合があります。

メソッドのシグネチャは、次の形式である必要があります。

def __init__(self):
Copy

例えば、次のようにします。

  • パーティションの状態を初期化し、この状態を process および end_partition メソッドで使用します。

  • 行ごとに1回ではなく、パーティションごとに1回だけ実行する必要がある、長時間実行の初期化を実行します。

注釈

クラス宣言の前など、ハンドラークラスの外部にそのコードを含めることにより、パーティション処理を開始する前にロジックを1回実行することもできます。

パーティションの処理の詳細については、このトピック内の パーティションの処理 をご参照ください。

__init__ メソッドを使用する場合は、 __init__ に注意してください。

  • 引数として self のみを取ることができます。

  • 出力行を生成できません。そのためには、 process メソッドの実装を使用してください。

  • パーティションごとに1回呼び出され、 process メソッドが呼び出される前にも呼び出されます。

行の処理

Snowflakeが入力行ごとに呼び出す process メソッドを実装します。

process メソッドの定義

SQL型から変換されたUDTF引数を値として受け取る process メソッドを定義し、SnowflakeがUDTFの表形式の戻り値を作成するために使用するデータを返します。

メソッドのシグネチャは、次の形式である必要があります。

def process(self, *args):
Copy

process メソッドには次が必要です。

  • self パラメータがある。

  • UDTF パラメーターに対応するメソッドパラメーターを宣言する。

    メソッドパラメーター名はUDTFパラメータ名と一致する必要はありませんが、メソッドパラメーターはUDTFパラメータが宣言されるのと 同じ順序で 宣言される必要があります。

    UDTF引数値をメソッドに渡すと、Snowflakeは値をSQLタイプからメソッドで使用するPython型に変換します。SnowflakeがSQLとPythonのデータ型の間でどのようにマッピングされるかについては、 SQL-Pythonデータ型マッピング をご参照ください。

  • タプルのシーケンスがUDTF戻り値列のシーケンスに対応する、1つ以上のタプルを生成します(またはタプルを含むイテレータを返します)。

    タプル要素は、UDTF戻り値列が宣言されているのと 同じ順序 で表示される必要があります。詳細については、このトピック内の 値を返す をご参照ください。

    Snowflakeは、値をPython型からUDTF宣言に必要なSQL型に変換します。SnowflakeがSQLとPythonのデータ型の間でどのようにマッピングされるかについては、 SQL-Pythonデータ型マッピング をご参照ください。

ハンドラークラスのメソッドが例外をスローすると、処理は停止します。UDTFを呼び出したクエリは、エラーメッセージを表示して失敗します。 process メソッドが None を返す場合、処理は停止します。(process メソッドが None を返しても、 end_partition メソッドは引き続き呼び出されます。)

processメソッドの例

次の例のコードは、3つのUDTF引数(symbolquantity、および price)を処理する process メソッドを持つ StockSale ハンドラークラスを示し、2つの列(symbol および total)を含む単一の行を返します。 process メソッドパラメーターは stock_sale パラメーターと同じ順序で宣言されていることに注意してください。 process メソッドの yield ステートメントの引数は、 stock_sale RETURNS TABLE句で宣言された列と同じ順序です。

create or replace function stock_sale(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSale'
as $$
class StockSale:
    def process(self, symbol, quantity, price):
      cost = quantity * price
      yield (symbol, cost)
$$;
Copy

次の例のコードは、前述の UDF を呼び出し、 stocks_table テーブルの列 symbolquantity、および price から値を渡します。

select stock_sale.symbol, total
  from stocks_table, table(stock_sale(symbol, quantity, price) over (partition by symbol));
Copy

値を返す

出力行を返す場合、 yield または return のいずれか(両方ではない)を使用して、表形式の値を持つタプルを返すことができます。メソッドが None を返すか生成する場合、現在の行の処理は停止します。

  • yield を使用する場合は、出力行ごとに個別の yield ステートメントを実行します。 yield に付属する遅延評価により、より効率的な処理が可能になり、タイムアウトを回避できるため、これがベストプラクティスです。

    タプルの各要素は、UDTFによって返される結果の列値になります。 yield 引数の順序は、CREATE FUNCTIONのRETURNS TABLE句で戻り値として宣言された列の順序と一致する必要があります。

    次の例のコードは、2つの行を表す値を返します。

    def process(self, symbol, quantity, price):
      cost = quantity * price
      yield (symbol, cost)
      yield (symbol, cost)
    
    Copy

    次の例のように、yield引数はタプルであるため、タプルに単一の値を渡す場合は、後続するにコンマを含める必要があることに注意してください。

    yield (cost,)
    
    Copy
  • return を使用する場合、タプルで反復可能を返します。

    タプルの各値は、UDTFによって返される結果の列値になります。タプル内の列値の順序は、CREATE FUNCTIONのRETURNS TABLE句で戻り値として宣言された列の順序と一致する必要があります。

    次の例のコードは、symbolとtotalの2つの列を持つ2つの行を返します。

    def process(self, symbol, quantity, price):
      cost = quantity * price
      return [(symbol, cost), (symbol, cost)]
    
    Copy

行をスキップする

入力行をスキップして次の行を処理するには(入力行を検証する場合など)、 process メソッドに次のいずれかを返すようにします。

  • return を使用する場合は、 NoneNone を含むリスト、または行をスキップするための空のリストを返します。

  • yield を使用する場合は、 None を返して行をスキップします。

    yield への呼び出しが複数ある場合、 None を返す呼び出しの後の呼び出しはSnowflakeによって無視されることに注意してください。

次の例のコードは、 number が正の整数である行のみを返します。 number が正ではない場合、メソッドは None を返し、現在の行をスキップして次の行の処理を続行します。

def process(self, number):
  if number < 1:
    yield None
  else:
    yield (number)
Copy

ステートフルおよびステートレス処理

ハンドラーを実装して、パーティション対応の方法で行を処理することも、行ごとに行を処理することもできます。

  • パーティション対応処理 では、ハンドラーにパーティションスコープの状態を管理するためのコードが含まれます。これには、パーティション処理の開始時に実行される __init__ メソッドと、パーティションの最後の行の処理後にSnowflakeが呼び出す end_partition メソッドが含まれます。詳細については、このトピック内の パーティションの処理 をご参照ください。

  • パーティションを認識しない処理 では、ハンドラーはパーティションの境界を無視してステートレスに実行されます。

    ハンドラーをこのように実行するには、 __init__ または end_partition メソッドを含めないでください。

パーティションの処理

パーティションごとに実行されるコード(状態の管理など)と、パーティション内の各行に対して実行されるコードを使用して、入力でパーティションを処理できます。

注釈

UDTF を呼び出すときにパーティションを指定する方法の詳細については、 テーブル関数とパーティション をご参照ください。

クエリにパーティションが含まれている場合、列の値などの指定された値を使用して行を集約します。ハンドラーが受け取る集約された行は、その値によってパーティション化されていると言われます。コードでこれらのパーティションとその行を処理できるため、各パーティションの処理にはパーティションスコープの状態が含まれます。

次のSQLサンプルクエリのコードで株式売却情報を取得します。 symbol 列の値によって入力がパーティション化された stock_sale_sum UDTFを実行します。

select stock_sale_sum.symbol, total
  from stocks_table, table(stock_sale_sum(symbol, quantity, price) over (partition by symbol));
Copy

入ってくる行がパーティション化されている場合でも、コードはパーティションの分離を無視して行を処理するだけであることに注意してください。例えば、ハンドラークラスの __init__ メソッドや end_partition メソッドなど、パーティションスコープの状態を処理するように設計されたコードを省略して、 process メソッドを実装するだけで済みます。詳細については、このトピック内の ステートフルおよびステートレス処理 をご参照ください。

各パーティションを1つの単位として処理するには、次のようにします。

  • パーティションの処理を初期化するハンドラークラス __init__ メソッドを実装します。

    詳細については、このトピック内の ハンドラーの初期化 をご参照ください。

  • process メソッドで各行を処理するときに、パーティション対応コードを含めます。

    行の処理の詳細については、このトピック内の 行の処理 をご参照ください。

  • end_partition メソッドを実装して、パーティション処理を完了します。

    詳細については、このトピック内の パーティション処理の終了 をご参照ください。

以下では、パーティションごとに実行するように設計されたコードを含めた場合のハンドラーへの呼び出しのシーケンスについて説明します。

  1. パーティションの処理が開始され、最初の行が処理される前に、Snowflakeはハンドラークラスの __init__ メソッドを使用してクラスのインスタンスを作成します。

    ここで、パーティションスコープの状態を確立できます。例えば、パーティション内の行から計算された値を保持するようにインスタンス変数を初期化できます。

  2. パーティションの各行に対して、Snowflakeは process メソッドを呼び出します。

    メソッドが実行されるたびに、状態値を変更できます。例えば、 process メソッドでインスタンス変数の値を更新する場合があります。

  3. コードがパーティションの最後の行を処理した後、Snowflakeは end_partition メソッドを呼び出します。

    このメソッドから、返したいパーティションレベルの値を含む出力行を返すことができます。例えば、パーティション内の行を処理するときに更新していたインスタンス変数の値を返す場合があります。

    end_partition メソッドはSnowflakeから引数を受け取りません。Snowflakeは、パーティションの最後の行を処理した後にメソッドを呼び出すだけです。

パーティション処理の終了

オプションで、パーティション内のすべての行を処理した後にSnowflakeが呼び出す end_partition メソッドをハンドラークラスに実装できます。このメソッドでは、パーティションのすべての行が処理された後、パーティションのコードを実行できます。 end_partition メソッドは、パーティションスコープの計算結果を返すなどの出力行を生成する場合があります。詳細については、このトピック内の パーティションの処理 をご参照ください。

メソッドのシグネチャは、次の形式である必要があります。

def end_partition(self):
Copy

Snowflakeは、次の end_partition メソッドの実装を想定しています。

  • 静的であってはなりません。

  • self 以外のパラメータは含まれていない可能性があります。

  • 表形式の値を返す代わりに、空のリストまたは None を生成する場合があります。

注釈

Snowflakeは、正常に処理するためにタイムアウトが調整された大型のパーティションをサポートしていますが、特に大型のパーティションでは、処理中にタイムアウトする可能性があります(end_partition の完了に時間がかかりすぎる場合など)。特定の使用シナリオに合わせてタイムアウトのしきい値を調整する必要がある場合は、 Snowflakeサポート にお問い合わせください。

パーティション処理の例

次の例のコードは、(process メソッドで)最初に購入あたりのコストを計算し、購入を合計することによって、株式の購入全体で支払われる合計コストを計算します。このコードは、 end_partition メソッドで合計を返します。

UDTF の呼び出しと共に、このハンドラーを含んでいる UDTF の例については、 ハンドラークラスの例 をご参照ください。

class StockSaleSum:
  def __init__(self):
    self._cost_total = 0
    self._symbol = ""

  def process(self, symbol, quantity, price):
    self._symbol = symbol
    cost = quantity * price
    self._cost_total += cost
    yield (symbol, cost)

  def end_partition(self):
    yield (self._symbol, self._cost_total)
Copy

パーティションを処理するときは、次の点に注意してください。

  • コードは、UDTFの呼び出しで明示的に指定されていないパーティションを処理する場合があります。UDTFの呼び出しにPARTITION BY句が含まれていない場合でも、Snowflakeはデータを暗黙的にパーティション化します。

  • process メソッドは、パーティションのORDER BY句で指定された順序で行データを受け取ります(存在する場合)。

インポートされたパッケージの使用

Snowflakeで利用可能なAnacondaのサードパーティパッケージの厳選されたリストに含まれているPythonパッケージを使用できます。これらのパッケージをUDTFの依存関係として指定するには、CREATE FUNCTIONのPACKAGES句を使用します。

Snowflakeで次のSQLを実行すると、含まれているパッケージのリストを見つけることができます。

select * from information_schema.packages where language = 'python';
Copy

詳細については、 サードパーティパッケージの使用 および Python UDFsの作成 をご参照ください。

次の例のコードは、 NumPy(Numerical Python) パッケージの関数を使用して、1株あたりの価格がそれぞれ異なる一連の株式購入から1株あたりの平均価格を計算します。

create or replace function stock_sale_average(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
packages = ('numpy')
handler='StockSaleAverage'
as $$
import numpy as np

class StockSaleAverage:
    def __init__(self):
      self._price_array = []
      self._quantity_total = 0
      self._symbol = ""

    def process(self, symbol, quantity, price):
      self._symbol = symbol
      self._price_array.append(float(price))
      cost = quantity * price
      yield (symbol, cost)

    def end_partition(self):
      np_array = np.array(self._price_array)
      avg = np.average(np_array)
      yield (self._symbol, avg)
$$;
Copy

次の例のコードは、前述の UDF を呼び出し、 stocks_table テーブルの列 symbolquantity、および price から値を渡します。UDTF の呼び出しの詳細については、 UDF の呼び出し をご参照ください。

select stock_sale_average.symbol, total
  from stocks_table,
  table(stock_sale_average(symbol, quantity, price)
    over (partition by symbol));
Copy

ワーカープロセスによる並行タスクの実行

Pythonワーカープロセスを使用して、並行タスクを実行することができます。これは、ウェアハウスノードの複数の CPU コアを活用した並列タスクを実行する必要がある場合に役立つ可能性があります。

注釈

Snowflakeは、組み込みのPythonのマルチプロセスモジュールは使用しないことを推奨しています。

Pythonグローバルインタープリターロック によって、マルチタスクのアプローチが CPU のすべてのコアに渡ってスケーリングできない場合に対処するために、スレッドではなく、別々のワーカープロセスを使って並行タスクを実行することができます。

Snowflakeウェアハウスでは、次の例のように、 joblib ライブラリの Parallel クラスを使用してこれを実行できます。

CREATE OR REPLACE FUNCTION joblib_multiprocessing_udtf(i INT)
  RETURNS TABLE (result INT)
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.8
  HANDLER = 'JoblibMultiprocessing'
  PACKAGES = ('joblib')
AS $$
import joblib
from math import sqrt

class JoblibMultiprocessing:
  def process(self, i):
    pass

  def end_partition(self):
    result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
    for r in result:
      yield (r, )
$$;
Copy

注釈

joblib.Parallel に使用されるデフォルトのバックエンドは、Snowflake標準のウェアハウスとSnowparkに最適されたウェアハウスで異なります。

  • 標準ウェアハウスのデフォルト: threading

  • Snowparkに最適化されたウェアハウスのデフォルト: loky (マルチプロセス)

次の例のように、 joblib.parallel_backend 関数を呼び出すと、デフォルトのバックエンド設定を上書きできます。

import joblib
joblib.parallel_backend('loky')
Copy

CREATE FUNCTION を使用したUDTFの作成

CREATE FUNCTIONコマンドを使用してSQLにUDTFを作成し、作成したコードをハンドラーとして指定します。コマンドリファレンスについては、 CREATE FUNCTION をご参照ください。

UDTFを作成するときは、次の構文を使用してください。

CREATE OR REPLACE FUNCTION <name> ( [ <arguments> ] )
  RETURNS TABLE ( <output_column_name> <output_column_type> [, <output_column_name> <output_column_type> ... ] )
  LANGUAGE PYTHON
  [ IMPORTS = ( '<imports>' ) ]
  RUNTIME_VERSION = 3.8
  [ PACKAGES = ( '<package_name>' [, '<package_name>' . . .] ) ]
  [ TARGET_PATH = '<stage_path_and_file_name_to_write>' ]
  HANDLER = '<handler_class>'
  [ AS '<python_code>' ]
Copy

作成したハンドラーコードを UDTF に関連付けるには、CREATE FUNCTION を実行するときに次のようにします。

  • RETURNS TABLE で、列名と型のペアで出力列を指定します。

  • LANGUAGE を PYTHON に設定します。

  • クラスがステージなどの外部の場所にある場合は、IMPORTS 句の値をハンドラークラスのパスと名前に設定します。

    詳細については、 Python UDFsの作成 をご参照ください。

  • RUNTIME_VERSIONをコードに必要なPythonランタイムのバージョンに設定します。サポートされているPythonのバージョンは次のとおりです。

    • 3.8

    • 3.9

    • 3.10

  • PACKAGES句の値を、ハンドラークラスに必要な1つ以上のパッケージの名前に設定します(存在する場合)。

    詳細については、 サードパーティパッケージの使用 および Python UDFsの作成 をご参照ください。

  • HANDLER句の値をハンドラークラスの名前に設定します。

    PythonハンドラーコードをUDTFに関連付ける場合、コードをインラインで含めるか、Snowflakeステージ上の場所で参照することができます。HANDLERの値では大文字と小文字が区別され、Pythonクラスの名前と一致する必要があります。

    詳細については、 インラインコードを使用するUDFsとステージからアップロードされたコードを使用するUDFsの対比 をご参照ください。

    重要

    スカラー Python UDFの場合、HANDLER句の値にはメソッド名が含まれます。

    Python UDTFの場合、HANDLER句の値にはクラス名が含まれますが、メソッド名は 含まれません

    違いの理由は、スカラーPython UDF の場合、ハンドラーメソッドの名前はユーザーが選択するため、Snowflakeは事前に知ることができませんが、Python UDTF の場合、メソッドの名前(end_partition メソッドなど)は、Snowflakeで指定された名前と一致する必要があるため既知だからです。

  • ハンドラーコードがCREATE FUNCTIONとインラインで指定されている場合は、 AS '<python_code>' 句が必要です。