Snowflake ML Data Connector¶
Snowflake ML Data Connectorを使用して、Snowflakeテーブルまたはステージからコンテナーランタイムインスタンス(ノートブックセッションや ML ジョブなど)にデータを取り込みます。データコネクタは、コンテナーランタイムの分散処理を使用してデータのロードを高速化し、Snowflake Notebooksまたは ML ジョブで ML パイプラインを実行する効率を向上させます。ロードしたデータを使用して、SnowflakeでPythonベースの ML ワークフローを実行できます。例えば、オープンソースパッケージを使用して、 ML パイプラインを拡張することができます。コンテナーランタイムの詳細情報については、 ML のContainer Runtime を参照してください。
データコネクタを使用すると、テーブルやステージなどの任意のSnowflakeデータソースからpandasデータフレームにデータをロードできます。このpandasデータフレームは、Snowflakeのオープンソース ML ワークフローで使用することができます。データコネクタはtorchやtensorflowデータセットを作成する機能も提供します。
オープンソースのワークフローを使用するだけでなく、Snowflakeの分散型 APIs を使用して、スケールの大きなモデルをトレーニングおよびチューニングすることもできます。
データコネクタはコンテナー環境で動作するように最適化されています。コンテナーランタイム以外では、データコネクタはApache Arrowベースのデータ交換形式を使用して、Snowflakeとコンテナー間でデータを移動します。同じコードがSnowflakeの中でも外でも動作します。
Data Connectorは、Snowpark DataFrame またはSnowflake Datasetのいずれかで使用できます。Snowpark DataFrames は、Snowflakeテーブルのデータへの直接アクセスを提供します。開発中に使うのがベストです。
Snowflake Datasetsはバージョン管理されたスキーマレベルのオブジェクトです。実稼働ワークフローに最適です。データセットの情報については、 Snowflake Datasets を参照してください。
以下のコードを使用して、Snowpark DataFrame をコンテナーランタイムに取り込むことができます。
connector = DataConnector.from_dataframe(snowpark_df)
以下のコードを使用して、Snowflake Datasetをコンテナーランタイムに取り込むことができます。
connector = DataConnector.from_dataset(snowflake_dataset)
データコネクタは分散処理を使用して、pandasデータフレーム、 PyTorch データセット、 TensorFlow データセットなどのオープンソースデータオブジェクトへのデータのロードを高速化します。 to_pandas
がデータフレームにデータを十分な速さでロードしない場合は、データコネクタを使用してプロセスを高速化できます。
データコネクタを直接Snowflake分散ワークフローに渡すことで、ワークフローでデータを使用する際の実行時間が改善されます。詳細情報については、 Snowflake ML Container Runtimeリファレンス(Python) を参照してください。
注釈
このトピックでは、Snowpark ML モジュールがインストールされていることを前提としています。インストールされていない場合は、 Snowflake ML をローカルで使用する をご参照ください。
pandasデータフレームへのData Connector¶
次のコードを使用して、Snowflakeテーブルからpandasデータフレームにデータを読み込みます。
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Specify training table location
table_name = "TRAINING_TABLE"
# Load tabel into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))
import xgboost as xgb
pandas_df = data_connector.to_pandas()
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]
clf = xgb.Classifier()
clf.fit(X, y)
PyTorch データセットへのData Connector¶
次のコードは、データコネクタを使用してSnowflakeテーブルから PyTorch データセットにデータを読み込む方法を示しています。
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']
for batch_idx, batch in enumerate(dataloader):
y = batch_data.pop(label_col).squeeze()
X = torch.stack(
[tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
)
TensorFlow データセットへのData Connector¶
TensorFlow での使用には、 to_tf_dataset()
メソッドを使用してTensorflow Datasetを取得します。データセットを反復すると、バッチ化された TensorFlow テンソルが得られます。データの読み込みはストリーミング方式で行われ、最大限の効率を実現します。
tf_ds = connector.to_tf_dataset(
batch_size=4,
shuffle=True,
drop_last_batch=True
)
for batch in tf_ds:
print(batch)
Snowflakeの最適化された分散トレーニング APIs にデータコネクタを渡す¶
最高のパフォーマンスを得るには、Snowflakeの最適化された分散トレーニング APIs にデータコネクタオブジェクトを渡します。
次のコードは、次のことを実行する方法を示しています。
Snowflakeテーブルからデータコネクタオブジェクトにデータをロードします。
Snowflakeの XGBoost 分類子のインスタンスを作成します。
データコネクタオブジェクトを使って分類子をトレーニングします。
from sklearn.datasets import make_regression
from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
XGBEstimator,
XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session
session = get_active_session()
snowflake_df = session.create_dataframe(session.table(table_name))
# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
data_connector = DataConnector.from_dataframe(snowflake_df)
snowflake_est = XGBEstimator(
n_estimators=1,
objective="reg:squarederror",
scaling_config=XGBScalingConfig(use_gpu=False),
)
# If you use a data connector, input_cols and label_col must also be provided.
fit_booster = snowflake_est.fit(
data_connector,
input_cols = NUMERICAL_COLS,
label_col = LABEL_COL
)
PyTorch ディストリビューターによるシャーディングの使用¶
データコネクタを使用すると、コンテナーランタイムの複数のノード間でデータをシャードできます。Snowflake PyTorch ディストリビューターでモデルをトレーニングするためにシャーディングを使用できます。PyTorch ディストリビューターに関する情報は、 PyTorch ディストリビューター をご覧ください。
以下のコードでは、データコネクタを使用して、digitsデータセットに対して PyTorch モデルをトレーニングしています。1つのノードを指定し、そのノード上で4つのプロセスを実行します。このコードでは、 PyTorch モデル、トレーニング関数、 PyTorch トレーナーを定義しています。トレーニング関数は複数のノードで実行され、各プロセスは一意のデータシャードを受け取ります。この例では GPUs は使用していませんが、 num_gpus
パラメーターの値を、使用している GPUs の数にセットすることができます。
from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
PyTorchTrainer,
ScalingConfig,
WorkerResourceConfig,
getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create the Snowflake data. We'll start with a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)
# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)
# Define the model. This is OSS PyTorch code.
class DigitsModel(nn.Module):
def __init__(self):
super(DigitsModel, self).__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(8 * 8, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10)
)
def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits
# Define the training function that will run across multiple nodes or devices.
# Each train_func process will receive a unique data shard.
def train_func():
import os
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from torch import nn
from torch.nn.parallel import DistributedDataParallel as DDP
# Context provides relevant information to training process, like data shards and model directory.
context = getContext()
dataset_map = context.get_dataset_map()
model_dir = context.get_model_dir()
training_data = dataset_map["train"].get_shard().to_torch_dataset()
train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)
# The rest of this code is OSS pytorch code.
dist.init_process_group()
device = "cpu"
label_col = '"target"'
batch_size = 64
model = DDP(DigitsModel())
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
# Training loop
for epoch in range(5):
for batch, batch_data in enumerate(train_dataloader):
y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
X = torch.concat(
[tensor.to(torch.float32) for tensor in batch_data.values()],
dim=-1,
) .to(device)
pred = model(X)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if batch % 100 == 0:
print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")
# Save the model
if dist.get_rank() == 0:
torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))
pytroch_trainer = PyTorchTrainer(
train_func=train_func,
scaling_config=ScalingConfig(
num_nodes=1,
num_workers_per_node=4,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
),
)
response = pytroch_trainer.run(
dataset_map=dict(
train=sharded_data_connector,
)
)
先のコードでは、 APIs が複数使用されていました。詳細情報については、 Snowflake ML Container Runtimeリファレンス(Python) を参照してください。