Snowflake ML Data Connector

Snowflake ML Data Connectorを使用して、Snowflakeテーブルまたはステージからコンテナーランタイムインスタンス(ノートブックセッションや ML ジョブなど)にデータを取り込みます。データコネクタは、コンテナーランタイムの分散処理を使用してデータのロードを高速化し、Snowflake Notebooksまたは ML ジョブで ML パイプラインを実行する効率を向上させます。ロードしたデータを使用して、SnowflakeでPythonベースの ML ワークフローを実行できます。例えば、オープンソースパッケージを使用して、 ML パイプラインを拡張することができます。コンテナーランタイムの詳細情報については、 ML のContainer Runtime を参照してください。

データコネクタを使用すると、テーブルやステージなどの任意のSnowflakeデータソースからpandasデータフレームにデータをロードできます。このpandasデータフレームは、Snowflakeのオープンソース ML ワークフローで使用することができます。データコネクタはtorchやtensorflowデータセットを作成する機能も提供します。

オープンソースのワークフローを使用するだけでなく、Snowflakeの分散型 APIs を使用して、スケールの大きなモデルをトレーニングおよびチューニングすることもできます。

データコネクタはコンテナー環境で動作するように最適化されています。コンテナーランタイム以外では、データコネクタはApache Arrowベースのデータ交換形式を使用して、Snowflakeとコンテナー間でデータを移動します。同じコードがSnowflakeの中でも外でも動作します。

Data Connectorは、Snowpark DataFrame またはSnowflake Datasetのいずれかで使用できます。Snowpark DataFrames は、Snowflakeテーブルのデータへの直接アクセスを提供します。開発中に使うのがベストです。

Snowflake Datasetsはバージョン管理されたスキーマレベルのオブジェクトです。実稼働ワークフローに最適です。データセットの情報については、 Snowflake Datasets を参照してください。

以下のコードを使用して、Snowpark DataFrame をコンテナーランタイムに取り込むことができます。

connector = DataConnector.from_dataframe(snowpark_df)
Copy

以下のコードを使用して、Snowflake Datasetをコンテナーランタイムに取り込むことができます。

connector = DataConnector.from_dataset(snowflake_dataset)
Copy

データコネクタは分散処理を使用して、pandasデータフレーム、 PyTorch データセット、 TensorFlow データセットなどのオープンソースデータオブジェクトへのデータのロードを高速化します。 to_pandas がデータフレームにデータを十分な速さでロードしない場合は、データコネクタを使用してプロセスを高速化できます。

データコネクタを直接Snowflake分散ワークフローに渡すことで、ワークフローでデータを使用する際の実行時間が改善されます。詳細情報については、 Snowflake ML Container Runtimeリファレンス(Python) を参照してください。

注釈

このトピックでは、Snowpark ML モジュールがインストールされていることを前提としています。インストールされていない場合は、 Snowflake ML をローカルで使用する をご参照ください。

pandasデータフレームへのData Connector

次のコードを使用して、Snowflakeテーブルからpandasデータフレームにデータを読み込みます。

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Specify training table location
table_name = "TRAINING_TABLE"
# Load tabel into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))


import xgboost as xgb

pandas_df = data_connector.to_pandas()
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]

clf = xgb.Classifier()
clf.fit(X, y)
Copy

PyTorch データセットへのData Connector

次のコードは、データコネクタを使用してSnowflakeテーブルから PyTorch データセットにデータを読み込む方法を示しています。

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']

for batch_idx, batch in enumerate(dataloader):
  y = batch_data.pop(label_col).squeeze()
  X = torch.stack(
      [tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
  )
Copy

TensorFlow データセットへのData Connector

TensorFlow での使用には、 to_tf_dataset() メソッドを使用してTensorflow Datasetを取得します。データセットを反復すると、バッチ化された TensorFlow テンソルが得られます。データの読み込みはストリーミング方式で行われ、最大限の効率を実現します。

tf_ds = connector.to_tf_dataset(
    batch_size=4,
    shuffle=True,
    drop_last_batch=True
)

for batch in tf_ds:
    print(batch)
Copy

Snowflakeの最適化された分散トレーニング APIs にデータコネクタを渡す

最高のパフォーマンスを得るには、Snowflakeの最適化された分散トレーニング APIs にデータコネクタオブジェクトを渡します。

次のコードは、次のことを実行する方法を示しています。

  1. Snowflakeテーブルからデータコネクタオブジェクトにデータをロードします。

  2. Snowflakeの XGBoost 分類子のインスタンスを作成します。

  3. データコネクタオブジェクトを使って分類子をトレーニングします。

from sklearn.datasets import make_regression
from snowflake.ml.data.data_connector import DataConnector


from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
  XGBEstimator,
  XGBScalingConfig,
)

from snowflake.snowpark.context import get_active_session

session = get_active_session()

snowflake_df = session.create_dataframe(session.table(table_name))

# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
data_connector = DataConnector.from_dataframe(snowflake_df)

snowflake_est = XGBEstimator(
  n_estimators=1,
  objective="reg:squarederror",
  scaling_config=XGBScalingConfig(use_gpu=False),
)

# If you use a data connector, input_cols and label_col must also be provided.
fit_booster = snowflake_est.fit(
  data_connector,
  input_cols = NUMERICAL_COLS,
  label_col = LABEL_COL
)
Copy

PyTorch ディストリビューターによるシャーディングの使用

データコネクタを使用すると、コンテナーランタイムの複数のノード間でデータをシャードできます。Snowflake PyTorch ディストリビューターでモデルをトレーニングするためにシャーディングを使用できます。PyTorch ディストリビューターに関する情報は、 PyTorch ディストリビューター をご覧ください。

以下のコードでは、データコネクタを使用して、digitsデータセットに対して PyTorch モデルをトレーニングしています。1つのノードを指定し、そのノード上で4つのプロセスを実行します。このコードでは、 PyTorch モデル、トレーニング関数、 PyTorch トレーナーを定義しています。トレーニング関数は複数のノードで実行され、各プロセスは一意のデータシャードを受け取ります。この例では GPUs は使用していませんが、 num_gpus パラメーターの値を、使用している GPUs の数にセットすることができます。

from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector

from snowflake.ml.modeling.pytorch import (
  PyTorchTrainer,
  ScalingConfig,
  WorkerResourceConfig,
  getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create the Snowflake data. We'll start with a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)

# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)

# Define the model. This is OSS PyTorch code.
class DigitsModel(nn.Module):
    def __init__(self):
        super(DigitsModel, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(8 * 8, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Define the training function that will run across multiple nodes or devices.
# Each train_func process will receive a unique data shard.
def train_func():
    import os
    import torch
    import torch.distributed as dist
    from torch.utils.data import DataLoader
    from torch import nn
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Context provides relevant information to training process, like data shards and model directory.
    context = getContext()
    dataset_map = context.get_dataset_map()
    model_dir = context.get_model_dir()
    training_data = dataset_map["train"].get_shard().to_torch_dataset()
    train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)

    # The rest of this code is OSS pytorch code.
    dist.init_process_group()
    device = "cpu"
    label_col = '"target"'
    batch_size = 64

    model = DDP(DigitsModel())

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    # Training loop
    for epoch in range(5):
        for batch, batch_data in enumerate(train_dataloader):
            y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
            X = torch.concat(
                [tensor.to(torch.float32) for tensor in batch_data.values()],
                dim=-1,
          )  .to(device)
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if batch % 100 == 0:
                print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")

    # Save the model
    if dist.get_rank() == 0:
        torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))


pytroch_trainer = PyTorchTrainer(
    train_func=train_func,
    scaling_config=ScalingConfig(
        num_nodes=1,
        num_workers_per_node=4,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
    ),
)
response = pytroch_trainer.run(
    dataset_map=dict(
        train=sharded_data_connector,
    )
)
Copy

先のコードでは、 APIs が複数使用されていました。詳細情報については、 Snowflake ML Container Runtimeリファレンス(Python) を参照してください。