Snowpark Pythonを使用した機械学習モデルのトレーニング

このトピックでは、Snowparkを使用して機械学習(ML)モデルをトレーニングする方法について説明します。

注釈

Snowpark ML は、Snowflakeでの機械学習に特化して構築されたSnowpark Pythonのコンパニオンです。このトピックには、Snowpark Pythonを使用した機械学習に関する有用な一般情報が含まれています。

このトピックの内容:

Snowpark用に最適化されたウェアハウス

機械学習(ML)モデルのトレーニングは、リソースを大量に消費する場合があります。Snowpark用に最適化されたウェアハウスは、大量のメモリとコンピューティングリソースを必要とするワークロードに使用できるSnowflake仮想ウェアハウスの一種です。たとえば、それらを使用して、単一ノードでカスタムコードを使用して ML モデルをトレーニングできます。

これらの最適化されたウェアハウスは、一部の UDF および UDTF シナリオにも役立ちます。

Snowpark用に最適化されたウェアハウスを作成する方法の詳細については、 Snowpark用に最適化されたウェアハウス をご参照ください。

ML トレーニングに対するSnowpark Pythonストアドプロシージャの使用

Snowpark Python ストアドプロシージャ を使用して、Snowflakeウェアハウスを使用したカスタムコードを実行できます。Snowpark用に最適化されたウェアハウスにより、Snowparkストアドプロシージャを使用して、単一ノードの ML トレーニングワークロードをSnowflakeで直接実行できるようになります。

Pythonストアドプロシージャは、 Python用Snowpark API を使用してネストされたクエリを実行し、データセットを読み込んで変換できます。その後、データセットはストアドプロシージャメモリに読み込まれ、前処理と ML トレーニングを実行します.トレーニング済みのモデルはSnowflakeステージにアップロードでき、 UDFs を作成して推論を実行するために使用できます。

Snowpark用に最適化されたウェアハウスを使用して前処理とトレーニングロジックを実行できますが、パフォーマンスとリソース使用率を向上させるために、別のウェアハウスでネストされたクエリを実行する必要が生じる場合があります。別のクエリウェアハウスは、データセットのサイズに基づいて個別に調整およびスケールできます。

ガイドライン

次のガイドラインに従って、単一ノード ML トレーニングワークロードを実行します。

  • WAREHOUSE_SIZE = MEDIUM を設定して、Snowpark用に最適化されたウェアハウスがSnowpark用に最適化されたノード1つで構成されるようにします。

  • ウェアハウス MAX_CONCURRENCY_LEVEL パラメーターを1に設定して、特定のSnowparkストアドプロシージャのメモリとコンピューティングリソースを最大化します。例:

    alter warehouse snowpark_opt_wh set max_concurrency_level = 1;
    
    Copy
  • 必要に応じて、ウェアハウスを マルチクラスターウェアハウス として設定して、必要な並行性をサポートすることを検討してください。

  • ストアドプロシージャからネストされたクエリを実行するために別のウェアハウスを使用することを検討してください。

    • session.use_warehouse() API を使用して、ストアドプロシージャ内のクエリのウェアハウスを選択します。

  • ML トレーニングストアドプロシージャの実行に使用されるSnowpark用に最適化されたウェアハウスで、他のワークロードを混在させないでください。

次の例では、Snowpark用に最適化されたウェアハウスを作成して使用します。次に、線形回帰モデルをトレーニングするストアドプロシージャを作成します。ストアドプロシージャは、 MARKETING_BUDGETS_FEATURES という名前のテーブルのデータを使用します(ここには示されていません)。

CREATE OR REPLACE WAREHOUSE snowpark_opt_wh WITH
  WAREHOUSE_SIZE = 'MEDIUM'
  WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'
  MAX_CONCURRENCY_LEVEL = 1;

CREATE OR REPLACE PROCEDURE train()
  RETURNS VARIANT
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.8
  PACKAGES = ('snowflake-snowpark-python', 'scikit-learn', 'joblib')
  HANDLER = 'main'
AS $$
import os
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split, GridSearchCV
from joblib import dump

def main(session):
  # Load features
  df = session.table('MARKETING_BUDGETS_FEATURES').to_pandas()
  X = df.drop('REVENUE', axis = 1)
  y = df['REVENUE']

  # Split dataset into training and test
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state = 42)

  # Preprocess numeric columns
  numeric_features = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
  numeric_transformer = Pipeline(steps=[('poly',PolynomialFeatures(degree = 2)),('scaler', StandardScaler())])
  preprocessor = ColumnTransformer(transformers=[('num', numeric_transformer, numeric_features)])

  # Create pipeline and train
  pipeline = Pipeline(steps=[('preprocessor', preprocessor),('classifier', LinearRegression(n_jobs=-1))])
  model = GridSearchCV(pipeline, param_grid={}, n_jobs=-1, cv=10)
  model.fit(X_train, y_train)

  # Upload trained model to a stage
  model_file = os.path.join('/tmp', 'model.joblib')
  dump(model, model_file)
  session.file.put(model_file, "@ml_models",overwrite=True)

  # Return model R2 score on train and test data
  return {"R2 score on Train": model.score(X_train, y_train),"R2 score on Test": model.score(X_test, y_test)}
$$;
Copy

ストアドプロシージャを呼び出すには、次のコマンドを実行します。

CALL train();
Copy

注釈

他のさまざまなSnowpark Pythonデモが Snowflake-Labs GitHub リポジトリ で入手できます。 広告費と ROI 予測 の例は、線形回帰モデルをトレーニングするストアドプロシージャの作成方法を示しています。