Data Connector Snowflake ML¶
Utilisez Data Connector Snowflake ML pour ingérer les données des tables ou des zones de préparation Snowflake dans une instance d’environnement d’exécution des conteneurs (telle qu’une session notebook ou une tâche ML). Le connecteur de données utilise le traitement distribué de l’environnement d’exécution du conteneur pour accélérer le chargement des données et améliorer l’efficacité de l’exécution des pipelines ML dans les notebook Snowflake ou les tâches ML. Vous pouvez exécuter des workflows ML basés sur Python dans Snowflake en utilisant les données que vous avez chargées. Par exemple, vous pouvez faire évoluer les pipelines ML à l’aide de paquets open source. Pour plus d’informations sur l’environnement d’exécution des conteneurs, voir Container Runtime pour ML.
Vous pouvez utiliser le connecteur de données pour charger des données depuis n’importe quelle source de données Snowflake, telle qu’une table ou une zone de préparation, dans un dataframe pandas. Ce dataframe pandas peut ensuite être utilisé avec vos flux de travail ML open source dans Snowflake. Le connecteur de données offre également la fonctionnalité pour créer des ensembles de données Torch et Tensorflow.
Outre l’utilisation de flux de travail open source, vous pouvez également utiliser les APIs distribuées de Snowflake pour entraîner et ajuster des modèles à l’échelle.
Le connecteur de données est optimisé pour fonctionner dans l’environnement des conteneurs. En dehors de l’environnement d’exécution du conteneur, le connecteur de données utilise un format d’échange de données basé sur Apache Arrow pour déplacer les données entre Snowflake et votre conteneur. Le même code fonctionne à l’intérieur et à l’extérieur de Snowflake.
Vous pouvez utiliser le Data Connector avec un jeu de données Snowpark ou Snowflake DataFrame. Snowpark DataFrames fournit un accès direct aux données de vos tables Snowflake. Il est préférable de les utiliser pendant le développement.
Les jeux de données Snowflake sont des objets versionnés au niveau du schéma. Il est préférable de les utiliser pour les flux de production. Pour plus d’informations sur les jeux de données, voir Ensembles de données Snowflake.
Vous pouvez utiliser le code suivant pour intégrer votre Snowpark DataFrame dans l’environnement d’exécution du conteneur :
connector = DataConnector.from_dataframe(snowpark_df)
Vous pouvez utiliser le code suivant pour intégrer votre jeu de données Snowflake dans l’environnement d’exécution du conteneur :
connector = DataConnector.from_dataset(snowflake_dataset)
Le connecteur de données utilise le traitement distribué pour accélérer le chargement des données dans les objets de données open source, tels que les dataframes pandas, les jeu de données PyTorch ou les jeux de données TensorFlow. Si to_pandas
ne charge pas les données dans le dataframe assez rapidement, vous pouvez utiliser le connecteur de données pour accélérer le processus.
La transmission des connecteurs de données directement dans les workflows distribués Snowflake améliore les temps d’exécution de l’utilisation des données dans le workflow. Pour plus d’informations, voir Snowflake ML Container Runtime Reference (Python).
Note
Cette rubrique suppose que le module Snowpark ML est installé. Si ce n’est pas le cas, consultez Utilisation de Snowflake ML en local.
Data Connector vers dataframe pandas¶
Vous pouvez utiliser le code suivant pour charger les données d’une table Snowflake dans un dataframe pandas.
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Specify training table location
table_name = "TRAINING_TABLE"
# Load tabel into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))
import xgboost as xgb
pandas_df = data_connector.to_pandas()
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]
clf = xgb.Classifier()
clf.fit(X, y)
Data Connector vers jeu de données PyTorch¶
Le code suivant montre comment vous pouvez utiliser le connecteur de données pour charger les données d’une table Snowflake dans un jeu de données PyTorch.
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']
for batch_idx, batch in enumerate(dataloader):
y = batch_data.pop(label_col).squeeze()
X = torch.stack(
[tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
)
Data Connector vers jeu de données TensorFlow¶
Pour une utilisation avec TensorFlow, utilisez la méthode to_tf_dataset()
pour obtenir un ensemble de données Tensorflow : l’itération sur l’ensemble de données permet d’obtenir les tenseurs TensorFlow en lots. Les données sont chargées en flux continu pour une efficacité maximale.
tf_ds = connector.to_tf_dataset(
batch_size=4,
shuffle=True,
drop_last_batch=True
)
for batch in tf_ds:
print(batch)
Transmission d’un connecteur de données vers les APIs d’entraînement distribuées optimisées de Snowflake¶
Pour de meilleures performances, vous pouvez transférer un objet de connecteur de données vers les APIs d’entraînement distribuées optimisées de Snowflake.
Le code suivant montre comment vous pouvez procéder :
Chargez les données d’une table Snowflake dans un objet de connecteur de données.
Créez une instance du classificateur XGBoost de Snowflake.
Entraînez le classificateur à l’aide de l’objet de connecteur de données.
from sklearn.datasets import make_regression
from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
XGBEstimator,
XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session
session = get_active_session()
snowflake_df = session.create_dataframe(session.table(table_name))
# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
data_connector = DataConnector.from_dataframe(snowflake_df)
snowflake_est = XGBEstimator(
n_estimators=1,
objective="reg:squarederror",
scaling_config=XGBScalingConfig(use_gpu=False),
)
# If you use a data connector, input_cols and label_col must also be provided.
fit_booster = snowflake_est.fit(
data_connector,
input_cols = NUMERICAL_COLS,
label_col = LABEL_COL
)
Utiliser le partitionnement avec le distributeur PyTorch¶
Vous pouvez utiliser le connecteur de données pour répartir vos données sur plusieurs nœuds dans un environnement d’exécution des conteneurs. Vous pouvez utiliser le partitionnement pour entraîner des modèles avec le distributeur PyTorch Snowflake. Pour des informations sur le distributeur PyTorch, voir Distributeur PyTorch.
Le code suivant entraîne un modèle PyTorch dans le jeu de données des chiffres à l’aide du connecteur de données. Il utilise spécifiquement un nœud avec quatre processus qui s’exécutent sur le nœud. Le code définit un modèle PyTorch, une fonction d’entraînement et un formateur PyTorch. La fonction d’entraînement s’exécute sur plusieurs nœuds, chaque processus recevant un fragment de données unique. Aucune GPUs n’a été utilisée dans cet exemple, mais vous pouvez définir la valeur du paramètre num_gpus
sur le nombre de GPUs que vous utilisez.
from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
PyTorchTrainer,
ScalingConfig,
WorkerResourceConfig,
getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create the Snowflake data. We'll start with a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)
# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)
# Define the model. This is OSS PyTorch code.
class DigitsModel(nn.Module):
def __init__(self):
super(DigitsModel, self).__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(8 * 8, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10)
)
def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits
# Define the training function that will run across multiple nodes or devices.
# Each train_func process will receive a unique data shard.
def train_func():
import os
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from torch import nn
from torch.nn.parallel import DistributedDataParallel as DDP
# Context provides relevant information to training process, like data shards and model directory.
context = getContext()
dataset_map = context.get_dataset_map()
model_dir = context.get_model_dir()
training_data = dataset_map["train"].get_shard().to_torch_dataset()
train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)
# The rest of this code is OSS pytorch code.
dist.init_process_group()
device = "cpu"
label_col = '"target"'
batch_size = 64
model = DDP(DigitsModel())
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
# Training loop
for epoch in range(5):
for batch, batch_data in enumerate(train_dataloader):
y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
X = torch.concat(
[tensor.to(torch.float32) for tensor in batch_data.values()],
dim=-1,
) .to(device)
pred = model(X)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if batch % 100 == 0:
print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")
# Save the model
if dist.get_rank() == 0:
torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))
pytroch_trainer = PyTorchTrainer(
train_func=train_func,
scaling_config=ScalingConfig(
num_nodes=1,
num_workers_per_node=4,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
),
)
response = pytroch_trainer.run(
dataset_map=dict(
train=sharded_data_connector,
)
)
Plusieurs APIs ont été utilisées dans le code précédent. Pour plus d’informations, voir Snowflake ML Container Runtime Reference (Python).