ベクトル化されたPython UDTFs

このトピックでは、ベクトル化されたPython UDTFs を紹介します。

このトピックの内容:

概要

Vectorized Python UDTFs (user-defined table functions) enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames and returning results as pandas DataFrames or lists of pandas arrays or pandas Series. Vectorized Python UDTFs allow for easy integration with libraries that operate on pandas DataFrames or pandas arrays.

次の場合は、ベクトル化された UDTF を使用します。

  • 行単位ではなく、パーティション単位でデータを処理する必要がある。

  • パーティションごとに複数の行または列を返す必要がある。

  • データ分析にpandas DataFrames で作動するライブラリを使用する。

前提条件

Python用Snowparkライブラリのバージョン1.6.1以降が必要です。

ベクトル化されたPython UDTFs 入門

ベクトル化されたPython UDTF を作成するには、

  • オプションで、各パーティションを処理する前に呼び出される __init__ メソッドを持つハンドラークラスを定義します。

  • process メソッドを定義しないでください。

  • 引数 DataFrame を受け取り、 pandas.DataFrame または pandas.Series または pandas.arrays のタプル(各配列は列)を返す end_partition メソッドを定義します。結果の列型は、 UDTF 定義の列型と一致している必要があります。

  • @vectorized デコレーターまたは _sf_vectorized_input 関数属性を使用して、 end_partition メソッドをベクトル化されたものとしてマークします。詳細については、 ベクトル化されたPython UDFs をご参照ください。 @vectorized デコレーターは、 SQL ワークシートを使用する場合など、Python UDTF がSnowflake内で実行される場合にのみ使用できます。クライアントまたはPythonワークシートを使用して実行する場合は、関数属性を使用する必要があります。

注釈

ベクトル化された UDTF への入力 DataFrame のデフォルト列名は、 SQL 関数の署名と一致します。列名は SQL 識別子の要件 に従います。つまり、識別子が引用符で囲まれていない場合は大文字になり、二重引用符で囲まれている場合はそのまま保持されます。

以下は、 @vectorized デコレーターを使用してPython UDTF をベクトル化した例です。

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    # initialize a state
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
    # process the DataFrame
    return result_df
Copy

以下は、関数属性を使用してベクトル化されたPython UDTF を作成する例です。

import pandas

class handler:
  def __init__(self):
    # initialize a state
  def end_partition(self, df):
    # process the DataFrame
    return result_df

handler.end_partition._sf_vectorized_input = pandas.DataFrame
Copy

注釈

ベクトル化された UDTF は、パーティションを構築するために PARTITION BY 句で呼び出す必要があります。

ベクトル化された UDTF を同じパーティションにあるすべてのデータで呼び出すには、

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Copy

ベクトル化された UDTF をx列で分割されたデータで呼び出すには、

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Copy

型サポート

ベクトル化されたPython UDTFs は、引数と戻り値についてベクトル化されたPython UDFs と同じ SQL 型 をサポートします。ただし、ベクトル化されたPython UDTFs の場合、すべてが64ビットまたはより小さい整数型に適合するスケール0を持つ SQL NUMBER 引数は、常に Int16Int32、または Int64 にマッピングされます。言い換えると、スカラー UDFs とは異なり、UDTF の引数がNULL許容でない場合、 int16int32int64 には変換されないということです。

SQL 型がPandas dtypesにどのようにマッピングされるかを示すテーブルを表示するには、ベクトル化されたPython UDFs のトピックにある 型サポートテーブル をご参照ください。

例: 通常の UDTF を使用した行の収集と、ベクトル化された UDTF を使用した行の収集の比較。

以下は、通常の UDTF を使用した行コレクションの例です。

import pandas

class handler:
  def __init__(self):
    self.rows = []
  def process(self, *row):
    self.rows.append(row)
  def end_partition(self):
    df = pandas.DataFrame(self.rows)
    # process the DataFrame
    return result_df
Copy

以下は、ベクトル化された UDTF を使用した行コレクションの例です。

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    self.rows = []
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
  # process the DataFrame
    return result_df
Copy

例: パーティション内の各列の要約統計量を計算する

以下は、pandas describe() メソッドを使用して、パーティション内の各列の要約統計量を計算する例です。

まず、テーブルを作成し、5行ずつのパーティションを3つ作成します。

create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);

-- generate 3 partitions of 5 rows each
insert into test_values
select 'x',
uniform(1.5,1000.5,random(1))::float col1,
uniform(1.5,1000.5,random(2))::float col2,
uniform(1.5,1000.5,random(3))::float col3,
uniform(1.5,1000.5,random(4))::float col4,
uniform(1.5,1000.5,random(5))::float col5
from table(generator(rowcount => 5));

insert into test_values
select 'y',
uniform(1.5,1000.5,random(10))::float col1,
uniform(1.5,1000.5,random(20))::float col2,
uniform(1.5,1000.5,random(30))::float col3,
uniform(1.5,1000.5,random(40))::float col4,
uniform(1.5,1000.5,random(50))::float col5
from table(generator(rowcount => 5));

insert into test_values
select 'z',
uniform(1.5,1000.5,random(100))::float col1,
uniform(1.5,1000.5,random(200))::float col2,
uniform(1.5,1000.5,random(300))::float col3,
uniform(1.5,1000.5,random(400))::float col4,
uniform(1.5,1000.5,random(500))::float col5
from table(generator(rowcount => 5));
Copy

データをご覧ください。

select * from test_values;

-----------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |
-----------------------------------------------------
|x     |8.0     |99.4    |714.6   |168.7   |397.2   |
|x     |106.4   |237.1   |971.7   |828.4   |988.2   |
|x     |741.3   |207.9   |32.6    |640.6   |63.2    |
|x     |541.3   |828.6   |844.9   |77.3    |403.1   |
|x     |4.3     |723.3   |924.3   |282.5   |158.1   |
|y     |976.1   |562.4   |968.7   |934.3   |977.3   |
|y     |390.0   |244.3   |952.6   |101.7   |24.9    |
|y     |599.7   |191.8   |90.2    |788.2   |761.2   |
|y     |589.5   |201.0   |863.4   |415.1   |696.1   |
|y     |46.7    |659.7   |571.1   |938.0   |513.7   |
|z     |313.9   |188.5   |964.6   |435.4   |519.6   |
|z     |328.3   |643.1   |766.4   |148.1   |596.4   |
|z     |929.0   |255.4   |915.9   |857.2   |425.5   |
|z     |612.8   |816.4   |220.2   |879.5   |331.4   |
|z     |487.1   |704.5   |471.5   |378.9   |481.2   |
-----------------------------------------------------
Copy

次に、関数を作成します。

create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
language python
runtime_version=3.8
packages=('pandas')
handler='handler'
as $$
from _snowflake import vectorized
import pandas

class handler:
    @vectorized(input=pandas.DataFrame)
    def end_partition(self, df):
      # using describe function to get the summary statistics
      result = df.describe().transpose()
      # add a column at the beginning for column ids
      result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
      return result
$$;
Copy

関数を呼び出し、 id で分割します。

-- partition by id
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by id))
order by id, column_name;

--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"              |"STD"               |"MIN"  |"Q1"   |"MEDIAN"  |"Q3"   |"MAX"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |280.25999999999993  |339.5609267863427   |4.3    |8.0    |106.4     |541.3  |741.3  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |419.25999999999993  |331.72476995244114  |99.4   |207.9  |237.1     |723.3  |828.6  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |697.62              |384.2964311569911   |32.6   |714.6  |844.9     |924.3  |971.7  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |399.5               |321.2689294033894   |77.3   |168.7  |282.5     |640.6  |828.4  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |401.96000000000004  |359.83584173897964  |63.2   |158.1  |397.2     |403.1  |988.2  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |520.4               |339.16133329139984  |46.7   |390.0  |589.5     |599.7  |976.1  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |371.84              |221.94799616126298  |191.8  |201.0  |244.3     |562.4  |659.7  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |689.2               |371.01012789410476  |90.2   |571.1  |863.4     |952.6  |968.7  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |635.46              |366.6140927460372   |101.7  |415.1  |788.2     |934.3  |938.0  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |594.64              |359.0334218425911   |24.9   |513.7  |696.1     |761.2  |977.3  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |534.22              |252.58182238633088  |313.9  |328.3  |487.1     |612.8  |929.0  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |521.58              |281.4870103574941   |188.5  |255.4  |643.1     |704.5  |816.4  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |667.72              |315.53336907528495  |220.2  |471.5  |766.4     |915.9  |964.6  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |539.8199999999999   |318.73025742781306  |148.1  |378.9  |435.4     |857.2  |879.5  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |470.82              |99.68626786072393   |331.4  |425.5  |481.2     |519.6  |596.4  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

あるいは、関数を呼び出し、テーブル全体を1つのパーティションとして扱います。

-- treat the whole table as one partition
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by 1))
order by id, column_name;

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"             |"STD"               |"MIN"  |"Q1"                |"MEDIAN"  |"Q3"    |"MAX"  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |15       |444.96             |314.01110034974425  |4.3    |210.14999999999998  |487.1     |606.25  |976.1  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |15       |437.56             |268.95505944302295  |99.4   |204.45              |255.4     |682.1   |828.6  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |15       |684.8466666666667  |331.87254839915937  |32.6   |521.3               |844.9     |938.45  |971.7  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |15       |524.9266666666666  |327.074780585783    |77.3   |225.6               |435.4     |842.8   |938.0  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |15       |489.14             |288.9176669671038   |24.9   |364.29999999999995  |481.2     |646.25  |988.2  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

ベストプラクティス

このセクションでは、ベストプラクティスについて説明します。

  1. パフォーマンスを向上させ、タイムアウトを防ぐために、 pandas.concat を使用して部分的な結果を蓄積することは避けてください。代わりに、準備ができ次第、部分的な結果を出すようにします。たとえば、次の代わりに:

    results = []
    while(...):
      partial_result = pd.DataFrame(...)
      results.append(partial_result)
    return pd.concat(results)
    
    Copy

    次を実行します:

    while(...):
      partial_result = pd.DataFrame(...)
      yield partial_result
    
    Copy
  2. 各行でスカラーを返す必要がある場合は、 numpy 配列を展開してタプルを作成する代わりに、繰り返し値のリストを作成します。たとえば、2列の結果の場合、次の代わりに:

    return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
    
    Copy

    次を実行します:

    return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
    
    Copy
  3. パフォーマンスを向上させるには、半構造化データを列に展開します。たとえば、 obj というバリアント列があり、 x(int)y(float)z(string) という要素がある場合、 UDTF を次のような署名で定義する代わりに:

    create function vec_udtf(variant obj)
    
    Copy

    また、 vec_udtf(obj) を使って呼び出す場合は、次の署名で UDTF を定義する必要があります。

    create function vec_udtf(int, float, string)
    
    Copy

    さらに、それを vec_udtf(obj:x, obj:y, obj:z) で呼び出します。

  4. デフォルトでは、Snowflakeは入力を NULL 値をサポートするpandas dtypesにエンコードします(例: Int64)。プリミティブ型(numpy など)を必要とするライブラリを使用していて、入力に NULL 値がない場合は、ライブラリを使用する前に列をプリミティブ型にキャストする必要があります。例:

    input_df['y'] =  input_df['y'].astype("int64")
    
    Copy

    詳細については、 型のサポート をご参照ください。