Carregar e gravar dados¶
Use o Snowflake ML para carregar com eficiência os dados das tabelas e estágios do Snowflake em seus fluxos de trabalho de aprendizado de máquina. O Snowflake ML fornece recursos otimizados de carregamento de dados que aproveitam o processamento distribuído do Snowflake para acelerar a ingestão de dados para seus fluxos de trabalho de treinamento e inferência.
Você pode carregar e processar dados usando:
Notebooks Snowflake: Ambiente de desenvolvimento interativo para explorar dados e criar modelos de ML. Para obter mais informações, consulte Notebooks no Container Runtime para ML.
Trabalhos de ML do Snowflake: Execute suas cargas de trabalho de ML de forma assíncrona de qualquer ambiente de desenvolvimento. Para obter mais informações, consulte Snowflake ML Jobs.
Tanto Notebooks quanto trabalhos de ML são executados no Container Runtime for ML, que fornece ambientes pré-configurados otimizados para cargas de trabalho de aprendizado de máquina com recursos de processamento distribuído. O Container Runtime usa o Ray, uma estrutura de código aberto para computação distribuída, para processar dados com eficiência em vários nós de computação. Para obter mais informações sobre o Container Runtime para ML, consulte Container Runtime para ML.
O Snowflake ML fornece diferentes APIs para carregamento de dados estruturados e não estruturados:
Dados estruturados (tabelas e conjuntos de dados)
DataConnector: Carregue dados de tabelas do Snowflake e conjuntos de dados do Snowflake. Para obter mais informações, consulte Carregamento de dados estruturados de tabelas do Snowflake.
DataSink: Gravar dados de volta nas tabelas do Snowflake. Para obter mais informações, consulte Gravação de dados estruturados nas tabelas do Snowflake.
Dados não estruturados (arquivos em estágios)
APIs DataSource: Carregamento de dados de vários formatos de arquivo (CSV, Parquet, imagens e muito mais) dos estágios do Snowflake. Para obter mais informações, consulte Carregamento de dados não estruturados dos estágios do Snowflake.
A tabela a seguir pode ajudar você a escolher a correta API para seu caso de uso:
Tipo de dados |
Fonte de dados |
API para carregamento |
API para escrita |
|---|---|---|---|
Estruturado |
Tabelas Snowflake |
DataConnector |
DataSink |
Estruturado |
Conjuntos de dados do Snowflake |
DataConnector |
DataSink |
Não estruturado |
CSV Arquivos (estágio) |
DataSource API |
N/A |
Não estruturado |
Arquivos Parquet (estágio) |
DataSource API |
N/A |
Não estruturado |
Outros arquivos preparados |
DataSource API |
N/A |
Carregamento de dados estruturados de tabelas do Snowflake¶
Use a Snowflake DataConnector para carregar dados estruturados de tabelas do Snowflake e conjuntos de dados do Snowflake em um notebook Snowflake ou Snowflake ML Trabalho. O DataConnector acelera o carregamento de dados paralelizando as leituras em vários nós de computação.
O DataConnector funciona com Snowpark DataFrames ou conjuntos de dados Snowflake:
Snowpark DataFrames: Forneça acesso direto aos dados em suas tabelas do Snowflake. Melhor usado durante o desenvolvimento.
Conjuntos de dados do Snowflake: Objetos em nível de esquema com versão. Melhor usado para fluxos de trabalho de produção. Para obter mais informações, consulte Conjuntos de dados do Snowflake.
Depois de paralelizar as leituras, o DataConnector pode converter os dados em uma das seguintes estruturas de dados:
dataframe pandas
Conjunto de dados do PyTorch
Conjunto de dados do TensorFlow
Criar uma DataConnector¶
Você pode criar um DataConnector de um Snowpark DataFrame ou um conjunto de dados do Snowflake.
Use o código a seguir para criar uma DataConnector de uma Snowpark DataFrame:
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create DataConnector from a Snowflake table
data_connector = DataConnector.from_dataframe(session.table("example-table-name"))
Use o código a seguir para criar uma DataConnector de um conjunto de dados Snowflake:
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector from a Snowflake Dataset
data_connector = DataConnector.from_dataset(snowflake_dataset)
Converter DataConnector para outros formatos¶
Depois de criar uma DataConnector, você pode convertê-lo em diferentes estruturas de dados para uso com vários ML Estruturas.
Você pode converter um DataConnector para um dataframe do pandas para uso com o scikit-learn e outras bibliotecas compatíveis com o pandas.
O exemplo a seguir carrega dados de uma tabela Snowflake em um dataframe pandas e treina um XGBoost Classificador:
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
import xgboost as xgb
session = get_active_session()
# Specify training table location
table_name = "TRAINING_TABLE"
# Load table into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))
# Convert to pandas dataframe
pandas_df = data_connector.to_pandas()
# Prepare features and labels
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]
# Train classifier
clf = xgb.Classifier()
clf.fit(X, y)
Você pode converter um DataConnector para um PyTorch Conjunto de dados para uso com PyTorch Modelos de e carregamentos de dados.
O exemplo a seguir carrega dados de uma tabela Snowflake em um PyTorch Conjunto de dados:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector (see previous examples)
# data_connector = DataConnector.from_dataframe(...)
# Convert to PyTorch dataset
torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']
for batch_idx, batch in enumerate(dataloader):
y = batch_data.pop(label_col).squeeze()
X = torch.stack(
[tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
)
Você pode converter um DataConnector para um TensorFlow Conjunto de dados para uso com TensorFlow Modelos. Os dados são carregados de forma contínua para máxima eficiência.
O exemplo a seguir converte um DataConnector para um TensorFlow Conjunto de dados:
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector (see previous examples)
# data_connector = DataConnector.from_dataframe(...)
# Convert to TensorFlow dataset
tf_ds = data_connector.to_tf_dataset(
batch_size=4,
shuffle=True,
drop_last_batch=True
)
for batch in tf_ds:
print(batch)
Usar com APIs de treinamento distribuído do Snowflake¶
Para obter melhor desempenho, você pode passar um DataConnector diretamente para o treinamento distribuído otimizado das APIs Snowflake em vez de converter primeiro para conjuntos de dados pandas, PyTorch ou TensorFlow.
O exemplo a seguir treina um modelo XGBoost usando um estimador XGBoost distribuído do Snowflake:
from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
XGBEstimator,
XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create DataConnector from a Snowpark dataframe
snowflake_df = session.table("TRAINING_TABLE")
data_connector = DataConnector.from_dataframe(snowflake_df)
# Create Snowflake XGBoost estimator
snowflake_est = XGBEstimator(
n_estimators=1,
objective="reg:squarederror",
scaling_config=XGBScalingConfig(use_gpu=False),
)
# Train using the data connector
# When using a data connector, input_cols and label_col must be provided
fit_booster = snowflake_est.fit(
data_connector,
input_cols=NUMERICAL_COLS,
label_col=LABEL_COL
)
Uso da fragmentação com distribuidor PyTorch¶
Você pode usar o ShardedDataConnector para compartilhar seus dados em vários nós para treinamento distribuído com o distribuidor Snowflake PyTorch.
O exemplo a seguir treina um modelo PyTorch no conjunto de dados de dígitos usando dados fragmentados em múltiplos processos:
from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
PyTorchTrainer,
ScalingConfig,
WorkerResourceConfig,
getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create the Snowflake data from a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)
# Create sharded data connector
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)
# Define the PyTorch model
class DigitsModel(nn.Module):
def __init__(self):
super(DigitsModel, self).__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(8 * 8, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10)
)
def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits
# Define training function that runs across multiple nodes or devices
# Each process receives a unique data shard
def train_func():
import os
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from torch import nn
from torch.nn.parallel import DistributedDataParallel as DDP
# Get context with data shards and model directory
context = getContext()
dataset_map = context.get_dataset_map()
model_dir = context.get_model_dir()
training_data = dataset_map["train"].get_shard().to_torch_dataset()
train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)
dist.init_process_group()
device = "cpu"
label_col = '"target"'
batch_size = 64
model = DDP(DigitsModel())
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
# Training loop
for epoch in range(5):
for batch, batch_data in enumerate(train_dataloader):
y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
X = torch.concat(
[tensor.to(torch.float32) for tensor in batch_data.values()],
dim=-1,
).to(device)
pred = model(X)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if batch % 100 == 0:
print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")
# Save the model
if dist.get_rank() == 0:
torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))
# Create PyTorch trainer with scaling configuration
pytorch_trainer = PyTorchTrainer(
train_func=train_func,
scaling_config=ScalingConfig(
num_nodes=1,
num_workers_per_node=4,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
),
)
# Run distributed training
response = pytorch_trainer.run(
dataset_map=dict(
train=sharded_data_connector,
)
)
Carregamento de dados não estruturados dos estágios do Snowflake¶
Use a Snowflake DataSource APIs para ler dados não estruturados dos estágios do Snowflake. Cada formato de arquivo tem uma classe de fonte de dados correspondente que define como ler os dados.
A seguir mostramos os formatos de arquivo e os correspondentes APIs que você usa para carregar os dados:
Arquivos binários:
SFStageBinaryFileDataSourceArquivos de texto:
SFStageTextDataSourceArquivos CSV:
SFStageCSVDataSourceArquivos Parquet:
SFStageParquetDataSourceArquivos de imagem:
SFStageImageDataSource
Carregamento e processamento de dados¶
Ao criar um Snowflake Datasource, você deve fornecer o seguinte:
O nome do estágio do qual você está lendo os dados
O banco de dados que tem o estágio (o padrão é a sessão atual)
O esquema que tem o estágio (o padrão é a sessão atual)
O padrão para os arquivos de filtro sendo lidos da fonte de dados (opcional)
Os dados API ou o conector de dados recupera todos os arquivos dentro do caminho fornecido que corresponde ao padrão do arquivo.
Depois de definir a fonte de dados Snowflake, você pode carregar os dados em um conjunto de dados Ray. Com o conjunto de dados Ray, você pode fazer o seguinte:
Uso do conjunto de dados com Ray APIs
Passar o conjunto de dados para DataConnector
Conversão em pandas ou PyTorch Conjuntos de dados, se necessário.
O exemplo a seguir faz o seguinte:
Lê arquivos Parquet de um estágio Snowflake para um conjunto de dados Ray
Converte o conjunto de dados em um DataConnector
import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector
data_source = SFStageParquetDataSource(
stage_location="@stage/path/",
database="DB_NAME", # optional
schema="SCHEMA_NAME", # optional
file_pattern='*.parquet', # optional
)
# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)
dc = DataConnector.from_ray_dataset(ray_ds)
Gravação de dados estruturados nas tabelas do Snowflake¶
Use a Snowflake DataSink API para gravar dados estruturados a partir de seu Notebook ou ML Trabalho de volta para uma tabela Snowflake. Você pode gravar conjuntos de dados transformados ou de previsão no Snowflake para análise ou armazenamento adicional.
Para definir um coletor de dados, forneça o seguinte:
Nome do estágio
Nome do banco de dados (o padrão é a sessão atual)
Nome do esquema (o padrão é a sessão atual)
Padrão de arquivo para corresponder a arquivos específicos (opcional)
O exemplo a seguir define um coletor de dados:
from snowflake.ml.ray.datasink import SnowflakeTableDatasink
datasink = SnowflakeTableDatasink(
table_name="table_name",
database="db_name",
schema="schema_name",
auto_create_table=True, # create table if not exists
override=True # replace vs insert to table
)
Depois de definir um coletor de dados, você pode usar o código a seguir para gravar o conjunto de dados do Ray em uma tabela do Snowflake.
import ray
# Get Ray dataset from sources
ray_ds = ray.data.read_datasource(data_source)
# Setup transform operations, not executed yet
transformed_ds = ray_ds.map_batches(example_transform_batch_function)
# Start writing to Snowflake distributedly
transformed_ds.write_datasink(datasink)
Práticas recomendadas e considerações¶
Para um desempenho ideal e utilização de recursos, considere as seguintes práticas recomendadas:
Paralelismo: Projete suas implementações de fonte de dados para aproveitar a natureza distribuída do Ray. Personalize os argumentos de paralelismo e simultaneidade para melhor atender ao seu caso de uso. Você pode definir manualmente quantos recursos está alocando por tarefa em cada etapa.
Particionamento: Por padrão, a lógica interna do Ray particionará o conjunto de dados com base nos recursos e no tamanho dos dados. Você pode personalizar o número de partições para escolher entre um grande número de tarefas pequenas vs um número pequeno de tarefas grandes com base no caso de uso com ray_ds.repartition(X).
Práticas recomendadas: Siga Guia do usuário de dados Ray para obter orientação adicional.
Ray API detalhes:
Próximos passos¶
Após carregar seus dados, você pode:
Uso do Feature Store para gerenciamento de recurso