공통적인 특징 및 쿼리 패턴¶
참고
Snowflake Feature Store API는 Snowpark ML Python 패키지(snowflake-ml-python
) v1.5.0 이상에서 사용할 수 있습니다.
FeatureView
클래스는 기능 변환 논리가 포함된 Snowpark DataFrame 오브젝트를 받습니다. 따라서 Snowpark DataFrame API 또는 Snowflake SQL에서 지원하는 모든 방식으로 기능을 설명할 수 있습니다. DataFrame을 FeatureView
생성자에 직접 전달할 수 있습니다.
Snowpark Python API는 윈도우 집계와 같은 여러 가지 일반적인 함수 유형을 쉽게 정의할 수 있는 분석 함수 를 제공합니다. 이 항목에는 이에 대한 몇 가지 예제가 포함되어 있습니다.
Github의 오픈 소스 snowflake-ml-python 에는 공개 데이터 세트를 사용한 몇 가지 샘플 기능 뷰와 엔터티 정의도 포함되어 있습니다.
행별 기능¶
행별 기능에서 함수는 테이블 형식 데이터의 각 행에 적용됩니다. 예를 들어, 다음 코드는 foo
의 null을 0으로 채운 다음 lat
및 long
에서 ZIP 코드를 계산합니다. 입력 행마다 출력 행이 하나씩 있습니다.
Python:
def get_zipcode(df: snowpark.DataFrame) -> snowpark.DataFrame:
df = df.fillna({"foo": 0})
df = df.with_column(
"zipcode",
F.compute_zipcode(df["lat"], df["long"])
)
return df
Snowflake SQL:
SELECT
COALESCE(foo, 0) AS foo,
compute_zipcode(lat, long) AS zipcode
FROM <source_table_name>;
그룹별 기능¶
그룹별 기능은 그룹 내의 열에 있는 값을 집계합니다. 예를 들어, 일기 예보를 위해 일일 강수량의 합계를 도시별로 그룹화할 수 있습니다. 출력 DataFrame에는 그룹당 하나의 행이 있습니다.
Python:
def sum_rainfall(df: snowpark.DataFrame) -> snowpark.DataFrame:
df = df.group_by(
["location", to_date(timestamp)]
).agg(
sum("rain").alias("sum_rain"),
avg("humidity").alias("avg_humidity")
)
return df
Snowflake SQL:
SELECT
location,
TO_DATE(timestamp) AS date,
SUM(rain) AS sum_rain,
AVG(humidity) AS avg_humidity
FROM <source_table_name>
GROUP BY location, date;
행 기반 윈도우 기능¶
행 기반 윈도우는 고정된 행 윈도우에 대한 집계 값을 제공합니다(예: 최근 3개 트랜잭션 금액 합산). 출력 DataFrame에는 윈도우 프레임당 하나의 행이 있습니다.
Python:
def sum_past_3_transactions(df: snowpark.DataFrame) -> snowpark.DataFrame:
window = Window.partition_by("id").order_by("ts").rows_between(2, Window.CURRENT_ROW)
return df.select(
sum("amount").over(window).alias("sum_past_3_transactions")
)
Snowflake SQL:
SELECT
id,
SUM(amount) OVER (PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING and 0 FOLLOWING)
AS sum_past_3_transactions
FROM <source_table_name>;
이동 집계 기능¶
이동 집계 기능은 지정된 윈도우 크기 내에서 합계, 평균 등의 이동 통계를 계산합니다. 이 함수는 정의된 윈도우 크기, 순서 및 그룹화에 따라 DataFrame의 여러 하위 세트에 걸쳐 이러한 집계를 동적으로 계산합니다. 출력 DataFrame에는 윈도우 프레임당 하나의 행이 있습니다.
new_df = df.analytics.moving_agg(
aggs={"SALESAMOUNT": ["SUM", "AVG"]},
window_sizes=[2, 3],
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"]
)
누적 집계 기능¶
누적 집계는 지정된 대로 정렬 및 그룹화되어 있는 데이터 파티션에서 지속적인 총계, 최소값, 최대값 및 기타 누적 통계를 계산합니다. 이동 집계와 달리, 이러한 합계는 지정된 방향에 따라 분할의 시작부터 끝까지 확장되어 재설정되지 않는 실행 합계를 제공합니다. 출력 DataFrame에는 입력 행당 하나의 행이 있습니다.
new_df = df.analytics.cumulative_agg(
aggs={"SALESAMOUNT": ["SUM", "MIN", "MAX"]},
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"],
is_forward=True
)
지연 기능¶
지연 기능은 각 파티션 내의 이전 행의 값을 포함하는 새로운 열을 도입하며, 지정된 수의 행으로 오프셋됩니다. 이 함수는 데이터 세트의 현재 값을 이전 값과 비교하는 데 중요하며, 그러므로 시간에 따른 추세나 변화를 감지하는 데 도움이 됩니다. 출력 DataFrame에는 입력 행당 하나의 행이 있습니다.
new_df = df.analytics.compute_lag(
cols=["SALESAMOUNT"],
lags=[1, 2],
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"]
)
리드 기능¶
지연 기능의 반대인 리드 기능은 후속 행의 값을 포함하는 새 열을 생성하여 데이터를 위쪽으로 이동시킵니다. 이 기능은 데이터 세트에 이미 존재하는 미래 데이터 포인트를 기반으로 예측 또는 가정을 하는 데 필수적입니다. 출력 DataFrame에는 입력 행당 하나의 행이 있습니다.
new_df = df.analytics.compute_lead(
cols=["SALESAMOUNT"],
leads=[1, 2],
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"]
)
시계열 기능¶
시계열 기능은 시간 윈도우와 시간 축을 따라 고정된 위치를 기반으로 기능 값을 계산합니다. 그러한 예로는 지난주 승차 공유 횟수나 지난 3일 동안의 매출 합계 등이 있습니다. 출력 DataFrame에는 시간 윈도우당 하나의 행이 있습니다.
최근 버전의 Snowflake 기능 스토어에는 실험적인 시계열 집계 API가 포함되어 있습니다. 이 API를 사용하면 다음과 같은 코드를 사용하여 시계열 기능을 생성할 수 있습니다.
Python:
def custom_column_naming(input_col, agg, window):
return f"{agg}_{input_col}_{window.replace('-', 'past_')}"
result_df = weather_df.analytics.time_series_agg(
aggs={"rain": ["SUM"]},
windows=["-3D", "-5D"],
sliding_interval="1D",
group_by=["location"],
time_col="ts",
col_formatter=custom_column_naming
)
SQL에서 RANGE BETWEEN 구문을 사용하여 시계열 함수를 구성할 수도 있습니다. 자세한 내용은 Snowflake Window 함수 를 참조하십시오.
Snowflake SQL:
select
TS,
LOCATION,
sum(RAIN) over (
partition by LOCATION
order by TS
range between interval '3 days' preceding and current row
) SUM_RAIN_3D,
sum(RAIN) over (
partition by LOCATION
order by TS
range between interval '5 days' preceding and current row
) SUM_RAIN_5D
from <source_table_name>
함수 파이프라인에서 사용자 정의 함수 사용하기¶
Snowflake Feature Store는 기능 파이프라인 정의에서 사용자 정의 함수(UDFs)를 지원합니다. 그러나 결정적 함수(동일한 입력에 대해 항상 동일한 결과를 반환하는 함수)만 점진적으로 유지 관리될 수 있습니다. 증분 유지 관리를 사용하려면 UDF를 등록할 때 변경 불가로 표시합니다.
# In Python
@F.udf(
name="MY_UDF",
immutable=True,
# ...
)
def my_udf(...):
# ...
함수가 SQL로 작성된 경우 IMMUTABLE 키워드를 지정합니다. 이 가이드 를 참조하십시오.