Carregar e gravar dados

Use o Snowflake ML para carregar com eficiência os dados das tabelas e estágios do Snowflake em seus fluxos de trabalho de aprendizado de máquina. O Snowflake ML fornece recursos otimizados de carregamento de dados que aproveitam o processamento distribuído do Snowflake para acelerar a ingestão de dados para seus fluxos de trabalho de treinamento e inferência.

Você pode carregar e processar dados usando:

  • Notebooks Snowflake: Ambiente de desenvolvimento interativo para explorar dados e criar modelos de ML. Para obter mais informações, consulte Notebooks no Container Runtime para ML.

  • Trabalhos de ML do Snowflake: Execute suas cargas de trabalho de ML de forma assíncrona de qualquer ambiente de desenvolvimento. Para obter mais informações, consulte Snowflake ML Jobs.

Tanto Notebooks quanto trabalhos de ML são executados no Container Runtime for ML, que fornece ambientes pré-configurados otimizados para cargas de trabalho de aprendizado de máquina com recursos de processamento distribuído. O Container Runtime usa o Ray, uma estrutura de código aberto para computação distribuída, para processar dados com eficiência em vários nós de computação. Para obter mais informações sobre o Container Runtime para ML, consulte Container Runtime para ML.

O Snowflake ML fornece diferentes APIs para carregamento de dados estruturados e não estruturados:

Dados estruturados (tabelas e conjuntos de dados)

Dados não estruturados (arquivos em estágios)

A tabela a seguir pode ajudar você a escolher a correta API para seu caso de uso:

Fontes de dados e APIs

Tipo de dados

Fonte de dados

API para carregamento

API para escrita

Estruturado

Tabelas Snowflake

DataConnector

DataSink

Estruturado

Conjuntos de dados do Snowflake

DataConnector

DataSink

Não estruturado

CSV Arquivos (estágio)

DataSource API

N/A

Não estruturado

Arquivos Parquet (estágio)

DataSource API

N/A

Não estruturado

Outros arquivos preparados

DataSource API

N/A

Carregamento de dados estruturados de tabelas do Snowflake

Use a Snowflake DataConnector para carregar dados estruturados de tabelas do Snowflake e conjuntos de dados do Snowflake em um notebook Snowflake ou Snowflake ML Trabalho. O DataConnector acelera o carregamento de dados paralelizando as leituras em vários nós de computação.

O DataConnector funciona com Snowpark DataFrames ou conjuntos de dados Snowflake:

  • Snowpark DataFrames: Forneça acesso direto aos dados em suas tabelas do Snowflake. Melhor usado durante o desenvolvimento.

  • Conjuntos de dados do Snowflake: Objetos em nível de esquema com versão. Melhor usado para fluxos de trabalho de produção. Para obter mais informações, consulte Conjuntos de dados do Snowflake.

Depois de paralelizar as leituras, o DataConnector pode converter os dados em uma das seguintes estruturas de dados:

  • dataframe pandas

  • Conjunto de dados do PyTorch

  • Conjunto de dados do TensorFlow

Criar uma DataConnector

Você pode criar um DataConnector de um Snowpark DataFrame ou um conjunto de dados do Snowflake.

Use o código a seguir para criar uma DataConnector de uma Snowpark DataFrame:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create DataConnector from a Snowflake table
data_connector = DataConnector.from_dataframe(session.table("example-table-name"))
Copy

Use o código a seguir para criar uma DataConnector de um conjunto de dados Snowflake:

from snowflake.ml.data.data_connector import DataConnector

# Create DataConnector from a Snowflake Dataset
data_connector = DataConnector.from_dataset(snowflake_dataset)
Copy

Converter DataConnector para outros formatos

Depois de criar uma DataConnector, você pode convertê-lo em diferentes estruturas de dados para uso com vários ML Estruturas.

Você pode converter um DataConnector para um dataframe do pandas para uso com o scikit-learn e outras bibliotecas compatíveis com o pandas.

O exemplo a seguir carrega dados de uma tabela Snowflake em um dataframe pandas e treina um XGBoost Classificador:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
import xgboost as xgb

session = get_active_session()

# Specify training table location
table_name = "TRAINING_TABLE"

# Load table into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))

# Convert to pandas dataframe
pandas_df = data_connector.to_pandas()

# Prepare features and labels
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]

# Train classifier
clf = xgb.Classifier()
clf.fit(X, y)
Copy

Usar com APIs de treinamento distribuído do Snowflake

Para obter melhor desempenho, você pode passar um DataConnector diretamente para o treinamento distribuído otimizado das APIs Snowflake em vez de converter primeiro para conjuntos de dados pandas, PyTorch ou TensorFlow.

O exemplo a seguir treina um modelo XGBoost usando um estimador XGBoost distribuído do Snowflake:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
    XGBEstimator,
    XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create DataConnector from a Snowpark dataframe
snowflake_df = session.table("TRAINING_TABLE")
data_connector = DataConnector.from_dataframe(snowflake_df)

# Create Snowflake XGBoost estimator
snowflake_est = XGBEstimator(
    n_estimators=1,
    objective="reg:squarederror",
    scaling_config=XGBScalingConfig(use_gpu=False),
)

# Train using the data connector
# When using a data connector, input_cols and label_col must be provided
fit_booster = snowflake_est.fit(
    data_connector,
    input_cols=NUMERICAL_COLS,
    label_col=LABEL_COL
)
Copy

Uso da fragmentação com distribuidor PyTorch

Você pode usar o ShardedDataConnector para compartilhar seus dados em vários nós para treinamento distribuído com o distribuidor Snowflake PyTorch.

O exemplo a seguir treina um modelo PyTorch no conjunto de dados de dígitos usando dados fragmentados em múltiplos processos:

from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
    PyTorchTrainer,
    ScalingConfig,
    WorkerResourceConfig,
    getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create the Snowflake data from a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)

# Create sharded data connector
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)

# Define the PyTorch model
class DigitsModel(nn.Module):
    def __init__(self):
        super(DigitsModel, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(8 * 8, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Define training function that runs across multiple nodes or devices
# Each process receives a unique data shard
def train_func():
    import os
    import torch
    import torch.distributed as dist
    from torch.utils.data import DataLoader
    from torch import nn
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Get context with data shards and model directory
    context = getContext()
    dataset_map = context.get_dataset_map()
    model_dir = context.get_model_dir()
    training_data = dataset_map["train"].get_shard().to_torch_dataset()
    train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)

    dist.init_process_group()
    device = "cpu"
    label_col = '"target"'
    batch_size = 64

    model = DDP(DigitsModel())
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    # Training loop
    for epoch in range(5):
        for batch, batch_data in enumerate(train_dataloader):
            y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
            X = torch.concat(
                [tensor.to(torch.float32) for tensor in batch_data.values()],
                dim=-1,
            ).to(device)
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if batch % 100 == 0:
                print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")

    # Save the model
    if dist.get_rank() == 0:
        torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))

# Create PyTorch trainer with scaling configuration
pytorch_trainer = PyTorchTrainer(
    train_func=train_func,
    scaling_config=ScalingConfig(
        num_nodes=1,
        num_workers_per_node=4,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
    ),
)

# Run distributed training
response = pytorch_trainer.run(
    dataset_map=dict(
        train=sharded_data_connector,
    )
)
Copy

Carregamento de dados não estruturados dos estágios do Snowflake

Use a Snowflake DataSource APIs para ler dados não estruturados dos estágios do Snowflake. Cada formato de arquivo tem uma classe de fonte de dados correspondente que define como ler os dados.

A seguir mostramos os formatos de arquivo e os correspondentes APIs que você usa para carregar os dados:

  • Arquivos binários: SFStageBinaryFileDataSource

  • Arquivos de texto: SFStageTextDataSource

  • Arquivos CSV: SFStageCSVDataSource

  • Arquivos Parquet: SFStageParquetDataSource

  • Arquivos de imagem: SFStageImageDataSource

Carregamento e processamento de dados

Ao criar um Snowflake Datasource, você deve fornecer o seguinte:

  • O nome do estágio do qual você está lendo os dados

  • O banco de dados que tem o estágio (o padrão é a sessão atual)

  • O esquema que tem o estágio (o padrão é a sessão atual)

  • O padrão para os arquivos de filtro sendo lidos da fonte de dados (opcional)

Os dados API ou o conector de dados recupera todos os arquivos dentro do caminho fornecido que corresponde ao padrão do arquivo.

Depois de definir a fonte de dados Snowflake, você pode carregar os dados em um conjunto de dados Ray. Com o conjunto de dados Ray, você pode fazer o seguinte:

  • Uso do conjunto de dados com Ray APIs

  • Passar o conjunto de dados para DataConnector

  • Conversão em pandas ou PyTorch Conjuntos de dados, se necessário.

O exemplo a seguir faz o seguinte:

  • Lê arquivos Parquet de um estágio Snowflake para um conjunto de dados Ray

  • Converte o conjunto de dados em um DataConnector

import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector

data_source = SFStageParquetDataSource(
    stage_location="@stage/path/",
    database="DB_NAME", # optional
    schema="SCHEMA_NAME", # optional
    file_pattern='*.parquet', # optional
)

# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)

dc = DataConnector.from_ray_dataset(ray_ds)
Copy

Gravação de dados estruturados nas tabelas do Snowflake

Use a Snowflake DataSink API para gravar dados estruturados a partir de seu Notebook ou ML Trabalho de volta para uma tabela Snowflake. Você pode gravar conjuntos de dados transformados ou de previsão no Snowflake para análise ou armazenamento adicional.

Para definir um coletor de dados, forneça o seguinte:

  • Nome do estágio

  • Nome do banco de dados (o padrão é a sessão atual)

  • Nome do esquema (o padrão é a sessão atual)

  • Padrão de arquivo para corresponder a arquivos específicos (opcional)

O exemplo a seguir define um coletor de dados:

from snowflake.ml.ray.datasink import SnowflakeTableDatasink
datasink = SnowflakeTableDatasink(
    table_name="table_name",
    database="db_name",
    schema="schema_name",
    auto_create_table=True, # create table if not exists
    override=True # replace vs insert to table
)
Copy

Depois de definir um coletor de dados, você pode usar o código a seguir para gravar o conjunto de dados do Ray em uma tabela do Snowflake.

import ray

# Get Ray dataset from sources
ray_ds = ray.data.read_datasource(data_source)

# Setup transform operations, not executed yet
transformed_ds = ray_ds.map_batches(example_transform_batch_function)

# Start writing to Snowflake distributedly
transformed_ds.write_datasink(datasink)
Copy

Práticas recomendadas e considerações

Para um desempenho ideal e utilização de recursos, considere as seguintes práticas recomendadas:

Paralelismo: Projete suas implementações de fonte de dados para aproveitar a natureza distribuída do Ray. Personalize os argumentos de paralelismo e simultaneidade para melhor atender ao seu caso de uso. Você pode definir manualmente quantos recursos está alocando por tarefa em cada etapa.

Particionamento: Por padrão, a lógica interna do Ray particionará o conjunto de dados com base nos recursos e no tamanho dos dados. Você pode personalizar o número de partições para escolher entre um grande número de tarefas pequenas vs um número pequeno de tarefas grandes com base no caso de uso com ray_ds.repartition(X).

Práticas recomendadas: Siga Guia do usuário de dados Ray para obter orientação adicional.

Ray API detalhes:

Próximos passos

Após carregar seus dados, você pode: