기능 엔지니어링¶
Snowflake ML을 사용하면 원시 데이터를 기능으로 변환하여 머신 러닝 모델에서 효율적으로 사용할 수 있습니다. 다양한 규모와 요구 사항에 적합한 여러 가지 접근 방식을 사용하여 데이터를 변환할 수 있습니다.
오픈 소스 소프트웨어(OSS) 전처리기 - 중소 규모의 데이터 세트와 빠른 프로토타이핑의 경우 로컬에서 또는 Container Runtime 내의 단일 노드에서 실행되는 친숙한 Python ML 라이브러리를 사용합니다.
Snowflake ML 전처리기 - 더 큰 데이터 세트의 경우 Snowflake 플랫폼에서 기본적으로 실행되는 Snowflake ML의 전처리 APIs를 사용합니다. 이러한 APIs는 웨어하우스 컴퓨팅 리소스 전체에 처리를 분산합니다.
Ray map_batches - 특히 비정형 데이터의 경우 사용자 지정이 가능한 대규모 처리를 위해 단일 노드 또는 다중 노드 Container Runtime 환경에서 병렬 리소스 관리형 실행을 사용합니다.
데이터 크기, 성능 요구 사항, 사용자 지정 변환 논리에 대한 요구에 가장 적합한 접근 방식을 선택합니다.
다음 테이블은 Snowflake ML의 기능 엔지니어링을 위한 세 가지 주요 접근 방식을 자세히 비교해서 설명합니다.
기능 및 측면 |
OSS(scikit-learn 포함) |
Snowflake ML 전처리기 |
Ray |
|---|---|---|---|
소수 자릿수 |
중소 규모 데이터 세트 |
대규모 데이터 및 분산 데이터 |
대규모 데이터 및 분산 데이터 |
실행 환경 |
메모리 내 |
SQL 쿼리를 실행하는 데 사용하는 기본 웨어하우스로 푸시다운 |
컴퓨팅 풀의 노드 간 |
컴퓨팅 리소스 |
Snowpark Container Services(컴퓨팅 풀) |
웨어하우스 |
Snowpark Container Services(컴퓨팅 풀) |
통합 |
표준 Python ML 생태계 |
Snowflake ML과 기본적으로 통합 |
Python ML 및 Snowflake 둘 다와 통합 |
성능 |
로컬, 메모리 내 워크로드의 경우 빠르며, 규모가 제한된 비분산형 |
확장 가능한 분산 기능 엔지니어링을 위해 설계 |
뛰어난 병렬 처리 및 리소스 관리형, 대규모 데이터 및 비정형 데이터에 탁월 |
사용 사례 적합성 |
신속한 프로토타이핑 및 실험 |
대규모 데이터 세트가 있는 프로덕션 워크플로 |
사용자 지정 리소스 제어가 필요한 대규모 데이터 워크플로 |
다음 예는 각 접근 방식을 사용하여 기능 변환을 구현하는 방법을 보여줍니다.
다음 코드를 사용하여 전처리 워크플로를 위한 scikit-learn을 구현합니다.
import pandas as pd
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
# Load your data locally into a Pandas DataFrame
df = pd.DataFrame({
'age': [34, 23, 54, 31],
'city': ['SF', 'NY', 'SF', 'LA'],
'income': [120000, 95000, 135000, 99000]
})
# Define preprocessing steps
numeric_features = ['age', 'income']
numeric_transformer = StandardScaler()
categorical_features = ['city']
categorical_transformer = OneHotEncoder()
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
]
)
pipeline = Pipeline(steps=[
('preprocessor', preprocessor)
])
# Preprocess the data
X_processed = pipeline.fit_transform(df)
print(X_processed)
Snowflake ML 전처리기는 Snowflake 내에서 직접 분산 변환을 처리합니다. 이러한 전처리기는 웨어하우스 전체에서 확장하기 위해 푸시다운됩니다. 대규모 데이터 세트 및 프로덕션 워크로드를 위해 Snowflake ML 전처리기를 사용합니다.
참고
Snowflake ML 전처리기는 sci-kit Learn에서 사용할 수 있는 전처리기의 하위 세트이지만, 가장 일반적인 사용 사례를 다룹니다. 사용 가능한 전처리기에 대한 자세한 내용은 `Snowflake ML 모델링 전처리<https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/modeling#snowflake-ml-modeling-preprocessing>`_를 참조하세요.
다음 코드는 StandardScaler 및 OneHotEncoder 라이브러리를 사용합니다.
from snowflake.snowpark import Session
from snowflake.ml.modeling.preprocessing import StandardScaler, OneHotEncoder
from snowflake.ml.modeling.pipeline import Pipeline
# Assume your Snowflake connection details are configured
session = Session.builder.configs(...).create()
# Load your data from a Snowflake table as a DataFrame
df = session.table('CUSTOMER_DATA')
# Define Snowflake ML preprocessors
scaler = StandardScaler(input_cols=['AGE', 'INCOME'], output_cols=['AGE_SCALED', 'INCOME_SCALED'])
encoder = OneHotEncoder(input_cols=['CITY'], output_cols=['CITY_ENCODED'])
pipeline = Pipeline(steps=[
('scaling', scaler),
('encoding', encoder)
])
# Fit and transform data in Snowflake (distributed)
result = pipeline.fit_transform(df)
result.show()
사용자 지정 변환이 포함된 분산 병렬 처리에는 Ray를 사용합니다. Ray ``map_batches``는 지연 실행을 사용합니다. 즉, 사용자가 데이터 세트를 구체화할 때까지 처리가 수행되지 않으므로 메모리 사용량을 줄이는 데 도움이 됩니다. 이 접근 방식은 사용자 지정 논리를 사용한 대규모 데이터 처리에 이상적입니다.
import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector
# Example for data transform
def preprocess_batch(batch: pd.DataFrame) -> pd.DataFrame:
batch['AGE_SCALED'] = (batch['age'] - batch['age'].mean()) / batch['age'].std()
return batch
# Example of filtering
def filter_by_value(row):
return row['city'] != 'LA'
# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)
# Setup filter operations, not executed yet
filtered_ds = ray_ds.filter(filter_by_value)
transformed_ds = filtered_ds.map_batches(example_transform_batch_function)
# Create DataConnector directly from ray dataset
data_connector = DataConnector.from_ray_dataset(transformed_ds)