特徴量のエンジニアリング¶
Snowflake ML では、未加工データを機能に変換し、機械学習モデルで効率的に使用できるようにします。データの変換にはいくつかのアプローチがあり、それぞれ異なる規模や要件に適しています。
オープンソースソフトウェア( OSS )プリプロセッサー - 小規模から中規模のデータセットと迅速なプロトタイピングには、コンテナランタイム内のローカルまたは単一ノード上で実行される使い慣れたPython ML ライブラリを使用します。
Snowflake ML プリプロセッサー - 大規模なデータセットの場合は、Snowflakeプラットフォーム上でネイティブに実行されるSnowflake ML の前処理 APIs を使用します。これらの APIs は、ウェアハウスコンピューティングリソースに処理を分散します。
Ray map_batches - 特に非構造化データで高度にカスタマイズ可能な大規模処理を行うには、単一ノードまたは複数のノードのコンテナランタイム環境でリソース管理された並列実行を使用します。
データサイズ、パフォーマンス要件、カスタム変換ロジックのニーズに最も適したアプローチを選択します。
次の表は、Snowflake ML における特徴量エンジニアリングのための3つの主なアプローチの詳細な比較を示しています。
特徴量/側面 |
OSS (scikit-learnを含む) |
Snowflake ML プリプロセッサー |
Ray |
|---|---|---|---|
スケール |
小規模・中規模データセット |
大規模/分散データ |
大規模/分散データ |
実行環境 |
メモリ内 |
SQL クエリを実行するために使用しているデフォルトのウェアハウスにプッシュダウン |
コンピューティングプール内のノード全体 |
コンピューティングリソース |
Snowpark Container Services(コンピューティングプール) |
ウェアハウス |
Snowpark Container Services(コンピューティングプール) |
統合 |
標準的なPython ML エコシステム |
Snowflake ML とネイティブに統合 |
Python ML とSnowflakeの両方 |
パフォーマンス |
ローカル、インメモリ、スケール制限、非分散型では高速 |
スケーラブルな分散型特徴量エンジニアリングのための設計 |
高度に並列化およびリソース管理され、大規模/非構造化データに最適 |
ユースケースの適合性 |
迅速なプロトタイピングと実験 |
大規模データセットの実稼働ワークフロー |
カスタムリソース制御を必要とする大規模データワークフロー |
以下の例は、それぞれのアプローチによる特徴量変換の実装方法を示しています。
以下のコードを使用して、前処理ワークフローにscikit-learnを実装します。
import pandas as pd
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
# Load your data locally into a Pandas DataFrame
df = pd.DataFrame({
'age': [34, 23, 54, 31],
'city': ['SF', 'NY', 'SF', 'LA'],
'income': [120000, 95000, 135000, 99000]
})
# Define preprocessing steps
numeric_features = ['age', 'income']
numeric_transformer = StandardScaler()
categorical_features = ['city']
categorical_transformer = OneHotEncoder()
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
]
)
pipeline = Pipeline(steps=[
('preprocessor', preprocessor)
])
# Preprocess the data
X_processed = pipeline.fit_transform(df)
print(X_processed)
Snowflake ML プリプロセッサーは、Snowflake内で分散変換を直接処理します。これらのプリプロセッサーは、ウェアハウス全体でスケールするようにプッシュダウンされます。大規模なデータセットと実稼働ワークロードには、Snowflake ML プリプロセッサーを使用します。
注釈
Snowflake ML プリプロセッサーは、sci-kit learnで利用可能なプリプロセッサーのサブセットですが、最も一般的なユースケースをカバーしています。利用可能なプリプロセッサーについては、 Snowflake ML モデリング前処理 をご参照ください。
以下のコードは、 StandardScaler と:code:OneHotEncoder ライブラリを使用します。
from snowflake.snowpark import Session
from snowflake.ml.modeling.preprocessing import StandardScaler, OneHotEncoder
from snowflake.ml.modeling.pipeline import Pipeline
# Assume your Snowflake connection details are configured
session = Session.builder.configs(...).create()
# Load your data from a Snowflake table as a DataFrame
df = session.table('CUSTOMER_DATA')
# Define Snowflake ML preprocessors
scaler = StandardScaler(input_cols=['AGE', 'INCOME'], output_cols=['AGE_SCALED', 'INCOME_SCALED'])
encoder = OneHotEncoder(input_cols=['CITY'], output_cols=['CITY_ENCODED'])
pipeline = Pipeline(steps=[
('scaling', scaler),
('encoding', encoder)
])
# Fit and transform data in Snowflake (distributed)
result = pipeline.fit_transform(df)
result.show()
カスタム変換による分散並列処理にはRayを使用します。Ray map_batches は遅延実行を使用します。つまり、データセットを実体化するまで処理が行われないため、メモリ使用量を減らすことができます。このアプローチは、カスタムロジックによる大規模なデータ処理に最適です。
import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector
# Example for data transform
def preprocess_batch(batch: pd.DataFrame) -> pd.DataFrame:
batch['AGE_SCALED'] = (batch['age'] - batch['age'].mean()) / batch['age'].std()
return batch
# Example of filtering
def filter_by_value(row):
return row['city'] != 'LA'
# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)
# Setup filter operations, not executed yet
filtered_ds = ray_ds.filter(filter_by_value)
transformed_ds = filtered_ds.map_batches(example_transform_batch_function)
# Create DataConnector directly from ray dataset
data_connector = DataConnector.from_ray_dataset(transformed_ds)