データパーティション全体でモデルをトレーニングする¶
多モデルトレーニング(MMT)を使用して、データパーティション全体で複数の機械学習モデルを効率的にトレーニングします。分散オーケストレーション、モデルストレージ、アーティファクトの永続化を自動的に処理します。
MMT は、指定された列によってSnowpark DataFrame をパーティション化し、各パーティションの個別のモデルを並行してトレーニングします。モデルトレーニングロジックに焦点を当てる一方、MMT はインフラストラクチャの複雑さを処理し、自動的にスケーリングします。
MMT を使用することで、さまざまなデータセグメントにわたって複数のモデルを効率的にトレーニングできます。このツールは、地域固有の売上予測モデルのトレーニング、各顧客グループが独自のモデルを必要とするパーソナライズされた推奨システムの構築、セグメント固有の予測モデルの作成などのシナリオに最適です。 MMT は、分散モデルのトレーニングを自動的に処理するため、複雑な分散コンピューティングインフラストラクチャの管理が不要になります。
MMT では、XGBoost、scikit-learn、PyTorch、TensorFlow などのオープンソース機械学習モデルやフレームワークを使用してモデルをトレーニングできます。 MMT はモデルのアーキテクトを自動的にシリアル化するので、推論時にアクセスすることができます。
ModelSerde インターフェースを実装してカスタムモデルをトレーニングしたり、サポートされていない ML フレームワークを使用することもできます。これにより、MMT を自分が使用する任意の機械学習フレームワークやカスタムモデルアーキテクチャと統合できます。
重要
MMT を使い始める前に、以下をご確認ください。
コンテナランタイム環境: MMT には、Snowflake ML コンテナランタイム環境が必要です。
ステージアクセス許可: MMT は、モデルアーティファクトをSnowflakeステージに自動的に保管します。指定した名前付きステージにアクセスするための適切な権限を持っていることを確認します。
ML フレームワークのサポート:組み込みの統合を XGBoost、scikit-learn、PyTorch、TensorFlow に利用できます。カスタムモデルの場合は、ModelSerde インターフェースを実装します。
次のセクションでは、ワークフローの例を使って MMT の使用について説明します。
MMT を使用したモデルのトレーニング¶
このセクションでは、完全な MMT ワークフローを5つの主要ステップで説明します。
データのインポート - Snowparkを使用してトレーニングデータをロードする
トレーニング関数の定義 - トレーニング関数を定義する
パーティション全体でモデルのトレーニング - MMT を使用して各パーティションのモデルを並行してトレーニングする
トレーニング済みモデルへのアクセス - 各パーティションのトレーニング済みモデルを取得して使用する
モデルの永続性と取得 - モデルをステージに保存し、後で復元する
このワークフローでは、データパーティション全体での分散トレーニング、モデルのシリアル化、アーティファクトストレージを自動的に処理します。
データのインポート¶
Snowparkセッションを使用して、データのインポートを開始します。多モデルトレーニング関数が、指定した列を使用して、インポートするデータを異なるパーティションに分割します。
MMT を使用する前に、Snowparkセッションを作成します。詳細情報については、Snowpark Pythonのセッションの作成 をご参照ください。
次のコードでは、Snowparkのセッションを使用してトレーニングデータをインポートします。
# Example: sales_data with columns: region, feature1, feature2, feature3, target
sales_data = session.table("SALES_TRAINING_DATA")
トレーニング関数の定義¶
データを取得した後、MMT がパーティション全体でのモデルトレーニングに使用するトレーニング関数を定義します。トレーニング関数は、データコネクタと、トレーニング中のデータパーティションをポイントするコンテキストオブジェクトを受け取ります。このセクションには、TensorFlow と PyTorch を活用する例に加えて、XGBoost モデルをトレーニングするためのトレーニング関数定義の例があります。
トレーニング関数には、この (data_connector, context)
という正確な署名が必要です。データパーティションごとに、MMT は 次の引数を使用して train_xgboost_model
を呼び出します。
data_connector
:MMT が分割するデータへのアクセスを提供するデータコネクタです。train_xgboost_model
はそのデータフレームを pandas に変換します。context
:partition_id
をtrain_xgboost_model
関数に提供するオブジェクトです。この ID は、パーティション分割する列の名前です。
この関数を自分で呼び出すことはありません。MMT は、すべてのパーティションでの実行を処理します。
以下のコードを使用して、トレーニング関数を定義します。データの特徴量を反映するようにコードを変更したら、MMT 関数に渡すことができます。
XGBoost を使用してデータパーティション全体でモデルをトレーニングします。XGBoost は構造化データに優れたパフォーマンスを提供し、欠損値を自動的に処理します。
def train_xgboost_model(data_connector, context):
df = data_connector.to_pandas()
print(f"Training model for partition: {context.partition_id}")
# Prepare features and target
X = df[['feature1', 'feature2', 'feature3']]
y = df['target']
# Train the model
from xgboost import XGBRegressor
model = XGBRegressor(
n_estimators=100,
max_depth=6,
learning_rate=0.1,
random_state=42
)
model.fit(X, y)
return model
trainer = ManyModelTraining(train_xgboost_model, "model_stage")
PyTorch を使用してデータパーティション全体でディープラーニングモデルをトレーニングします。PyTorch は、柔軟なニュートラルネットワークアーキテクチャと動的計算グラフを提供します。
def train_pytorch_model(data_connector, context):
import torch
import torch.nn as nn
df = data_connector.to_pandas()
# ... prepare data for PyTorch ...
model = nn.Sequential(nn.Linear(10, 1))
# ... training logic ...
return model # Automatically saved as model.pth
from snowflake.ml.modeling.distributors.many_model import TorchSerde
trainer = ManyModelTraining(train_pytorch_model, "models_stage", serde=TorchSerde())
TensorFlow を使用してデータパーティション全体でディープラーニングモデルをトレーニングします。TensorFlow は調査と実稼働展開の両方に役立つ包括的ツールを提供します。
def train_tf_model(data_connector, context):
import tensorflow as tf
df = data_connector.to_pandas()
# ... prepare data for TensorFlow ...
model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
# ... training logic ...
return model # Automatically saved as model.h5
from snowflake.ml.modeling.distributors.many_model import TensorFlowSerde
trainer = ManyModelTraining(train_tf_model, "models_stage", serde=TensorFlowSerde())
カスタムモデルまたはサポートされていない ML フレームワークを使用するには、ModelSerde インターフェースを実装します。この例は、カスタムメタデータ処理を使用したscikit-learnを示しています。
from snowflake.ml.modeling.distributors.many_model import ModelSerde
import json
class ScikitLearnSerde(ModelSerde):
'''Custom serializer for scikit-learn models with metadata'''
@property
def filename(self) -> str:
return "sklearn_model.joblib"
def write(self, model, file_path: str) -> None:
import joblib
# Save model with metadata
model_data = {
'model': model,
'feature_names': getattr(model, 'feature_names_in_', None),
'model_type': type(model).__name__
}
joblib.dump(model_data, file_path)
def read(self, file_path: str):
import joblib
return joblib.load(file_path)
def train_sklearn_model(data_connector, context):
from sklearn.ensemble import RandomForestRegressor
df = data_connector.to_pandas()
X, y = df[['feature1', 'feature2']], df['target']
model = RandomForestRegressor()
model.fit(X, y)
return model # Automatically saved with metadata
trainer = ManyModelTraining(train_sklearn_model, "models_stage", serde=ScikitLearnSerde())
パーティション全体でのモデルのトレーニング¶
トレーニング関数を定義したら、MMT を使用してパーティション全体でモデルをトレーニングできます。分割に使う列とモデルが保存されるステージを指定します。
次のコードは、region
列でデータを分割し、train_xgboost_model
関数を使用して各リージョンの別々のモデルを並行してトレーニングします。
たとえば、次のテーブルが region
列の可能な値だとします。
North
South
East
West
Central
ManyModelTraining
関数は、前述のリージョンごとに個別のデータパーティションを作成し、各パーティションでモデルをトレーニングすることになります。
from snowflake.ml.modeling.distributors.many_model import ManyModelTraining
trainer = ManyModelTraining(train_xgboost_model, "model_stage") # Specify the stage to store the models
training_run = trainer.run(
partition_by="region", # Train separate models for each region
snowpark_dataframe=sales_data,
run_id="regional_models_v1" # Specify a unique ID for the training run
)
# Monitor training progress
final_status = training_run.wait()
print(f"Training completed with status: {final_status}")
モデルは partition_id
がパーティション列の値である run_id/{partition_id}
でステージに格納されます。
トレーニング済みモデルへのアクセス¶
MMT の終了後、指定したステージに格納されている各データパーティションのモデルをトレーニングしました。各モデルは、そのパーティションに固有のデータでトレーニングされます。例えば、「North」モデルは北部リージョンのデータでのみトレーニングされます。
トレーニング実行オブジェクトは、これらのモデルにアクセスし、各パーティションのトレーニングステータスをチェックするメソッドを提供します。
次のコードは、トレーニング実行のステータスチェックを取得し、パーティションごとにトレーニング済みモデルを取得します。
if final_status == RunStatus.SUCCESS:
# Access models for each partition
for partition_id in training_run.partition_details:
trained_model = training_run.get_model(partition_id)
print(f"Model for {partition_id}: {trained_model}")
# You can now use the model for predictions or further analysis
# Example: model.predict(new_data)
else:
# Handle training failures
for partition_id, details in training_run.partition_details.items():
if details.status != "DONE":
print(f"Training failed for {partition_id}")
error_logs = details.logs
モデルの永続性と取得¶
MMT は自動で、トレーニングプロセス中、指定したSnowflakeステージにトレーニング済みモデルを永続化します。各モデルは、実行 ID とパーティション識別子を含む構造化パスで格納されるので、後でモデルを整理したり取得することが容易になります。
自動で永続化するので、手動でモデルを保存する必要はありません。MMT がシリアライズとストレージをあなたのために処理するため、セッションのタイムアウトや接続の問題によってトレーニング済みのモデルが失われるリスクがありません。
元のセッションが終了した後でも、以前のトレーニング実行を復元し、そのモデルにアクセスすることができます。この永続メカニズムにより、次のことが可能になります。
異なるセッション間で作業を再開する
トレーニング済みモデルをチームメンバーと共有する
モデルのバージョン管理ワークフローを確立する
下流の推論パイプラインと統合する
モデルは指定されたステージに自動保存され、後で取得することができます。
# Restore training run from stage
restored_run = ManyModelTraining.restore_from("regional_models_v1", "model_stage")
# Access models from restored run
north_model = restored_run.get_model("North")
south_model = restored_run.get_model("South")
カスタムモデルのトレーニング¶
カスタムモデルまたはサポート対象外の ML フレームワークについては、ModelSerde インターフェースを実装します。カスタムモデルに対して、独自のシリアライズと逆シリアライズのロジックを定義することができます。これにより、MMT を自分が使用する任意の機械学習フレームワークやカスタムモデルアーキテクチャと統合できます。
from snowflake.ml.modeling.distributors.many_model import ModelSerde
class CustomModelSerde(ModelSerde):
def serialize(self, model, path):
# Custom serialization logic
pass
def deserialize(self, path):
# Custom deserialization logic
pass
def train_custom_model(data_connector, context):
# Your custom training logic
model = your_custom_model_training(data_connector.to_pandas())
return model
trainer = ManyModelTraining(
train_custom_model,
"custom_model_stage",
model_serde=CustomModelSerde()
)
モデルレジストリとの統合¶
MMT をSnowflakeのモデルレジストリと統合して、モデル管理を強化することができます。モデルレジストリは、組織全体での一元化されたモデルのバージョン管理、メタデータの追跡、展開管理を可能にします。この統合は、MMT で多モデルトレーニングする場合に特に価値があります。単一の場所からすべてのパーティション固有のモデルを整理、追跡、管理できるようになります。
MMT でモデルレジストリを使用すると、次のことができます。
パーティション固有のモデルのさまざまな反復を追跡する
モデルのパフォーマンスメトリック、トレーニングパラメーター、系統情報を保存する
各パーティションについて、どのモデルバージョンが実稼働環境に展開されるかを管理する
適切なアクセス制御とドキュメントを使用してチーム間でモデルを共有する
モデル展開のための承認ワークフローとコンプライアンス追跡を実装する
# Register trained models to Model Registry
for partition_id in training_run.partition_details:
model = training_run.get_model(partition_id)
# Register to Model Registry
model_ref = registry.log_model(
model,
model_name=f"sales_model_{partition_id.lower()}",
version_name="v1"
)