一般的な機能とクエリのパターン

注釈

Snowflake Feature Store API は、Snowpark ML Pythonパッケージ(snowflake-ml-python)v1.5.0以降で利用可能です。

FeatureView クラスは、機能変換ロジックを含むSnowpark DataFrame オブジェクトを受け入れます。したがって、Snowpark DataFrame APIまたはSnowflake SQLでサポートされている任意の方法で機能を記述することができます。返された DataFrame を FeatureView コンストラクタに直接渡すことができます。

Snowpark Python API は、ウィンドウ集計などの多くの一般的なフィーチャタイプを簡単に定義するために、 ユーティリティ関数 を提供します。このトピックでは、これらの例をいくつかご紹介します。

Github上のオープンソース snowflake-ml-python には、公開データセットを使用した機能ビューとエンティティ定義のサンプルも含まれています。

行ごとの機能

行ごとの機能では、表形式データの各行に関数が適用されます。例えば、次のコードは foo のnullをゼロで埋め、 latlong から ZIP コードを計算します。入力行1つにつき出力行が1つあります。

Python:

def get_zipcode(df: snowpark.DataFrame) -> snowpark.DataFrame:
    df = df.fillna({"foo": 0})
    df = df.with_column(
        "zipcode",
        F.compute_zipcode(df["lat"], df["long"])
    )
    return df
Copy

Snowflake SQL:

SELECT
    COALESCE(foo, 0) AS foo,
    compute_zipcode(lat, long) AS zipcode
FROM <source_table_name>;
Copy

グループごとの機能

グループごとの機能は、グループ内の列の値を集計します。例: 天気予報のために都市ごとにグループ化された毎日の降雨量の合計。出力DataFrameは1グループにつき1行です。

Python:

def sum_rainfall(df: snowpark.DataFrame) -> snowpark.DataFrame:
    df = df.group_by(
        ["location", to_date(timestamp)]
    ).agg(
        sum("rain").alias("sum_rain"),
        avg("humidity").alias("avg_humidity")
    )
    return df
Copy

Snowflake SQL:

SELECT
    location,
    TO_DATE(timestamp) AS date,
    SUM(rain) AS sum_rain,
    AVG(humidity) AS avg_humidity
FROM <source_table_name>
GROUP BY location, date;
Copy

行ベースのウィンドウ機能

行ベースのウィンドウ機能では、例えば過去3回の取引金額の合計など、固定ウィンドウの行に渡って値を集計します。出力 DataFrame は1ウィンドウフレームにつき1行です。

Python:

def sum_past_3_transactions(df: snowpark.DataFrame) -> snowpark.DataFrame:
    window = Window.partition_by("id").order_by("ts").rows_between(2, Window.CURRENT_ROW)

    return df.select(
        sum("amount").over(window).alias("sum_past_3_transactions")
    )
Copy

Snowflake SQL:

SELECT
    id,
    SUM(amount) OVER (PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING and 0 FOLLOWING)
        AS sum_past_3_transactions
FROM <source_table_name>;
Copy

移動集計機能

移動集計機能は、指定されたウィンドウサイズ内で合計や平均などの移動統計を計算します。この関数は、定義されたウィンドウのサイズ、順序、およびグループ化に基づいて、DataFrameのさまざまなサブセットにわたってこれらの集計を動的に計算します。出力 DataFrame は1ウィンドウフレームにつき1行です。

new_df =  df.analytics.moving_agg(
    aggs={"SALESAMOUNT": ["SUM", "AVG"]},
    window_sizes=[2, 3],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

累積集計機能

累積集計は、データパーティション全体の継続的な合計、最小値、最大値、およびその他の累積統計値を計算し、指定に従ってソートおよびグループ化されます。移動集計とは異なり、これらの合計は、指定された方向によって、パーティションの開始または終了まで拡張され、リセットされない累積合計を提供します。出力 DataFrame は1グループにつき1行です。

 new_df = df.analytics.cumulative_agg(
    aggs={"SALESAMOUNT": ["SUM", "MIN", "MAX"]},
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"],
    is_forward=True
)
Copy

ラグ機能

ラグ機能は、各パーティション内の前の行の値を含む新しい列を、指定された行数だけオフセットして導入します。この関数は、データセットの過去の値と現在の値を比較し、時間の経過に伴う傾向や変化を検出するために重要です。出力 DataFrame は1グループにつき1行です。

new_df = df.analytics.compute_lag(
    cols=["SALESAMOUNT"],
    lags=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

リード機能

ラグ機能の逆であるリード機能は、後続の行の値を含む新しい列を作成し、データを上方にシフトします。この機能は、データセットに既に存在する将来のデータポイントに基づいて予測や仮定を行うために不可欠です。出力 DataFrame は1グループにつき1行です。

new_df = df.analytics.compute_lead(
    cols=["SALESAMOUNT"],
    leads=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

時系列機能

時系列機能は、時間軸に沿った固定位置と時間ウィンドウに基づいて機能値を計算します。例えば、ライドシェアの過去1週間の移動回数や、過去3日間の売上合計などです。出力 DataFrame は1タイムウィンドウにつき1行です。

Snowflake Feature Storeの最新バージョンには、実験的な時系列集計APIが含まれています。この API を使用すると、以下のようなコードで時系列機能を作成することができます。:

Python:

def custom_column_naming(input_col, agg, window):
    return f"{agg}_{input_col}_{window.replace('-', 'past_')}"

result_df = weather_df.analytics.time_series_agg(
    aggs={"rain": ["SUM"]},
    windows=["-3D", "-5D"],
    sliding_interval="1D",
    group_by=["location"],
    time_col="ts",
    col_formatter=custom_column_naming
)
Copy

SQL の RANGE BETWEEN 構文で時系列機能を構築することもできます。詳細は、 Snowflake Window 関数 をご参照ください。

Snowflake SQL:

select
    TS,
    LOCATION,
    sum(RAIN) over (
        partition by LOCATION
        order by TS
        range between interval '3 days' preceding and current row
    ) SUM_RAIN_3D,
    sum(RAIN) over (
        partition by LOCATION
        order by TS
        range between interval '5 days' preceding and current row
    ) SUM_RAIN_5D
from <source_table_name>
Copy

機能パイプラインでのユーザー定義関数の使用

Snowflake Feature Storeは、機能パイプライン定義のユーザー定義関数(UDFs)をサポートしています。ただし、増分にメンテナンスできるのは決定論的関数(同じ入力に対して常に同じ結果を返す関数)のみです。増分メンテナンスを可能にするには、UDFを登録する際に、不変とマークします。

# In Python
@F.udf(
    name="MY_UDF",
    immutable=True,
    # ...
)
def my_udf(...):
    # ...
Copy

関数がSQLで記述されている場合は、IMMUTABLEキーワードを指定します。 このガイド をご参照ください。