Snowflake ML-Datenkonnektor¶
Verwenden Sie den Snowflake ML-Datenkonnektor, um Daten aus Snowflake-Tabellen oder Stagingbereichen in eine Laufzeitinstanz des Containers (z. B. eine Notebook-Sitzung oder einen ML-Job) zu übernehmen. Der Datenkonnektor nutzt die verteilte Verarbeitung der Container Runtime, um das Laden von Daten zu beschleunigen und die Effizienz der Ausführung von ML-Pipelines in Snowflake-Notebooks oder ML-Jobs zu verbessern. Sie können Python-basierte ML-Workflows in Snowflake mit den von Ihnen geladenen Daten ausführen. Sie können zum Beispiel ML-Pipelines mit Open-Source-Paketen skalieren. Weitere Informationen über die Container Runtime finden Sie unter Container Runtime für ML.
Sie können den Datenkonnektor verwenden, um Daten aus einer beliebigen Snowflake-Datenquelle, z. B. einer Tabelle oder einem Stagingbereich, in einen pandas-Dataframe zu laden. Dieser pandas-Datenframe kann dann mit Ihren Open Source ML-Workflows in Snowflake verwendet werden. Der Datenkonnektor bietet auch die Funktionalität, Torch- und Tensorflow-Datensätze zu erstellen.
Neben der Verwendung von Open-Source-Workflows können Sie auch die verteilten APIs von Snowflake verwenden, um Modelle in großem Maßstab zu trainieren und abzustimmen.
Der Datenkonnektor ist für die Arbeit in einer Container-Umgebung optimiert. Außerhalb der Container Runtime verwendet der Datenkonnektor ein auf Apache Arrow basierendes Datenaustauschformat, um Daten zwischen Snowflake und Ihrem Container zu übertragen. Der gleiche Code funktioniert sowohl innerhalb als auch außerhalb von Snowflake.
Sie können den Datenkonnektor entweder mit einem Snowpark DataFrame oder einem Snowflake Dataset verwenden. Snowpark DataFrames bieten direkten Zugriff auf die Daten in Ihren Snowflake-Tabellen. Sie werden am besten während der Entwicklung verwendet.
Snowflake-Datensets sind versionierte Objekte auf Schemaebene. Sie sind am besten für Produktions-Workflows geeignet. Weitere Informationen über Datensets finden Sie unter Snowflake Datasets.
Sie können den folgenden Code verwenden, um Ihren Snowpark DataFrame in die Container Runtime zu bringen:
connector = DataConnector.from_dataframe(snowpark_df)
Sie können den folgenden Code verwenden, um Ihren Snowflake-Datenset in die Container Runtime zu bringen:
connector = DataConnector.from_dataset(snowflake_dataset)
Der Datenkonnektor nutzt die verteilte Verarbeitung, um das Laden von Daten in Open-Source-Datenobjekte zu beschleunigen, wie z. B. pandas-Dataframes, PyTorch-Datensetzs oder TensorFlow-Datensets. Wenn to_pandas
die Daten nicht schnell genug in den Dataframe lädt, können Sie den Datenkonnektor verwenden, um den Prozess zu beschleunigen.
Die Übergabe der Datenkonnektoren direkt in verteilte Snowflake-Workflows verbessert die Laufzeiten bei der Verwendung der Daten im Workflow. Weitere Informationen finden Sie unter Snowflake ML Container Runtime Reference (Python).
Bemerkung
Dieses Thema setzt voraus, dass das Snowpark ML-Modul installiert ist. Wenn dies nicht der Fall ist, finden Sie weitere Informationen unter Lokale Verwendung von Snowflake ML.
Datenverbindung zu pandas-Dataframe¶
Sie können den folgenden Code verwenden, um Daten aus einer Snowflake-Tabelle in einen pandas-Dataframe zu laden.
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Specify training table location
table_name = "TRAINING_TABLE"
# Load tabel into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))
import xgboost as xgb
pandas_df = data_connector.to_pandas()
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]
clf = xgb.Classifier()
clf.fit(X, y)
Datenkonnektor zu PyTorch-Datenset¶
Der folgende Code zeigt, wie Sie den Datenkonnektor verwenden können, um Daten aus einer Snowflake-Tabelle in einen PyTorch-Datenset zu laden.
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']
for batch_idx, batch in enumerate(dataloader):
y = batch_data.pop(label_col).squeeze()
X = torch.stack(
[tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
)
Datenkonnektor zu TensorFlow-Datenset¶
Verwenden Sie für die Verwendung mit TensorFlow to_tf_dataset()
-Methode, um einen Tensorflow-Datensatz zu erhalten: Durch Iterieren über den Datensatz erhalten Sie TensorFlow in Batches. Das Laden von Daten erfolgt in einem Streaming-Verfahren für maximale Effizienz.
tf_ds = connector.to_tf_dataset(
batch_size=4,
shuffle=True,
drop_last_batch=True
)
for batch in tf_ds:
print(batch)
Übergabe eines Datenkonnektors an das optimierte verteilte Training von Snowflake APIs¶
Um die beste Leistung zu erzielen, können Sie ein Datenkonnektor-Objekt an das optimierte verteilte Training von Snowflake APIs übergeben.
Der folgende Code zeigt, wie Sie das tun können:
Laden von Daten aus einer Snowflake-Tabelle in ein Datenkonnektor-Objekt.
Erstellen Sie eine Instanz des XGBoost-Klassifikators von Snowflake.
Trainieren Sie den Klassifikator mit Hilfe des Datenkonnektor-Objekts.
from sklearn.datasets import make_regression
from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
XGBEstimator,
XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session
session = get_active_session()
snowflake_df = session.create_dataframe(session.table(table_name))
# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
data_connector = DataConnector.from_dataframe(snowflake_df)
snowflake_est = XGBEstimator(
n_estimators=1,
objective="reg:squarederror",
scaling_config=XGBScalingConfig(use_gpu=False),
)
# If you use a data connector, input_cols and label_col must also be provided.
fit_booster = snowflake_est.fit(
data_connector,
input_cols = NUMERICAL_COLS,
label_col = LABEL_COL
)
Verwendung von Sharding mit dem PyTorch-Verteiler¶
Sie können den Datenkonnektor verwenden, um Ihre Daten auf mehrere Knoten in einer Container Runtime aufzuteilen. Sie können Sharding verwenden, um Modelle mit dem Snowflake PyTorch-Verteiler zu trainieren. Informationen über den PyTorch-Verteiler finden Sie unter PyTorch-Verteiler.
Der folgende Code trainiert ein PyTorch-Modell auf dem Digits-Datensatz unter Verwendung des Datenkonnektors. Es verwendet einen Knoten mit vier Prozessen, die auf diesem Knoten laufen. Der Code definiert ein PyTorch-Modell, eine Trainingsfunktion und einen PyTorch-Trainer. Die Trainingsfunktion läuft über mehrere Knoten, wobei jeder Prozess einen eigenen Datensplitter erhält. In diesem Beispiel wurden keine GPUs verwendet, aber Sie können den Wert des Parameters num_gpus
auf die Anzahl der GPUs setzen, die Sie verwenden.
from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
PyTorchTrainer,
ScalingConfig,
WorkerResourceConfig,
getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create the Snowflake data. We'll start with a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)
# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)
# Define the model. This is OSS PyTorch code.
class DigitsModel(nn.Module):
def __init__(self):
super(DigitsModel, self).__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(8 * 8, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10)
)
def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits
# Define the training function that will run across multiple nodes or devices.
# Each train_func process will receive a unique data shard.
def train_func():
import os
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from torch import nn
from torch.nn.parallel import DistributedDataParallel as DDP
# Context provides relevant information to training process, like data shards and model directory.
context = getContext()
dataset_map = context.get_dataset_map()
model_dir = context.get_model_dir()
training_data = dataset_map["train"].get_shard().to_torch_dataset()
train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)
# The rest of this code is OSS pytorch code.
dist.init_process_group()
device = "cpu"
label_col = '"target"'
batch_size = 64
model = DDP(DigitsModel())
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
# Training loop
for epoch in range(5):
for batch, batch_data in enumerate(train_dataloader):
y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
X = torch.concat(
[tensor.to(torch.float32) for tensor in batch_data.values()],
dim=-1,
) .to(device)
pred = model(X)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if batch % 100 == 0:
print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")
# Save the model
if dist.get_rank() == 0:
torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))
pytroch_trainer = PyTorchTrainer(
train_func=train_func,
scaling_config=ScalingConfig(
num_nodes=1,
num_workers_per_node=4,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
),
)
response = pytroch_trainer.run(
dataset_map=dict(
train=sharded_data_connector,
)
)
Im vorangegangenen Code wurden mehrere APIs verwendet. Weitere Informationen finden Sie unter Snowflake ML Container Runtime Reference (Python).