データパーティション全体でモデルをトレーニングする

多モデルトレーニング(MMT)を使用して、データパーティション全体で複数の機械学習モデルを効率的にトレーニングします。分散オーケストレーション、モデルストレージ、アーティファクトの永続化を自動的に処理します。

MMT は、指定された列によってSnowpark DataFrame をパーティション化し、各パーティションの個別のモデルを並行してトレーニングします。モデルトレーニングロジックに焦点を当てる一方、MMT はインフラストラクチャの複雑さを処理し、自動的にスケーリングします。

MMT を使用することで、さまざまなデータセグメントにわたって複数のモデルを効率的にトレーニングできます。このツールは、地域固有の売上予測モデルのトレーニング、各顧客グループが独自のモデルを必要とするパーソナライズされた推奨システムの構築、セグメント固有の予測モデルの作成などのシナリオに最適です。 MMT は、分散モデルのトレーニングを自動的に処理するため、複雑な分散コンピューティングインフラストラクチャの管理が不要になります。

MMT では、XGBoost、scikit-learn、PyTorch、TensorFlow などのオープンソース機械学習モデルやフレームワークを使用してモデルをトレーニングできます。 MMT はモデルのアーキテクトを自動的にシリアル化するので、推論時にアクセスすることができます。

ModelSerde インターフェースを実装してカスタムモデルをトレーニングしたり、サポートされていない ML フレームワークを使用することもできます。これにより、MMT を自分が使用する任意の機械学習フレームワークやカスタムモデルアーキテクチャと統合できます。

重要

MMT を使い始める前に、以下をご確認ください。

  • コンテナランタイム環境: MMT には、Snowflake ML コンテナランタイム環境が必要です。

  • ステージアクセス許可: MMT は、モデルアーティファクトをSnowflakeステージに自動的に保管します。指定した名前付きステージにアクセスするための適切な権限を持っていることを確認します。

  • ML フレームワークのサポート:組み込みの統合を XGBoost、scikit-learn、PyTorch、TensorFlow に利用できます。カスタムモデルの場合は、ModelSerde インターフェースを実装します。

次のセクションでは、ワークフローの例を使って MMT の使用について説明します。

MMT を使用したモデルのトレーニング

このセクションでは、完全な MMT ワークフローを5つの主要ステップで説明します。

  1. データのインポート - Snowparkを使用してトレーニングデータをロードする

  2. トレーニング関数の定義 - トレーニング関数を定義する

  3. パーティション全体でモデルのトレーニング - MMT を使用して各パーティションのモデルを並行してトレーニングする

  4. トレーニング済みモデルへのアクセス - 各パーティションのトレーニング済みモデルを取得して使用する

  5. モデルの永続性と取得 - モデルをステージに保存し、後で復元する

このワークフローでは、データパーティション全体での分散トレーニング、モデルのシリアル化、アーティファクトストレージを自動的に処理します。

データのインポート

Snowparkセッションを使用して、データのインポートを開始します。多モデルトレーニング関数が、指定した列を使用して、インポートするデータを異なるパーティションに分割します。

MMT を使用する前に、Snowparkセッションを作成します。詳細情報については、Snowpark Pythonのセッションの作成 をご参照ください。

次のコードでは、Snowparkのセッションを使用してトレーニングデータをインポートします。

# Example: sales_data with columns: region, feature1, feature2, feature3, target
sales_data = session.table("SALES_TRAINING_DATA")
Copy

トレーニング関数の定義

データを取得した後、MMT がパーティション全体でのモデルトレーニングに使用するトレーニング関数を定義します。トレーニング関数は、データコネクタと、トレーニング中のデータパーティションをポイントするコンテキストオブジェクトを受け取ります。このセクションには、TensorFlow と PyTorch を活用する例に加えて、XGBoost モデルをトレーニングするためのトレーニング関数定義の例があります。

トレーニング関数には、この (data_connector, context) という正確な署名が必要です。データパーティションごとに、MMT は 次の引数を使用して train_xgboost_model を呼び出します。

  • data_connector:MMT が分割するデータへのアクセスを提供するデータコネクタです。train_xgboost_model はそのデータフレームを pandas に変換します。

  • context:partition_idtrain_xgboost_model 関数に提供するオブジェクトです。この ID は、パーティション分割する列の名前です。

この関数を自分で呼び出すことはありません。MMT は、すべてのパーティションでの実行を処理します。

以下のコードを使用して、トレーニング関数を定義します。データの特徴量を反映するようにコードを変更したら、MMT 関数に渡すことができます。

XGBoost を使用してデータパーティション全体でモデルをトレーニングします。XGBoost は構造化データに優れたパフォーマンスを提供し、欠損値を自動的に処理します。

def train_xgboost_model(data_connector, context):
    df = data_connector.to_pandas()
    print(f"Training model for partition: {context.partition_id}")

    # Prepare features and target
    X = df[['feature1', 'feature2', 'feature3']]
    y = df['target']

    # Train the model
    from xgboost import XGBRegressor
    model = XGBRegressor(
        n_estimators=100,
        max_depth=6,
        learning_rate=0.1,
        random_state=42
    )
    model.fit(X, y)
    return model

trainer = ManyModelTraining(train_xgboost_model, "model_stage")
Copy

パーティション全体でのモデルのトレーニング

トレーニング関数を定義したら、MMT を使用してパーティション全体でモデルをトレーニングできます。分割に使う列とモデルが保存されるステージを指定します。

次のコードは、region 列でデータを分割し、train_xgboost_model 関数を使用して各リージョンの別々のモデルを並行してトレーニングします。

たとえば、次のテーブルが region 列の可能な値だとします。

  • North

  • South

  • East

  • West

  • Central

ManyModelTraining 関数は、前述のリージョンごとに個別のデータパーティションを作成し、各パーティションでモデルをトレーニングすることになります。

from snowflake.ml.modeling.distributors.many_model import ManyModelTraining

trainer = ManyModelTraining(train_xgboost_model, "model_stage") # Specify the stage to store the models
training_run = trainer.run(
    partition_by="region",  # Train separate models for each region
    snowpark_dataframe=sales_data,
    run_id="regional_models_v1" # Specify a unique ID for the training run
)

# Monitor training progress
final_status = training_run.wait()
print(f"Training completed with status: {final_status}")
Copy

モデルは partition_id がパーティション列の値である run_id/{partition_id} でステージに格納されます。

トレーニング済みモデルへのアクセス

MMT の終了後、指定したステージに格納されている各データパーティションのモデルをトレーニングしました。各モデルは、そのパーティションに固有のデータでトレーニングされます。例えば、「North」モデルは北部リージョンのデータでのみトレーニングされます。

トレーニング実行オブジェクトは、これらのモデルにアクセスし、各パーティションのトレーニングステータスをチェックするメソッドを提供します。

次のコードは、トレーニング実行のステータスチェックを取得し、パーティションごとにトレーニング済みモデルを取得します。

if final_status == RunStatus.SUCCESS:
    # Access models for each partition
    for partition_id in training_run.partition_details:
        trained_model = training_run.get_model(partition_id)
        print(f"Model for {partition_id}: {trained_model}")

        # You can now use the model for predictions or further analysis
        # Example: model.predict(new_data)
else:
    # Handle training failures
    for partition_id, details in training_run.partition_details.items():
        if details.status != "DONE":
            print(f"Training failed for {partition_id}")
            error_logs = details.logs
Copy

モデルの永続性と取得

MMT は自動で、トレーニングプロセス中、指定したSnowflakeステージにトレーニング済みモデルを永続化します。各モデルは、実行 ID とパーティション識別子を含む構造化パスで格納されるので、後でモデルを整理したり取得することが容易になります。

自動で永続化するので、手動でモデルを保存する必要はありません。MMT がシリアライズとストレージをあなたのために処理するため、セッションのタイムアウトや接続の問題によってトレーニング済みのモデルが失われるリスクがありません。

元のセッションが終了した後でも、以前のトレーニング実行を復元し、そのモデルにアクセスすることができます。この永続メカニズムにより、次のことが可能になります。

  • 異なるセッション間で作業を再開する

  • トレーニング済みモデルをチームメンバーと共有する

  • モデルのバージョン管理ワークフローを確立する

  • 下流の推論パイプラインと統合する

モデルは指定されたステージに自動保存され、後で取得することができます。

# Restore training run from stage
restored_run = ManyModelTraining.restore_from("regional_models_v1", "model_stage")

# Access models from restored run
north_model = restored_run.get_model("North")
south_model = restored_run.get_model("South")
Copy

カスタムモデルのトレーニング

カスタムモデルまたはサポート対象外の ML フレームワークについては、ModelSerde インターフェースを実装します。カスタムモデルに対して、独自のシリアライズと逆シリアライズのロジックを定義することができます。これにより、MMT を自分が使用する任意の機械学習フレームワークやカスタムモデルアーキテクチャと統合できます。

from snowflake.ml.modeling.distributors.many_model import ModelSerde

class CustomModelSerde(ModelSerde):
    def serialize(self, model, path):
        # Custom serialization logic
        pass

    def deserialize(self, path):
        # Custom deserialization logic
        pass

def train_custom_model(data_connector, context):
    # Your custom training logic
    model = your_custom_model_training(data_connector.to_pandas())
    return model

trainer = ManyModelTraining(
    train_custom_model,
    "custom_model_stage",
    model_serde=CustomModelSerde()
)
Copy

モデルレジストリとの統合

MMT をSnowflakeのモデルレジストリと統合して、モデル管理を強化することができます。モデルレジストリは、組織全体での一元化されたモデルのバージョン管理、メタデータの追跡、展開管理を可能にします。この統合は、MMT で多モデルトレーニングする場合に特に価値があります。単一の場所からすべてのパーティション固有のモデルを整理、追跡、管理できるようになります。

MMT でモデルレジストリを使用すると、次のことができます。

  • パーティション固有のモデルのさまざまな反復を追跡する

  • モデルのパフォーマンスメトリック、トレーニングパラメーター、系統情報を保存する

  • 各パーティションについて、どのモデルバージョンが実稼働環境に展開されるかを管理する

  • 適切なアクセス制御とドキュメントを使用してチーム間でモデルを共有する

  • モデル展開のための承認ワークフローとコンプライアンス追跡を実装する

# Register trained models to Model Registry
for partition_id in training_run.partition_details:
    model = training_run.get_model(partition_id)

    # Register to Model Registry
    model_ref = registry.log_model(
        model,
        model_name=f"sales_model_{partition_id.lower()}",
        version_name="v1"
    )
Copy