Snowpark ML Modeling: ML モデル開発

注釈

Snowpark ML Modeling API はパッケージバージョン1.1.1で一般公開されます。まだ開発中の機能には、プレビュー機能としてマークされているものがあります。

Snowpark ML Modelingは、データの前処理とモデルをトレーニングするためのPython APIs のコレクションです。Snowpark ML Modelingを使用してSnowflake内でこれらのタスクを実行すると、以下を実行できます。

  • データをSnowflakeから移動することなく、データを変換し、モデルをトレーニングする。

  • scikit-learnなど、すでに使い慣れたものと同様の APIs と連動する。

  • Snowflakeのセキュリティとガバナンスのフレームワーク内で ML パイプラインの実行を継続する。

  • Snowflakeの仮想ウェアハウスのパフォーマンスとスケーラビリティからメリットを受ける。

このトピックで説明するSnowpark ML Modelingパッケージは、scikit-learn、xgboost、lightgbmライブラリと同様の APIs がある推定器と変換器を提供します。これらの APIs を使用して、Snowflake内で機械学習モデルを構築し、トレーニングすることができます。

Snowpark ML Modelingの簡単な紹介は、 クイックスタート をご参照ください。

注釈

このトピックでは、Snowpark ML モジュールとそのモデリングの依存関係がすでにインストールされていることを前提としています。 Snowpark ML のインストール をご参照ください。

次の例を見て、Snowpark ML Modeling API と馴染みのある機械学習ライブラリとの類似性を理解してください。

前処理

この例では、Snowpark ML Modeling によるデータの前処理と変換機能の使用方法を説明します。この例で使用されている2つの前処理関数(MixMaxScalerOrdinalEncoder)は、Snowflakeの分散処理エンジンを使用しているため、クライアントサイドやストアドプロシージャの実装に比べてパフォーマンスが大幅に向上しています。詳細については、 分散前処理 をご参照ください。

import numpy as np
import pandas as pd
import random
import string

from sklearn.datasets import make_regression
from snowflake.ml.modeling.preprocessing import MinMaxScaler, OrdinalEncoder
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.snowpark import Session

# Create a session with your preferred method
# session =

NUMERICAL_COLS = ["X1", "X2", "X3"]
CATEGORICAL_COLS = ["C1", "C2", "C3"]
FEATURE_COLS = NUMERICAL_COLS + CATEGORICAL_COLS
CATEGORICAL_OUTPUT_COLS = ["C1_OUT", "C2_OUT", "C3_OUT"]
FEATURE_OUTPUT_COLS = ["X1_FEAT_OUT", "X2_FEAT_OUT", "X3_FEAT_OUT", "C1_FEAT_OUT", "C2_FEAT_OUT", "C3_FEAT_OUT"]

# Create a dataset with numerical and categorical features
X, _ = make_regression(
    n_samples=1000,
    n_features=3,
    noise=0.1,
    random_state=0,
)

def generate_random_string(length):
    return "".join(random.choices(string.ascii_uppercase, k=length))

num_categorical_cols, categorical_feature_length = 3, 2
categorical_features = []
for _ in range(num_categorical_cols):
    categorical_column = [generate_random_string(categorical_feature_length) for _ in range(X.shape[0])]
    categorical_features.append(categorical_column)
X = np.column_stack((X, *categorical_features))
X = pd.DataFrame(X, columns=FEATURE_COLS)

features_df = session.create_dataframe(X)

# Fit a pipeline with OrdinalEncoder and MinMaxScaler on Snowflake
pipeline = Pipeline(
    steps=[
        (
            "OE",
            OrdinalEncoder(
                input_cols=CATEGORICAL_COLS,
                output_cols=CATEGORICAL_OUTPUT_COLS,
            )
        ),
        (
            "MMS",
            MinMaxScaler(
                input_cols=NUMERICAL_COLS + CATEGORICAL_OUTPUT_COLS,
                output_cols=FEATURE_OUTPUT_COLS,
            )
        ),
    ]
)

pipeline.fit(features_df)

# Use the pipeline to transform a dataset.
result = pipeline.transform(features_df)
Copy

トレーニング

この例では、Snowpark ML Modelingを使用して単純なxgboost分類器モデルのトレーニングを行い、予測を実行する方法を示します。ここでのSnowpark ML API は、列の指定方法が少し違うだけで、xgboostと類似しています。これらの相違の詳細については、 一般的な API の違い をご参照ください。

import pandas as pd
from sklearn.datasets import make_classification

from snowflake.ml.modeling.xgboost import XGBClassifier
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session

# Create a session with your preferred method
# session =

FEATURE_COLS = ["X1", "X2", "X3", "X4", "X5", "X6"]
LABEL_COLS = ["Y"]
OUTPUT_COLS = ["PREDICTIONS"]

# Set up data.
X, y = make_classification(
    n_samples=40000,
    n_features=6,
    n_informative=4,
    n_redundant=1,
    random_state=0,
    shuffle=True,
)

X = pd.DataFrame(X, columns=FEATURE_COLS)
y = pd.DataFrame(y, columns=LABEL_COLS)

features_pandas = pd.concat([X, y], axis=1)
features_df = session.create_dataframe(features_pandas)

# Train an XGBoost model on snowflake.
xgboost_model = XGBClassifier(
    input_cols=FEATURE_COLS,
    label_cols=LABEL_COLS,
    output_cols=OUTPUT_COLS
)

xgboost_model.fit(features_df)

# Use the model to make predictions.
predictions = xgboost_model.predict(features_df)
predictions[OUTPUT_COLS].show()
Copy

非合成データの特徴量の前処理とトレーニング

この例では、地上に設置された大気チェレンコフ望遠鏡からの高エネルギーガンマ粒子データを使用します。この望遠鏡は、ガンマ線によって引き起こされる電磁シャワーの中で生成される荷電粒子から放出される放射線を利用して、高エネルギーのガンマ粒子を観測します。検出器は、大気を透過して漏れるチェレンコフ放射(可視から紫外線の波長)を記録し、ガンマ線シャワーのパラメーターを再構成することができます。この望遠鏡は、宇宙線シャワーに多く含まれ、ガンマ線に似た信号を出すハドロン線も検出します。

目標は、ガンマ線とハドロン線を区別するための分類モデルを開発することです。このモデルによって、科学者はバックグラウンドのノイズをフィルタリングし、本物のガンマ線信号に焦点を当てることができます。ガンマ線により、科学者は星の誕生と死、宇宙爆発、極限状態における物質の挙動といった宇宙現象を観測することができます。

粒子データは MAGIC ガンマ望遠鏡 からダウンロードできます。データをダウンロードして解凍し、 DATA_FILE_PATH 変数にデータファイルを指すように設定し、以下のコードを実行してSnowflakeに読み込みます。

DATA_FILE_PATH = "~/Downloads/magic+gamma+telescope/magic04.data"

# Setup
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session
import posixpath
import os

##
# Note: Create session https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/api/snowflake.snowpark.Session
##
session = Session.builder.configs(SnowflakeLoginOptions()).create()

session.sql("""
CREATE OR REPLACE TABLE Gamma_Telescope_Data(
    F_LENGTH FLOAT,
    F_WIDTH FLOAT,
    F_SIZE FLOAT,
    F_CONC FLOAT,
    F_CONC1 FLOAT,
    F_ASYM FLOAT,
    F_M3_LONG FLOAT,
    F_M3_TRANS FLOAT,
    F_ALPHA FLOAT,
    F_DIST FLOAT,
    CLASS VARCHAR(10))
""").collect()
session.sql("CREATE OR REPLACE STAGE SNOWPARK_ML_TEST_DATA_STAGE").collect()
session.file.put(
    DATA_FILE_PATH,
    "SNOWPARK_ML_TEST_DATA_STAGE/magic04.data",
    auto_compress=False,
    overwrite=True,
)

session.sql("""
COPY INTO Gamma_Telescope_Data FROM @SNOWPARK_ML_TEST_DATA_STAGE/magic04.data
FILE_FORMAT = (TYPE = 'CSV' field_optionally_enclosed_by='"',SKIP_HEADER = 0);
""").collect()

session.sql("select * from Gamma_Telescope_Data limit 5").collect()
Copy

データを読み込んだら、次のコードを使って、以下のステップでトレーニングと予測を行います。

  • データを前処理する:

    • 欠損値を平均値で置き換える。

    • 標準的なスケーラーを使用してデータを中央に配置する。

  • xgboost分類器でイベントのタイプを決定できるようにトレーニングする。

  • トレーニングデータセットとテストデータセットの両方でモデルの精度をテストする。

from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session, DataFrame

from snowflake.ml.modeling.preprocessing import StandardScaler
from snowflake.ml.modeling.impute import SimpleImputer
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.xgboost import XGBClassifier

from snowflake.ml.modeling.metrics import accuracy_score

##
# Note: Create session https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/api/snowflake.snowpark.Session
##
session = Session.builder.configs(SnowflakeLoginOptions()).create()

# Step 1: Create train and test dataframes
all_data = session.sql("select *, IFF(CLASS = 'g', 1.0, 0.0) as LABEL from Gamma_Telescope_Data").drop("CLASS")
train_data, test_data = all_data.random_split(weights=[0.9, 0.1], seed=0)

# Step 2: Construct training pipeline with preprocessing and modeling steps
FEATURE_COLS = [c for c in train_data.columns if c != "LABEL"]
LABEL_COLS = ["LABEL"]

pipeline = Pipeline(steps = [
    ("impute", SimpleImputer(input_cols=FEATURE_COLS, output_cols=FEATURE_COLS)),
    ("scaler", StandardScaler(input_cols=FEATURE_COLS, output_cols=FEATURE_COLS)),
    ("model", XGBClassifier(input_cols=FEATURE_COLS, label_cols=LABEL_COLS))
])

# Step 3: Train
pipeline.fit(train_data)

# Step 4: Eval
predict_on_training_data = pipeline.predict(train_data)
training_accuracy = accuracy_score(df=predict_on_training_data, y_true_col_names=["LABEL"], y_pred_col_names=["OUTPUT_LABEL"])

predict_on_test_data = pipeline.predict(test_data)
eval_accuracy = accuracy_score(df=predict_on_test_data, y_true_col_names=["LABEL"], y_pred_col_names=["OUTPUT_LABEL"])

print(f"Training accuracy: {training_accuracy} \nEval accuracy: {eval_accuracy}")
Copy

分散ハイパーパラメーターの最適化

この例では、Snowpark ML のscikit-learnの GridSearchCV の実装を使用して、分散ハイパーパラメーターの最適化を実行する方法を示します。個々の実行は、分散ウェアハウスのコンピューティングリソースを使用して並列実行されます。分散ハイパーパラメーターの最適化の詳細については、 分散ハイパーパラメーターの最適化 をご参照ください。

from snowflake.snowpark import Session, DataFrame
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions

from sklearn.datasets import make_classification
from snowflake.snowpark import Session, DataFrame
from snowflake.ml.modeling.xgboost import XGBClassifier
from snowflake.ml.modeling.model_selection.grid_search_cv import GridSearchCV

FEATURE_COLS = ["X1", "X2", "X3", "X4", "X5", "X6"]
LABEL_COLS = ["Y"]
OUTPUT_COLS = ["PREDICTIONS"]

# Create a session using your favorite login option.
# In this example we use a session builder with `SnowflakeLoginOptions`.
session = Session.builder.configs(SnowflakeLoginOptions()).create()

# Set up data.
def set_up_data(session: Session, n_samples: int) -> DataFrame:
    X, y = make_classification(
        n_samples=n_samples,
        n_features=6,
        n_informative=2,
        n_redundant=0,
        random_state=0,
        shuffle=True,
    )

    X = pd.DataFrame(X, columns=FEATURE_COLS)
    y = pd.DataFrame(y, columns=LABEL_COLS)

    features_pandas = pd.concat([X, y], axis=1)
    features_pandas.head()

    features_df = session.create_dataframe(features_pandas)
    return features_df

features_df = set_up_data(session, 10**4)

# Create a warehouse to use for the tuning job.
session.sql(
    """
CREATE or replace warehouse HYPERPARAM_WH
    WITH WAREHOUSE_SIZE = 'X-SMALL'
    WAREHOUSE_TYPE = 'Standard'
    AUTO_SUSPEND = 60
    AUTO_RESUME = TRUE
    INITIALLY_SUSPENDED = FALSE;"""
).collect()
session.use_warehouse("HYPERPARAM_WH")

# Tune an XGB Classifier model using sklearn GridSearchCV.
DISTRIBUTIONS = dict(
    n_estimators=[10, 50],
    learning_rate=[0.01, 0.1, 0.2],
)
estimator = XGBClassifier()
grid_search_cv = GridSearchCV(estimator=estimator, param_grid=DISTRIBUTIONS, input_cols=FEATURE_COLS, label_cols=LABEL_COLS, output_cols=OUTPUT_COLS)

grid_search_cv.fit(features_df)

# Use the best model to make predictions.
predictions = grid_search_cv.predict(features_df)
predictions[OUTPUT_COLS].show()

# Retrieve sklearn model, and print the best score
sklearn_grid_search_cv = grid_search_cv.to_sklearn()
print(sklearn_grid_search_cv.best_score_)
Copy

分散最適化のパワーを実際に見るには、100万行のデータでトレーニングします。

large_features_df = set_up_data(session, 10**6)

# Scale up the warehouse for a faster fit. This takes 2m15s to run on an L warehouse versus 4m5s on a XS warehouse.
session.sql(f"ALTER WAREHOUSE {session.get_current_warehouse()} SET WAREHOUSE_SIZE='LARGE'").collect()

grid_search_cv.fit(large_features_df)
print(grid_search_cv.to_sklearn().best_score_)
Copy

Snowpark ML Modelingクラス

すべてのSnowpark ML のモデリングクラスと前処理クラスは、 snowflake.ml.modeling 名前空間にあります。Snowpark MLのモジュールは、 sklearn 名前空間の対応するモジュールと同じ名前です。たとえば、 sklearn.calibration に対応するSnowpark ML モジュールは、 snowflake.ml.modeling.calibration です。 xgboostlightgbm モジュールは、それぞれ snowflake.ml.modeling.xgboostsnowflake.ml.modeling.lightgbm に対応しています。

SnowparkML Modeling API は、基礎となるscikit-learn、xgboost、lightgbmクラスのラッパーを提供し、その大部分は仮想ウェアハウス内のストアドプロシージャ(単一のウェアハウスノードで実行)として実行されます。scikit-learnのすべてのクラスがSnowpark ML でサポートされているわけではありません。現在利用可能なクラスのリストについては、 Snowpark ML API リファレンス をご参照ください。

一部のクラス(前処理やメトリクスクラスを含む)は分散実行をサポートしており、同じ操作をローカルで実行するのに比べて顕著なパフォーマンス上のメリットを提供する場合があります。詳細については、 分散前処理 および 分散ハイパーパラメーターの最適化 をご参照ください。以下のテーブルは、分散実行をサポートする特定のクラスを一覧表示したものです。

Snowpark ML モジュール名

分散クラス

snowflake.ml.modeling.impute

  • SimpleImputer

snowflake.ml.modeling.metrics

correlation:

  • correlation

covariance:

  • covariance

classification:

  • accuracy_score

  • confusion_matrix

  • f1_score

  • fbeta_score

  • log_loss

  • precision_recall_fscore_support

  • precision_score

  • recall_score

regression:

  • mean_absolute_error

  • mean_absolute_percentage_error

  • mean_squared_error

snowflake.ml.modeling.model_selection

  • GridSearchCV

  • RandomizedSearchCV

snowflake.ml.modeling.preprocessing

  • Binarizer

  • KBinsDiscretizer

  • LabelEncoder

  • MaxAbsScaler

  • MinMaxScaler

  • Normalizer

  • OneHotEncoder

  • OrdinalEncoder

  • RobustScaler

  • StandardScaler

一般的な API の違い

ちなみに

モデリング API の完全な詳細については、Snowpark ML API リファレンス をご参照ください。

Snowpark ML Modelingには、scikit-learn、xgboost、lightgbmに基づくデータ前処理、変換、予測アルゴリズムが含まれています。Snowpark Pythonクラスは、オリジナルパッケージからの対応するクラスを置き換えたもので、署名も類似しています。ただし、これらの APIs は、 NumPy 配列の代わりに、Snowpark DataFrames で動作するように設計されています。

Snowpark ML Modeling API はscikit-learnに似ていますが、いくつかの重要な違いがあります。このセクションでは、Snowpark ML で提供される推定器クラスと変換器クラスの __init__ (コンストラクター)、 fit、および predict メソッドを呼び出す方法について説明します。

  • すべてのSnowpark ML Pythonクラスの コンストラクター は、scikit-learn、xgboost、lightgbmの同等クラスが受け入れるパラメーターに加えて、 input_colsoutput_colssample_weight_collabel_cols、および drop_input_cols の5つの追加パラメーターを受け入れます。これらは文字列または文字列のシーケンスで、SnowparkまたはPandas DataFrameの入力列、出力列、サンプル重み列、ラベル列の名前を指定します。

  • Snowpark ML の fitpredict メソッドは、入力データ、ラベル、重みを表す個別の配列の代わり、単一の DataFrame を受け入れます。Snowpark ML では、クラスをインスタンス化するときに、これらの目的で使用する列の名前を指定します。これらの名前は、 fit または predict に渡す DataFrame で必要な列を見つけるために使用されます。 fit、および predict をご参照ください。

  • Snowpark ML の transformpredict メソッドは、メソッドに渡された DataFrame のすべての列を含む DataFrame を返し、予測からの出力は追加の列に格納されます。(出力列名と一致する入力列名を指定して、インプレースで変換することも、 drop_input_cols = True を渡して、入力列をドロップすることもできます。)scikit-learn、xgboost、lightgbmと同等のものは、結果のみを含む配列を返します。

  • Snowpark Pythonの変換器には fit_transform メソッドがありません。しかし、scikit-learnと同様に、パラメーターの検証は fit メソッドでのみ実行されます。したがって、変換器がフィッティングを実行しない場合でも、 transform の前のある時点で fit を呼び出す必要があります。 fit は変換器を返すため、メソッド呼び出しは、たとえば Binarizer(threshold=0.5).fit(df).transform(df) のように連鎖させることができます。

  • Snowpark ML 変換器には inverse_transform メソッドがありません。このメソッドは、Snowpark ML には不要です。これは、入力列と出力列の両方に同じ名前を指定してインプレース変換を明示的に実行する場合を除き、入力 DataFrame の入力列には元の表現が保持されるためです。

任意のSnowpark ML Modelingオブジェクトを、対応するscikit-learn、xgboost、またはlightgbmオブジェクトに変換して、基礎となる型のすべてのメソッドと属性を使用できます。 基礎となるモデルの取得 をご参照ください。

モデルの構築

個別のscikit-learnモデルクラスが受け入れるパラメーターに加えて、すべてのSnowpark ML Modelingクラスは、インスタンス化の際に次の追加パラメーターを受け入れます。

これらのパラメーターは技術的にはすべてオプションですが、 input_cols、または output_cols、あるいはその両方を指定する場合が多くあります。 label_colssample_weight_col は、テーブルに示す特定の状況では必須ですが、それ以外の場合では省略できます。

ちなみに

すべての列名は、Snowflakeの 識別子の要件 に従う必要があります。テーブル作成時に大文字と小文字を区別したり、特殊文字(ドル記号とアンダースコア以外)を使用したりする場合は、列名を二重引用符で囲む必要があります。大文字と小文字を区別するPandas DataFrames との互換性を維持するために、可能な限りすべて大文字の列名を使用します。

from snowflake.ml.modeling.preprocessing import MinMaxScaler
from snowflake.snowpark import Session

# Snowflake identifiers are not case sensitive by default.
# These column names will be automatically updated to ["COLUMN_1", "COLUMN_2", "COLUMN_3"] by the Snowpark DataFrame.
schema = ["column_1", "column_2", "column_3"]
df = session.create_dataframe([[1, 2, 3]], schema = schema)
df.show()
Copy
--------------------------------------
|"COLUMN_1"  |"COLUMN_2"  |"COLUMN_3"|
--------------------------------------
|1           |2          |3          |
--------------------------------------
Copy
# Identify the column names using the Snowflake identifier.
input_cols = ["COLUMN_1", "COLUMN_2", "COLUMN_3"]
mms = MinMaxScaler(input_cols=input_cols)
mms.fit(df)

# To maintain lower case column names, include a double quote within the string.
schema = ['"column_1"', '"column_2"', '"column_3"']
df = session.create_dataframe([[1, 2, 3]], schema = schema)
df.show()
Copy
----------------------------------------
|'"column_1"'|'"column_2"'|'"column_3"'|
----------------------------------------
|1           |2           |3           |
----------------------------------------
Copy
# Since no conversion took place, the schema labels can be used as the column identifiers.
mms = MinMaxScaler(input_cols=schema)
mms.fit(df)
Copy

パラメーター

説明

input_cols

特徴量を含む列名を表す文字列または文字列のリスト。

このパラメーターを省略した場合は、 label_colssample_weight_colpassthrough_cols パラメーターによって指定された列以外の、入力 DataFrame の列すべてが入力列とみなされます。

label_cols

ラベルを含む列の名前を表す文字列または文字列のリスト。

監視型推定器のラベル列は推論できないため、必ず指定する必要があります。これらのラベル列はモデル予測のターゲットとして使用され、 input_cols とは明確に区別する必要があります。

output_cols

predict および transform 操作の出力を格納する列名を表す文字列または文字列のリスト。 output_cols の長さは、使用される特定の予測器または変換器クラスから予測される出力列数と一致する必要があります。

このパラメーターを省略した場合、出力列名は監視型推定器ではラベル列名に OUTPUT_ プレフィックスを、非監視型推定器では OUTPUT_IDX を追加したものになります。これらの推論された出力列名は予測器では機能しますが、変換器では output_cols を明示的に設定する必要があります。一般的に、特に入力列名を指定しない場合は、出力列名を明示的に指定した方が明確になります。

インプレースで変換するには、 input_colsoutput_cols に同じ名前を渡します。

passthrough_cols

トレーニング、変換、推論から除外する列の名前を示す文字列または文字列のリスト。パススルー列は、入力と出力 DataFrames の間にそのまま残ります。

このオプションは、トレーニングや推論時にインデックス列などの特定の列の使用を避けたい場合に便利ですが、 input_cols を渡すことはできません。 input_cols を渡さなかった場合、これらの列は通常は入力とみなされます。

sample_weight_col

例の重みを含む列名を表す文字列。

この引数は,重み付きデータセットに必要です.

drop_input_cols

入力列を結果 DataFrame から削除するかどうかを示すブール値。デフォルトは False です。

scikit-learnには、 DecisionTreeClassifier コンストラクターに必須の引数がありません。すべての引数はデフォルト値です。そのため、scikit-learnでは次のように記述する場合があります。

from sklearn.tree import DecisionTreeClassifier

model = DecisionTreeClassifier()
Copy

Snowpark ML では、列名を指定する必要があります(または、指定せずにデフォルトを受け入れます)。この例では、明示的に指定されています。

引数をコンストラクターに直接渡すか、インスタンス化後にモデルの属性として設定して、Snowpark ML DecisionTreeClassifier を初期化することができます。(属性はいつでも変更可能です。)

  • コンストラクターの引数として、

    from snowflake.ml.modeling.tree import DecisionTreeClassifier
    
    model = DecisionTreeClassifier(
        input_cols=feature_column_names, label_cols=label_column_names, sample_weight_col=weight_column_name,
        output_cols=expected_output_column_names
    )
    
    Copy
  • モデルの属性を設定することで、

    from snowflake.ml.modeling.tree import DecisionTreeClassifier
    
    model = DecisionTreeClassifier()
    model.set_input_cols(feature_column_names)
    model.set_label_cols(label_column_names)
    model.set_sample_weight_col(weight_column_name)
    model.set_output_cols(output_column_names)
    
    Copy

fit

Snowpark ML 分類器の fit メソッドは、特徴量、ラベル、重みを含むすべての列を含む単一の SnowparkまたはPandas DataFrame を受け取ります。これはscikit-learnが特徴量、ラベル、および重みを個別の入力として受け取る fit メソッドとは異なります。

scikit-learnでは、 DecisionTreeClassifier.fit メソッド呼び出しは次のようになります。

model.fit(
    X=df[feature_column_names], y=df[label_column_names], sample_weight=df[weight_column_name]
)
Copy

Snowpark ML では、 DataFrame を渡すだけです。 モデルの構築 に示すように、入力、ラベル、重みの列名は、初期化時、またはセッターメソッドを使用してすでに設定されています。

model.fit(df)
Copy

predict

Snowpark ML クラスの predict メソッドも、すべての特徴量列を含む単一のSnowparkまたはPandas DataFrame を受け取ります。結果は、入力 DataFrame のすべての列が変更されず、出力列が追加された DataFrame になります。この DataFrame から出力列を抽出する必要があります。これは、結果のみを返すscikit-learnの predict メソッドとは異なります。

scikit-learnでは、 predict は予測結果のみを返します。

prediction_results = model.predict(X=df[feature_column_names])
Copy

Snowpark ML の予測結果のみを取得するには、返された DataFrame から出力列を抽出します。ここで、 output_column_names は、出力列の名前を含むリストです。

prediction_results = model.predict(df)[output_column_names]
Copy

分散前処理

Snowpark ML のデータ前処理および変換関数の多くは、Snowflakeの分散実行エンジンを使用して実装されており、シングルノード実行(つまりストアドプロシージャ)と比較してパフォーマンスに大きなメリットがあります。どの関数が分散実行をサポートしているかは、 Snowpark ML Modelingクラス をご参照ください。

下のグラフは、大規模なパブリックデータセットで、中程度のSnowparkに最適化されたウェアハウスで実行され、ストアドプロシージャで実行されるscikit-learnとSnowpark MLの分散実装を比較した、例示的なパフォーマンス数値を示したものです。多くのシナリオでは、Snowpark ML Modelingを使用すると、コードの実行が25~50倍速くなります。

Illustration of performance improvements possible by distributed preprocessing

Fitの分配方法

Snowpark ML 前処理変換器の fit メソッドは、SnowparkまたはPandas DataFrame を受け取り、データセットをフィッティングし、フィッティング後の変換器を返します。

  • Snowpark DataFrames の場合、分散フィッティングは SQL エンジンを使用します。変換器は SQL クエリを生成して、必要な状態(平均、最大、カウントなど)を計算します。これらのクエリはSnowflakeによって実行され、結果はローカルで実体化されます。SQL で計算できない複雑な状態の場合、変換器はSnowflakeから中間結果をフェッチし、メタデータに対してローカル計算を実行します。

    変換中に仮の状態テーブルを必要とする複雑な変換器(OneHotEncoder、OrdinalEncoder など)の場合、これらのテーブルはPandas DataFrames を使用してローカルに表現されます。

  • Pandas DataFrames は、scikit-learnによるフィッティングと同様に、ローカルでフィッティングされます。変換器は提供されたパラメーターを使って、対応するscikit-learn変換器を作成します。次に、scikit-learn変換器がフィッティングされ、Snowpark ML 変換器がscikit-learnオブジェクトから必要な状態を導出します。

Transformの分配方法

Snowpark ML 前処理変換器の transform メソッドは、SnowparkまたはPandasDataFrame を受け取り、データセットを変換し、変換後のデータセットを返します。

  • Snowpark DataFrames では、 SQL エンジンを使って分散変換が行われます。フィッティング後の変換器は、変換後のデータセットを表す基礎となる SQL クエリを持つSnowpark DataFrame を生成します。 transform メソッドは、単純な変換(StandardScalerMinMaxScaler など)に対して遅延評価を行うので、 transform メソッドの間は実際には変換が行われません。

    しかし、ある種の複雑な変換には実行が伴います。これには、変換中に仮の状態テーブル(OneHotEncoderOrdinalEncoder など)を必要とする変換器も含まれます。このような変換器の場合、結合やその他の操作のために、Pandas DataFrame(オブジェクトの状態を格納)から仮テーブルを作成します。もう1つのケースは、特定のパラメーターが設定されている場合です。たとえば、変換器が、変換中に見つかった未知の値をエラーとして処理するように設定されている場合、列や未知の値などを含むデータを実体化します。

  • Pandas DataFrames は、scikit-learnによる変換と同様に、ローカルで変換されます。変換器は to_sklearn API を使って、対応するscikit-learn変換器を作成し、メモリ内で変換を行います。

分散ハイパーパラメーターの最適化

ハイパーパラメーターチューニングは、データサイエンスのワークフローの不可欠な部分です。Snowpark ML ライブラリは、scikit-learn GridSearchCVRandomizedSearchCV APIs の分散実装を提供し、シングルノードとマルチノードのウェアハウス両方で効率的なハイパーパラメータチューニングを可能にします。

ちなみに

Snowpark ML は、デフォルトで分散ハイパーパラメーター最適化を有効にします。これを無効にするには、以下のPythonインポートを使用します。

import snowflake.ml.modeling.parameters.disable_distributed_hpo
Copy

注釈

最小のSnowflake仮想ウェアハウス(XS)またはSnowparkに最適化されたウェアハウス(M)にはノードが1つあります。サイズが大きくなる度にノード数は倍増します。

シングルノード(XS)のウェアハウスでは、scikit-learnのjoblibマルチプロセッシングフレームワークを使用して、デフォルトでノードの全容量が利用されます。

ちなみに

シングルノードのウェアハウス(XS標準ウェアハウスまたはSnowparkに最適化されたウェアハウス(M))でメモリ不足エラーが発生した場合は、 n_jobs パラメーターを指定して並列度を下げてみてください。デフォルトの n_jobs 値-1は、すべてのコアを使用します。

マルチノードのウェアハウスでは、クロスバリデーションチューニングジョブ内の fit 操作はノード間で分散されます。スケールアップのためのコード変更は必要ありません。推定器のフィッティングは、ウェアハウス内のすべてのノードの利用可能なすべてのコアで並列実行されます。

Estimator fits are executed in parallel on all available CPUs on all machines in the warehouse

例として、scikit-learnライブラリで提供されている カリフォルニア住宅データセット を考えてみましょう。このデータには20,640行のデータがあり、以下の情報が含まれています。

  • MedInc: ブロックグループの所得の中央値

  • HouseAge: ブロックグループの住宅築年数の中央値

  • AveRooms: 世帯あたりの平均部屋数

  • AveBedrms: 世帯あたりの平均寝室数

  • 人口: ブロックグループの人口

  • AveOccup: 平均世帯人数

  • 緯度 および 経度

データセットの対象は所得の中央値で、単位は十万ドルです。

この例では、所得中央値を予測するための最良のハイパーパラメーターの組み合わせを見つけるために、ランダムフォレスト回帰変数のグリッド探索クロスバリデーションを行います。

from snowflake.ml.modeling.ensemble.random_forest_regressor import RandomForestRegressor
from snowflake.ml.modeling.model_selection.grid_search_cv import GridSearchCV
from sklearn import datasets

def load_housing_data() -> DataFrame:
    input_df_pandas = datasets.fetch_california_housing(as_frame=True).frame
    # Set the columns to be upper case for consistency with Snowflake identifiers.
    input_df_pandas.columns = [c.upper() for c in input_df_pandas.columns]
    input_df = session.create_dataframe(input_df_pandas)

    return input_df

input_df = load_housing_data()

# Use all the columns besides the median value as the features
input_cols = [c for c in input_df.columns if not c.startswith("MEDHOUSEVAL")]
# Set the target median value as the only label columns
label_cols = [c for c in input_df.columns if c.startswith("MEDHOUSEVAL")]


DISTRIBUTIONS = dict(
            max_depth=[80, 90, 100, 110],
            min_samples_leaf=[1,3,10],
            min_samples_split=[1.0, 3,10],
            n_estimators=[100,200,400]
        )
estimator = RandomForestRegressor()
n_folds = 5

clf = GridSearchCV(estimator=estimator, param_grid=DISTRIBUTIONS, cv=n_folds, input_cols=input_cols, label_cols=label_col)
clf.fit(input_df)
Copy

この例は、Snowparkで最適化されたMedium(シングルノード)ウェアハウスでは7分強で実行され、X-Largeウェアハウスではわずか3分で実行されます。

Illustration of performance improvements possible by distributed hyperparameter optimization

モデルの展開と実行

モデルをトレーニングした結果は、Python Snowpark ML モデルオブジェクトになります。モデルの predict メソッドを呼び出すと、トレーニング済みモデルを使用して予測できます。これにより、Snowflake仮想ウェアハウスでモデルを実行するための仮のユーザー定義関数が作成されます。この関数は、Snowpark ML セッションの終了時(例: スクリプトの終了時やJupyterノートブックを閉じるとき)に自動的に削除されます。

セッション終了後もユーザー定義関数を保持するには、手動で作成します。詳細については、このトピックの クイックスタート をご参照ください。

Snowpark ML モデルレジストリは、今後リリースされる機能で、永続的なモデルもサポートし、モデルの検索と展開を容易にします。モデルレジストリと他のSnowpark ML の機能の概要については、 クイックスタート をご参照ください。

複数変換のためのパイプライン

scikit-learnでは、パイプラインを使用して一連の変換を実行するのが一般的です。scikit-learnパイプラインはSnowpark ML クラスでは動作しないため、Snowpark ML は、一連の変換を実行するために sklearn.pipeline.Pipeline のSnowpark Pythonバージョンを提供しています。このクラスは snowflake.ml.modeling.pipeline パッケージにあり、scikit-learnバージョンと同じように動作します。

基礎となるモデルの取得

Snowpark ML モデルは、以下の方法で「アンラップ」できます。これは、サードパーティの基礎となるモデルに変換できるということです(ライブラリによって異なります)。

  • to_sklearn

  • to_xgboost

  • to_lightgbm

次に、基礎となるモデルのすべての属性とメソッドにアクセスし、推定器に対してローカルで実行することができます。たとえば、 GridSearchCV の例 では、最良のスコアを取得するために、グリッド探索推定器をscikit-learnオブジェクトに変換します。

best_score = grid_search_cv.to_sklearn().best_score_
Copy

既知の制限

  • Snowpark ML 推定器および変換器は現在、スパース入力およびスパース応答をサポートしていません。スパースデータがある場合は、Snowpark ML の推定器や変換器に渡す前に密な形式に変換します。

  • Snowpark ML パッケージは現在、マトリックスデータ型をサポートしていません。結果としてマトリックスを生成するような推定器や変換器に対する操作はすべて失敗します。

  • 結果データの行の順番が入力データの行の順番と一致することは保証されません。

トラブルシューティング

ログへの詳細の追加

Snowpark ML は、Snowpark Pythonのログを使用します。デフォルトでは、Snowpark ML は INFO レベルのメッセージを標準出力にログします。より詳細なログを取得し、Snowpark ML で発生した問題のトラブルシューティングに役立てるには、 サポートされているレベル のいずれかに変更します。

DEBUG は最も詳細なログを生成します。ログレベルを DEBUG に設定するには、

import logging, sys

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
Copy

一般的な問題の解決策

次のテーブルで、Snowflake ML Modelingで起こりうる問題を解決するためのいくつかの提案を示します。

問題またはエラーメッセージ

考えられる原因

解決策

「name x is not defined」などの NameError、 ImportError、または ModuleNotFoundError

モジュール名またはクラス名に誤字があるか、Snowpark ML がインストールされていない。

正しいモジュールとクラス名については、Snowpark ML Modelingクラスのテーブルをご参照ください。Snowpark ML モジュールがインストールされていることを確認してください(Snowpark ML のインストール を参照)。

KeyError (「not in index」または「none of [Index[..]] are in [列]」)

誤った列名。

列名をチェックして修正します。

SnowparkSQLException、「does not exist or not authorize」

テーブルが存在しないか、テーブルに対する十分な権限がない。

テーブルが存在し、ユーザーロールに権限があることを確認してください。

SnowparkSQLException、「invalid identifier PETALLENGTH」

誤った列の数(通常は列が欠落)。

モデルクラスの作成時に指定した列の数をチェックし、正しい数を渡していることを確認してください。

InvalidParameterError

不適切な型または値がパラメーターとして渡された。

インタラクティブPythonセッションで help 関数を使用して、クラスやメソッドのヘルプをチェックし、値を修正します。

TypeError、「unexpected keyword argument」

名前付き引数に誤字がある。

インタラクティブPythonセッションで help 関数を使用して、クラスやメソッドのヘルプをチェックし、引数名を修正します。

ValueError、「array with 0 sample(s)」

渡されたデータセットが空。

データセットが空ではないことを確認してください。

SnowparkSQLException、「authentication token has expired」

セッションの有効期限が切れた。

Jupyterノートブックを使用している場合は、カーネルを再起動して新しいセッションを作成します。

「cannot convert string to float」などの ValueError

データ型の不一致。

インタラクティブPythonセッションで help 関数を使用して、クラスやメソッドのヘルプをチェックし、値を修正します。

SnowparkSQLException、「cannot create temporary table」

モデルクラスが、呼び出し元の権限で実行されないストアドプロシージャの内部で使用されている。

所有者権限ではなく、呼び出し元の権限でストアドプロシージャを作成します。

SnowparkSQLException、「function available memory exceeded」

データセットが標準ウェアハウスで5 GB を超えている。

Snowparkに最適化されたウェアハウス に切り替えます。

OSError、「no space left on device」

モデルは標準的なウェアハウスの約500 MB よりも大きい。

Snowparkに最適化されたウェアハウス に切り替えます。

互換性のないxgboostのバージョン、またはxgboostのインポート時のエラー

依存関係を適切に処理できない pip を使用してインストールされている。

エラーメッセージに従って、パッケージをアップグレードまたはダウングレードします。

to_sklearnto_xgboost、 または to_lightgbm が関与する AttributeError

異なる型のモデルでこれらのメソッドのいずれかを使おうとした。

scikit-learn ベースのモデルなどで to_sklearn を使用します。

参考文献

各ライブラリの機能に関する包括的な情報については、元のライブラリのドキュメントをご参照ください。

謝辞

このドキュメントの一部は、 BSD-3 「新規」または「改訂」ライセンスおよびCopyright © 2007-2023 The scikit-learn developersの下でライセンスされているScikit-learnのドキュメントに由来しています。All Rights Reserved.

このドキュメントの一部は、Apache License 2.0, January 2004およびCopyright © 2019で網羅されている XGboost に由来します。All Rights Reserved.

このドキュメントの一部は、 MIT によるライセンスおよびCopyright © Microsoft Corp.の LightGBM ドキュメントに由来します。All Rights Reserved.