Chargement et écriture de données

Utilisez Snowflake ML pour charger efficacement les données des tables et des zones de préparation Snowflake dans vos workflows de machine learning. Snowflake ML fournit des capacités de chargement de données optimisées qui tirent parti du traitement distribué de Snowflake pour accélérer l’ingestion de données pour vos workflows d’entraînement et d’inférence.

Vous pouvez charger et traiter des données à l’aide de :

  • Snowflake Notebooks : Environnement de développement interactif pour explorer les données et créer des modèles ML. Pour plus d’informations, voir Notebooks sur Container Runtime pour ML.

  • Tâches ML Snowflake : Exécutez vos charges de travail ML de manière asynchrone à partir de n’importe quel environnement de développement. Pour plus d’informations, voir Tâches Snowflake ML.

Les tâches ML et les notebooks s’exécutent sur Container Runtime pour ML, qui fournit des environnements préconfigurés optimisés pour les charges de travail de machine learning avec des capacités de traitement distribuées. Container Runtime utilise Ray, un framework open-source pour le calcul distribué, pour traiter efficacement les données sur plusieurs nœuds de calcul. Pour plus d’informations sur Container Runtime pour ML, voir Container Runtime pour ML.

Snowflake ML fournit différentes APIs pour le chargement de données structurées et non structurées :

Données structurées (tables et ensembles de données)

Données non structurées (fichiers dans des zones de préparation)

Le tableau suivant peut vous aider à choisir la bonne API pour votre cas d’utilisation :

Sources de données et APIs

Type de données

Source de données

API pour le chargement

API pour l’écriture

Structuré

Tables Snowflake

DataConnector

DataSink

Structuré

Ensembles de données Snowflake

DataConnector

DataSink

Non structuré

Fichiers CSV (zone de préparation)

DataSource API

N/A

Non structuré

Fichiers Parquet (zone de préparation)

DataSource API

N/A

Non structuré

Autres fichiers en zone de préparation

DataSource API

N/A

Charger des données structurées à partir de tables Snowflake

Utilisez le DataConnector Snowflake pour charger des données structurées à partir de tables Snowflake et d’ensembles de données Snowflake dans un notebook Snowflake ou une tâche ML Snowflake. Le DataConnector accélère le chargement des données en parallélisant les lectures sur plusieurs nœuds de calcul.

Le DataConnector fonctionne avec les DataFrames Snowpark ou les ensembles de données Snowflake :

  • DataFrames Snowpark : Fournissez un accès direct aux données de vos tables Snowflake. À utiliser de préférence pendant le développement.

  • Ensembles de données Snowflake : Objets de niveau schéma versionnés. À utiliser de préférence pour les workflows de production. Pour plus d’informations, voir Ensembles de données Snowflake.

Après la parallélisation des lectures, le DataConnector peut convertir les données dans l’une des structures de données suivantes :

  • dataframe pandas

  • PyTorch jeu de données

  • TensorFlow jeu de données

Créer une DataConnector

Vous pouvez créer un DataConnector depuis un DataFrame Snowpark ou un ensemble de données Snowflake.

Utilisez le code suivant pour créer un DataConnector depuis un DataFrame Snowpark :

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create DataConnector from a Snowflake table
data_connector = DataConnector.from_dataframe(session.table("example-table-name"))
Copy

Utilisez le code suivant pour créer un DataConnector depuis un ensemble de données Snowflake :

from snowflake.ml.data.data_connector import DataConnector

# Create DataConnector from a Snowflake Dataset
data_connector = DataConnector.from_dataset(snowflake_dataset)
Copy

Convertir DataConnector vers d’autres formats

Après avoir créé un DataConnector, vous pouvez le convertir en différentes structures de données pour l’utiliser avec divers frameworks ML.

Vous pouvez convertir un DataConnector vers un dataframe pandas à utiliser avec scikit-learn et d’autres bibliothèques compatibles pandas.

L’exemple suivant charge les données d’une table Snowflake dans un dataframe Pandas et forme un classificateur XGBoost :

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
import xgboost as xgb

session = get_active_session()

# Specify training table location
table_name = "TRAINING_TABLE"

# Load table into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))

# Convert to pandas dataframe
pandas_df = data_connector.to_pandas()

# Prepare features and labels
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]

# Train classifier
clf = xgb.Classifier()
clf.fit(X, y)
Copy

Utilisation avec des APIs d’entraînement distribué de Snowflake

Pour une performance optimale, vous pouvez transmettre un DataConnector directement vers les APIs d’entraînement distribué optimisé de Snowflake au lieu de convertir d’abord en ensemble de données pandas, PyTorch ou TensorFlow.

L’exemple suivant entraîne un modèle XGBoost à l’aide de l’estimateur XGBoost distribué de Snowflake :

from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
    XGBEstimator,
    XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create DataConnector from a Snowpark dataframe
snowflake_df = session.table("TRAINING_TABLE")
data_connector = DataConnector.from_dataframe(snowflake_df)

# Create Snowflake XGBoost estimator
snowflake_est = XGBEstimator(
    n_estimators=1,
    objective="reg:squarederror",
    scaling_config=XGBScalingConfig(use_gpu=False),
)

# Train using the data connector
# When using a data connector, input_cols and label_col must be provided
fit_booster = snowflake_est.fit(
    data_connector,
    input_cols=NUMERICAL_COLS,
    label_col=LABEL_COL
)
Copy

Utiliser le fragmentage avec le distributeur PyTorch

Vous pouvez utiliser le ShardedDataConnector pour fragmenter vos données sur plusieurs nœuds pour un entraînement distribué avec le distributeur PyTorch Snowflake.

L’exemple suivant entraîne un modèle PyTorch sur l’ensemble de données de chiffres utilisant des données fragmentées dans plusieurs processus :

from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
    PyTorchTrainer,
    ScalingConfig,
    WorkerResourceConfig,
    getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create the Snowflake data from a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)

# Create sharded data connector
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)

# Define the PyTorch model
class DigitsModel(nn.Module):
    def __init__(self):
        super(DigitsModel, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(8 * 8, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Define training function that runs across multiple nodes or devices
# Each process receives a unique data shard
def train_func():
    import os
    import torch
    import torch.distributed as dist
    from torch.utils.data import DataLoader
    from torch import nn
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Get context with data shards and model directory
    context = getContext()
    dataset_map = context.get_dataset_map()
    model_dir = context.get_model_dir()
    training_data = dataset_map["train"].get_shard().to_torch_dataset()
    train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)

    dist.init_process_group()
    device = "cpu"
    label_col = '"target"'
    batch_size = 64

    model = DDP(DigitsModel())
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    # Training loop
    for epoch in range(5):
        for batch, batch_data in enumerate(train_dataloader):
            y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
            X = torch.concat(
                [tensor.to(torch.float32) for tensor in batch_data.values()],
                dim=-1,
            ).to(device)
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if batch % 100 == 0:
                print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")

    # Save the model
    if dist.get_rank() == 0:
        torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))

# Create PyTorch trainer with scaling configuration
pytorch_trainer = PyTorchTrainer(
    train_func=train_func,
    scaling_config=ScalingConfig(
        num_nodes=1,
        num_workers_per_node=4,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
    ),
)

# Run distributed training
response = pytorch_trainer.run(
    dataset_map=dict(
        train=sharded_data_connector,
    )
)
Copy

Chargement de données non structurées à partir de zones de préparation Snowflake

Utilisez les APIs DataSource Snowflake pour lire des données non structurées à partir de zones de préparation Snowflake. Chaque format de fichier possède une classe de source de données correspondante qui définit la manière de lire les données.

Les formats de fichier et les APIs correspondantes que vous utilisez pour charger les données sont les suivants :

  • Fichiers binaires : SFStageBinaryFileDataSource

  • Fichiers de texte : SFStageTextDataSource

  • Fichiers CSV : SFStageCSVDataSource

  • Fichiers Parquet : SFStageParquetDataSource

  • Fichiers d’images : SFStageImageDataSource

Charger et traiter des données

Lorsque vous créez une source de données Snowflake, vous devez fournir les éléments suivants :

  • Le nom de la zone de préparation à partir de laquelle vous lisez les données

  • La base de données qui possède la zone de préparation (par défaut, la session actuelle)

  • Le schéma qui possède la zone de préparation (par défaut, la session actuelle)

  • Le modèle des fichiers de filtre lus à partir de la source de données (facultatif)

L’API de données ou le connecteur de données extrait tous les fichiers du chemin fourni qui correspond au modèle de fichier.

Après avoir défini la source de données Snowflake, vous pouvez charger les données dans un ensemble de données Ray. Avec l’ensemble de données Ray, vous pouvez effectuer les opérations suivantes :

  • Utiliser l’ensemble de données avec les APIs Ray

  • Transmettre l’ensemble de données à DataConnector

  • Convertir en ensemble de données pandas ou PyTorch si nécessaire.

L’exemple suivant permet d’effectuer les opérations suivantes :

  • Lit les fichiers Parquet d’une zone de préparation Snowflake vers un ensemble de données Ray

  • Convertit l’ensemble de données en DataConnector

import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector

data_source = SFStageParquetDataSource(
    stage_location="@stage/path/",
    database="DB_NAME", # optional
    schema="SCHEMA_NAME", # optional
    file_pattern='*.parquet', # optional
)

# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)

dc = DataConnector.from_ray_dataset(ray_ds)
Copy

Réécrire les données structurées dans des tables Snowflake

Utilisez l’API DataSink Snowflake pour écrire des données structurées à partir de votre notebook ou de tâches ML vers une table Snowflake. Vous pouvez écrire des ensembles de données transformés ou de prédiction dans Snowflake pour une analyse ou un stockage ultérieur.

Pour définir un récepteur de données, il convient de fournir les éléments suivants :

  • Nom de la zone de préparation

  • Nom de la base de données (par défaut, la session actuelle)

  • Nom du schéma (par défaut, la session actuelle)

  • Modèle de fichier correspondant à des fichiers spécifiques (facultatif)

L’exemple suivant définit un récepteur de données :

from snowflake.ml.ray.datasink import SnowflakeTableDatasink
datasink = SnowflakeTableDatasink(
    table_name="table_name",
    database="db_name",
    schema="schema_name",
    auto_create_table=True, # create table if not exists
    override=True # replace vs insert to table
)
Copy

Après avoir défini un récepteur de données, vous pouvez utiliser le code suivant pour écrire l’ensemble de données Ray dans une table Snowflake.

import ray

# Get Ray dataset from sources
ray_ds = ray.data.read_datasource(data_source)

# Setup transform operations, not executed yet
transformed_ds = ray_ds.map_batches(example_transform_batch_function)

# Start writing to Snowflake distributedly
transformed_ds.write_datasink(datasink)
Copy

Bonnes pratiques et considérations

Pour une performance et une utilisation optimales des ressources, tenez compte des bonnes pratiques suivantes :

Parallélisme : Concevez vos implémentations de sources de données pour tirer parti de la nature distribuée de Ray. Personnalisez les arguments de parallélisme et de concurrence pour mieux répondre à votre cas d’utilisation. Vous pouvez définir manuellement le nombre de ressources que vous affectez par tâche à chaque étape.

Partitionnement : Par défaut, la logique interne de Ray partitionnera l’ensemble de données en fonction des ressources et de la taille des données. Vous pouvez personnaliser le nombre de partitions pour choisir entre un grand nombre de petites tâches et un petit nombre de grandes tâches en fonction du cas d’utilisation grâce à ray_ds.repartition(X).

Bonnes pratiques : Pour en savoir plus, consulter Guide de l’utilisateur des données Ray.

Détails de l’API Ray :

Prochaines étapes

Après avoir chargé vos données, vous pouvez :