Chargement et écriture de données¶
Utilisez Snowflake ML pour charger efficacement les données des tables et des zones de préparation Snowflake dans vos workflows de machine learning. Snowflake ML fournit des capacités de chargement de données optimisées qui tirent parti du traitement distribué de Snowflake pour accélérer l’ingestion de données pour vos workflows d’entraînement et d’inférence.
Vous pouvez charger et traiter des données à l’aide de :
Snowflake Notebooks : Environnement de développement interactif pour explorer les données et créer des modèles ML. Pour plus d’informations, voir Notebooks sur Container Runtime pour ML.
Tâches ML Snowflake : Exécutez vos charges de travail ML de manière asynchrone à partir de n’importe quel environnement de développement. Pour plus d’informations, voir Tâches Snowflake ML.
Les tâches ML et les notebooks s’exécutent sur Container Runtime pour ML, qui fournit des environnements préconfigurés optimisés pour les charges de travail de machine learning avec des capacités de traitement distribuées. Container Runtime utilise Ray, un framework open-source pour le calcul distribué, pour traiter efficacement les données sur plusieurs nœuds de calcul. Pour plus d’informations sur Container Runtime pour ML, voir Container Runtime pour ML.
Snowflake ML fournit différentes APIs pour le chargement de données structurées et non structurées :
Données structurées (tables et ensembles de données)
DataConnector : Chargez des données à partir de tables Snowflake et d’ensembles de données Snowflake. Pour plus d’informations, voir Charger des données structurées à partir de tables Snowflake.
DataSink : Réécrivez les données dans des tables Snowflake. Pour plus d’informations, voir Réécrire les données structurées dans des tables Snowflake.
Données non structurées (fichiers dans des zones de préparation)
APIs DataSource : Chargez des données à partir de différents formats de fichiers (CSV, Parquet, images, etc.) des zones de préparation de Snowflake. Pour plus d’informations, voir Chargement de données non structurées à partir de zones de préparation Snowflake.
Le tableau suivant peut vous aider à choisir la bonne API pour votre cas d’utilisation :
Type de données |
Source de données |
API pour le chargement |
API pour l’écriture |
|---|---|---|---|
Structuré |
Tables Snowflake |
DataConnector |
DataSink |
Structuré |
Ensembles de données Snowflake |
DataConnector |
DataSink |
Non structuré |
Fichiers CSV (zone de préparation) |
DataSource API |
N/A |
Non structuré |
Fichiers Parquet (zone de préparation) |
DataSource API |
N/A |
Non structuré |
Autres fichiers en zone de préparation |
DataSource API |
N/A |
Charger des données structurées à partir de tables Snowflake¶
Utilisez le DataConnector Snowflake pour charger des données structurées à partir de tables Snowflake et d’ensembles de données Snowflake dans un notebook Snowflake ou une tâche ML Snowflake. Le DataConnector accélère le chargement des données en parallélisant les lectures sur plusieurs nœuds de calcul.
Le DataConnector fonctionne avec les DataFrames Snowpark ou les ensembles de données Snowflake :
DataFrames Snowpark : Fournissez un accès direct aux données de vos tables Snowflake. À utiliser de préférence pendant le développement.
Ensembles de données Snowflake : Objets de niveau schéma versionnés. À utiliser de préférence pour les workflows de production. Pour plus d’informations, voir Ensembles de données Snowflake.
Après la parallélisation des lectures, le DataConnector peut convertir les données dans l’une des structures de données suivantes :
dataframe pandas
PyTorch jeu de données
TensorFlow jeu de données
Créer une DataConnector¶
Vous pouvez créer un DataConnector depuis un DataFrame Snowpark ou un ensemble de données Snowflake.
Utilisez le code suivant pour créer un DataConnector depuis un DataFrame Snowpark :
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create DataConnector from a Snowflake table
data_connector = DataConnector.from_dataframe(session.table("example-table-name"))
Utilisez le code suivant pour créer un DataConnector depuis un ensemble de données Snowflake :
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector from a Snowflake Dataset
data_connector = DataConnector.from_dataset(snowflake_dataset)
Convertir DataConnector vers d’autres formats¶
Après avoir créé un DataConnector, vous pouvez le convertir en différentes structures de données pour l’utiliser avec divers frameworks ML.
Vous pouvez convertir un DataConnector vers un dataframe pandas à utiliser avec scikit-learn et d’autres bibliothèques compatibles pandas.
L’exemple suivant charge les données d’une table Snowflake dans un dataframe Pandas et forme un classificateur XGBoost :
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
import xgboost as xgb
session = get_active_session()
# Specify training table location
table_name = "TRAINING_TABLE"
# Load table into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))
# Convert to pandas dataframe
pandas_df = data_connector.to_pandas()
# Prepare features and labels
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]
# Train classifier
clf = xgb.Classifier()
clf.fit(X, y)
Vous pouvez convertir un DataConnector vers un ensemble de données PyTorch à utiliser avec des modèles PyTorch et des chargeurs de données.
L’exemple suivant charge les données d’une table Snowflake dans un ensemble de données PyTorch :
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector (see previous examples)
# data_connector = DataConnector.from_dataframe(...)
# Convert to PyTorch dataset
torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']
for batch_idx, batch in enumerate(dataloader):
y = batch_data.pop(label_col).squeeze()
X = torch.stack(
[tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
)
Vous pouvez convertir un DataConnector vers un ensemble de données TensorFlow à utiliser avec des modèles TensorFlow. Les données sont chargées en flux continu pour une efficacité maximale.
L’exemple suivant convertit un DataConnector vers un ensemble de données TensorFlow :
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector (see previous examples)
# data_connector = DataConnector.from_dataframe(...)
# Convert to TensorFlow dataset
tf_ds = data_connector.to_tf_dataset(
batch_size=4,
shuffle=True,
drop_last_batch=True
)
for batch in tf_ds:
print(batch)
Utilisation avec des APIs d’entraînement distribué de Snowflake¶
Pour une performance optimale, vous pouvez transmettre un DataConnector directement vers les APIs d’entraînement distribué optimisé de Snowflake au lieu de convertir d’abord en ensemble de données pandas, PyTorch ou TensorFlow.
L’exemple suivant entraîne un modèle XGBoost à l’aide de l’estimateur XGBoost distribué de Snowflake :
from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
XGBEstimator,
XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create DataConnector from a Snowpark dataframe
snowflake_df = session.table("TRAINING_TABLE")
data_connector = DataConnector.from_dataframe(snowflake_df)
# Create Snowflake XGBoost estimator
snowflake_est = XGBEstimator(
n_estimators=1,
objective="reg:squarederror",
scaling_config=XGBScalingConfig(use_gpu=False),
)
# Train using the data connector
# When using a data connector, input_cols and label_col must be provided
fit_booster = snowflake_est.fit(
data_connector,
input_cols=NUMERICAL_COLS,
label_col=LABEL_COL
)
Utiliser le fragmentage avec le distributeur PyTorch¶
Vous pouvez utiliser le ShardedDataConnector pour fragmenter vos données sur plusieurs nœuds pour un entraînement distribué avec le distributeur PyTorch Snowflake.
L’exemple suivant entraîne un modèle PyTorch sur l’ensemble de données de chiffres utilisant des données fragmentées dans plusieurs processus :
from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
PyTorchTrainer,
ScalingConfig,
WorkerResourceConfig,
getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create the Snowflake data from a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)
# Create sharded data connector
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)
# Define the PyTorch model
class DigitsModel(nn.Module):
def __init__(self):
super(DigitsModel, self).__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(8 * 8, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10)
)
def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits
# Define training function that runs across multiple nodes or devices
# Each process receives a unique data shard
def train_func():
import os
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from torch import nn
from torch.nn.parallel import DistributedDataParallel as DDP
# Get context with data shards and model directory
context = getContext()
dataset_map = context.get_dataset_map()
model_dir = context.get_model_dir()
training_data = dataset_map["train"].get_shard().to_torch_dataset()
train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)
dist.init_process_group()
device = "cpu"
label_col = '"target"'
batch_size = 64
model = DDP(DigitsModel())
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
# Training loop
for epoch in range(5):
for batch, batch_data in enumerate(train_dataloader):
y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
X = torch.concat(
[tensor.to(torch.float32) for tensor in batch_data.values()],
dim=-1,
).to(device)
pred = model(X)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if batch % 100 == 0:
print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")
# Save the model
if dist.get_rank() == 0:
torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))
# Create PyTorch trainer with scaling configuration
pytorch_trainer = PyTorchTrainer(
train_func=train_func,
scaling_config=ScalingConfig(
num_nodes=1,
num_workers_per_node=4,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
),
)
# Run distributed training
response = pytorch_trainer.run(
dataset_map=dict(
train=sharded_data_connector,
)
)
Chargement de données non structurées à partir de zones de préparation Snowflake¶
Utilisez les APIs DataSource Snowflake pour lire des données non structurées à partir de zones de préparation Snowflake. Chaque format de fichier possède une classe de source de données correspondante qui définit la manière de lire les données.
Les formats de fichier et les APIs correspondantes que vous utilisez pour charger les données sont les suivants :
Fichiers binaires :
SFStageBinaryFileDataSourceFichiers de texte :
SFStageTextDataSourceFichiers CSV :
SFStageCSVDataSourceFichiers Parquet :
SFStageParquetDataSourceFichiers d’images :
SFStageImageDataSource
Charger et traiter des données¶
Lorsque vous créez une source de données Snowflake, vous devez fournir les éléments suivants :
Le nom de la zone de préparation à partir de laquelle vous lisez les données
La base de données qui possède la zone de préparation (par défaut, la session actuelle)
Le schéma qui possède la zone de préparation (par défaut, la session actuelle)
Le modèle des fichiers de filtre lus à partir de la source de données (facultatif)
L’API de données ou le connecteur de données extrait tous les fichiers du chemin fourni qui correspond au modèle de fichier.
Après avoir défini la source de données Snowflake, vous pouvez charger les données dans un ensemble de données Ray. Avec l’ensemble de données Ray, vous pouvez effectuer les opérations suivantes :
Utiliser l’ensemble de données avec les APIs Ray
Transmettre l’ensemble de données à DataConnector
Convertir en ensemble de données pandas ou PyTorch si nécessaire.
L’exemple suivant permet d’effectuer les opérations suivantes :
Lit les fichiers Parquet d’une zone de préparation Snowflake vers un ensemble de données Ray
Convertit l’ensemble de données en DataConnector
import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector
data_source = SFStageParquetDataSource(
stage_location="@stage/path/",
database="DB_NAME", # optional
schema="SCHEMA_NAME", # optional
file_pattern='*.parquet', # optional
)
# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)
dc = DataConnector.from_ray_dataset(ray_ds)
Réécrire les données structurées dans des tables Snowflake¶
Utilisez l’API DataSink Snowflake pour écrire des données structurées à partir de votre notebook ou de tâches ML vers une table Snowflake. Vous pouvez écrire des ensembles de données transformés ou de prédiction dans Snowflake pour une analyse ou un stockage ultérieur.
Pour définir un récepteur de données, il convient de fournir les éléments suivants :
Nom de la zone de préparation
Nom de la base de données (par défaut, la session actuelle)
Nom du schéma (par défaut, la session actuelle)
Modèle de fichier correspondant à des fichiers spécifiques (facultatif)
L’exemple suivant définit un récepteur de données :
from snowflake.ml.ray.datasink import SnowflakeTableDatasink
datasink = SnowflakeTableDatasink(
table_name="table_name",
database="db_name",
schema="schema_name",
auto_create_table=True, # create table if not exists
override=True # replace vs insert to table
)
Après avoir défini un récepteur de données, vous pouvez utiliser le code suivant pour écrire l’ensemble de données Ray dans une table Snowflake.
import ray
# Get Ray dataset from sources
ray_ds = ray.data.read_datasource(data_source)
# Setup transform operations, not executed yet
transformed_ds = ray_ds.map_batches(example_transform_batch_function)
# Start writing to Snowflake distributedly
transformed_ds.write_datasink(datasink)
Bonnes pratiques et considérations¶
Pour une performance et une utilisation optimales des ressources, tenez compte des bonnes pratiques suivantes :
Parallélisme : Concevez vos implémentations de sources de données pour tirer parti de la nature distribuée de Ray. Personnalisez les arguments de parallélisme et de concurrence pour mieux répondre à votre cas d’utilisation. Vous pouvez définir manuellement le nombre de ressources que vous affectez par tâche à chaque étape.
Partitionnement : Par défaut, la logique interne de Ray partitionnera l’ensemble de données en fonction des ressources et de la taille des données. Vous pouvez personnaliser le nombre de partitions pour choisir entre un grand nombre de petites tâches et un petit nombre de grandes tâches en fonction du cas d’utilisation grâce à ray_ds.repartition(X).
Bonnes pratiques : Pour en savoir plus, consulter Guide de l’utilisateur des données Ray.
Détails de l’API Ray :
Prochaines étapes¶
Après avoir chargé vos données, vous pouvez :
Utiliser le Feature Store pour la gestion des fonctionnalités