공통적인 특징 및 쿼리 패턴

참고

Snowflake Feature Store API는 Snowpark ML Python 패키지(snowflake-ml-python) v1.5.0 이상에서 사용할 수 있습니다.

FeatureView 클래스는 기능 변환 논리가 포함된 Snowpark DataFrame 오브젝트를 받습니다. 따라서 Snowpark DataFrame API 또는 Snowflake SQL에서 지원하는 모든 방식으로 기능을 설명할 수 있습니다. DataFrame을 FeatureView 생성자에 직접 전달할 수 있습니다.

Snowpark Python API는 윈도우 집계와 같은 여러 가지 일반적인 함수 유형을 쉽게 정의할 수 있는 분석 함수 를 제공합니다. 이 항목에는 이에 대한 몇 가지 예제가 포함되어 있습니다.

Github의 오픈 소스 snowflake-ml-python 에는 공개 데이터 세트를 사용한 몇 가지 샘플 기능 뷰와 엔터티 정의도 포함되어 있습니다.

행별 기능

행별 기능에서 함수는 테이블 형식 데이터의 각 행에 적용됩니다. 예를 들어, 다음 코드는 foo 의 null을 0으로 채운 다음 latlong 에서 ZIP 코드를 계산합니다. 입력 행마다 출력 행이 하나씩 있습니다.

Python:

def get_zipcode(df: snowpark.DataFrame) -> snowpark.DataFrame:
    df = df.fillna({"foo": 0})
    df = df.with_column(
        "zipcode",
        F.compute_zipcode(df["lat"], df["long"])
    )
    return df
Copy

Snowflake SQL:

SELECT
    COALESCE(foo, 0) AS foo,
    compute_zipcode(lat, long) AS zipcode
FROM <source_table_name>;
Copy

그룹별 기능

그룹별 기능은 그룹 내의 열에 있는 값을 집계합니다. 예를 들어, 일기 예보를 위해 일일 강수량의 합계를 도시별로 그룹화할 수 있습니다. 출력 DataFrame에는 그룹당 하나의 행이 있습니다.

Python:

def sum_rainfall(df: snowpark.DataFrame) -> snowpark.DataFrame:
    df = df.group_by(
        ["location", to_date(timestamp)]
    ).agg(
        sum("rain").alias("sum_rain"),
        avg("humidity").alias("avg_humidity")
    )
    return df
Copy

Snowflake SQL:

SELECT
    location,
    TO_DATE(timestamp) AS date,
    SUM(rain) AS sum_rain,
    AVG(humidity) AS avg_humidity
FROM <source_table_name>
GROUP BY location, date;
Copy

행 기반 윈도우 기능

행 기반 윈도우는 고정된 행 윈도우에 대한 집계 값을 제공합니다(예: 최근 3개 트랜잭션 금액 합산). 출력 DataFrame에는 윈도우 프레임당 하나의 행이 있습니다.

Python:

def sum_past_3_transactions(df: snowpark.DataFrame) -> snowpark.DataFrame:
    window = Window.partition_by("id").order_by("ts").rows_between(2, Window.CURRENT_ROW)

    return df.select(
        sum("amount").over(window).alias("sum_past_3_transactions")
    )
Copy

Snowflake SQL:

SELECT
    id,
    SUM(amount) OVER (PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING and 0 FOLLOWING)
        AS sum_past_3_transactions
FROM <source_table_name>;
Copy

이동 집계 기능

이동 집계 기능은 지정된 윈도우 크기 내에서 합계, 평균 등의 이동 통계를 계산합니다. 이 함수는 정의된 윈도우 크기, 순서 및 그룹화에 따라 DataFrame의 여러 하위 세트에 걸쳐 이러한 집계를 동적으로 계산합니다. 출력 DataFrame에는 윈도우 프레임당 하나의 행이 있습니다.

new_df =  df.analytics.moving_agg(
    aggs={"SALESAMOUNT": ["SUM", "AVG"]},
    window_sizes=[2, 3],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

누적 집계 기능

누적 집계는 지정된 대로 정렬 및 그룹화되어 있는 데이터 파티션에서 지속적인 총계, 최소값, 최대값 및 기타 누적 통계를 계산합니다. 이동 집계와 달리, 이러한 합계는 지정된 방향에 따라 분할의 시작부터 끝까지 확장되어 재설정되지 않는 실행 합계를 제공합니다. 출력 DataFrame에는 입력 행당 하나의 행이 있습니다.

 new_df = df.analytics.cumulative_agg(
    aggs={"SALESAMOUNT": ["SUM", "MIN", "MAX"]},
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"],
    is_forward=True
)
Copy

지연 기능

지연 기능은 각 파티션 내의 이전 행의 값을 포함하는 새로운 열을 도입하며, 지정된 수의 행으로 오프셋됩니다. 이 함수는 데이터 세트의 현재 값을 이전 값과 비교하는 데 중요하며, 그러므로 시간에 따른 추세나 변화를 감지하는 데 도움이 됩니다. 출력 DataFrame에는 입력 행당 하나의 행이 있습니다.

new_df = df.analytics.compute_lag(
    cols=["SALESAMOUNT"],
    lags=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

리드 기능

지연 기능의 반대인 리드 기능은 후속 행의 값을 포함하는 새 열을 생성하여 데이터를 위쪽으로 이동시킵니다. 이 기능은 데이터 세트에 이미 존재하는 미래 데이터 포인트를 기반으로 예측 또는 가정을 하는 데 필수적입니다. 출력 DataFrame에는 입력 행당 하나의 행이 있습니다.

new_df = df.analytics.compute_lead(
    cols=["SALESAMOUNT"],
    leads=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

시계열 기능

시계열 기능은 시간 윈도우와 시간 축을 따라 고정된 위치를 기반으로 기능 값을 계산합니다. 그러한 예로는 지난주 승차 공유 횟수나 지난 3일 동안의 매출 합계 등이 있습니다. 출력 DataFrame에는 시간 윈도우당 하나의 행이 있습니다.

최근 버전의 Snowflake 기능 스토어에는 실험적인 시계열 집계 API가 포함되어 있습니다. 이 API를 사용하면 다음과 같은 코드를 사용하여 시계열 기능을 생성할 수 있습니다.

Python:

def custom_column_naming(input_col, agg, window):
    return f"{agg}_{input_col}_{window.replace('-', 'past_')}"

result_df = weather_df.analytics.time_series_agg(
    aggs={"rain": ["SUM"]},
    windows=["-3D", "-5D"],
    sliding_interval="1D",
    group_by=["location"],
    time_col="ts",
    col_formatter=custom_column_naming
)
Copy

SQL에서 RANGE BETWEEN 구문을 사용하여 시계열 함수를 구성할 수도 있습니다. 자세한 내용은 Snowflake Window 함수 를 참조하십시오.

Snowflake SQL:

select
    TS,
    LOCATION,
    sum(RAIN) over (
        partition by LOCATION
        order by TS
        range between interval '3 days' preceding and current row
    ) SUM_RAIN_3D,
    sum(RAIN) over (
        partition by LOCATION
        order by TS
        range between interval '5 days' preceding and current row
    ) SUM_RAIN_5D
from <source_table_name>
Copy

함수 파이프라인에서 사용자 정의 함수 사용하기

Snowflake Feature Store는 기능 파이프라인 정의에서 사용자 정의 함수(UDFs)를 지원합니다. 그러나 결정적 함수(동일한 입력에 대해 항상 동일한 결과를 반환하는 함수)만 점진적으로 유지 관리될 수 있습니다. 증분 유지 관리를 사용하려면 UDF를 등록할 때 변경 불가로 표시합니다.

# In Python
@F.udf(
    name="MY_UDF",
    immutable=True,
    # ...
)
def my_udf(...):
    # ...
Copy

함수가 SQL로 작성된 경우 IMMUTABLE 키워드를 지정합니다. 이 가이드 를 참조하십시오.