Snowflake ML Data Connector

Use o Snowflake ML Data Connector para ingerir dados de tabelas ou estágios do Snowflake em uma instância do Container Runtime (como uma sessão de notebook ou um trabalho de ML). O Data Connector usa o processamento distribuído do Container Runtime para acelerar o carregamento de dados e melhorar a eficiência da execução de pipelines de ML nos Snowflake Notebooks ou ML Jobs. Você pode executar fluxos de trabalho de ML baseados em Python no Snowflake usando os dados que carregou. Por exemplo, você pode dimensionar os pipelines de ML usando pacotes de código aberto. Para obter mais informações sobre o Container Runtime, consulte Container Runtime para ML.

Você pode usar o Data Connector para carregar dados de qualquer fonte de dados do Snowflake, como uma tabela ou um estágio, em um dataframe pandas. Esse dataframe pandas pode então ser usado com seus fluxos de trabalho de código aberto de ML no Snowflake. O Data Connector também fornece a funcionalidade para criar conjuntos de dados torch e tensorflow.

Além de usar fluxos de trabalho de código aberto, você também pode usar APIs distribuídas do Snowflake para treinar e ajustar modelos em escala.

O Data Connector é otimizado para funcionar no ambiente de contêineres. Fora do Container Runtime, o Data Connector usa um formato de troca de dados baseado no Apache Arrow para mover dados entre o Snowflake e o seu contêiner. O mesmo código funciona tanto dentro quanto fora do Snowflake.

Você pode usar o Data Connector com um Snowpark DataFrame ou um Snowflake Dataset. Os Snowpark DataFrames fornecem acesso direto aos dados das tabelas do Snowflake. Eles são mais bem utilizados durante o desenvolvimento.

Os Snowflake Datasets são objetos de nível de esquema com versão. Eles são mais bem utilizados para fluxos de trabalho de produção. Para obter mais informações sobre conjuntos de dados, consulte Conjuntos de dados do Snowflake.

Você pode usar o código a seguir para trazer o Snowpark DataFrame para o Container Runtime:

connector = DataConnector.from_dataframe(snowpark_df)
Copy

Você pode usar o código a seguir para trazer o conjunto de dados do Snowflake para o Container Runtime:

connector = DataConnector.from_dataset(snowflake_dataset)
Copy

O Data Connector usa o processamento distribuído para acelerar o carregamento de dados em objetos de dados de código aberto, como dataframes pandas, conjuntos de dados PyTorch ou conjuntos de dados TensorFlow. Se o to_pandas não estiver carregando dados no dataframe com rapidez suficiente, você pode usar o Data Connector para acelerar o processo.

Passar os conectores de dados diretamente para os fluxos de trabalho distribuídos do Snowflake melhora os tempos de execução do uso dos dados no fluxo de trabalho. Para obter mais informações, consulte Referência do Snowflake ML Container Runtime (Python).

Nota

Este tópico pressupõe que o módulo Snowpark ML esteja instalado. Caso contrário, consulte Como usar o Snowflake ML localmente.

Data Connector para o dataframe pandas

Você pode usar o código a seguir para carregar dados de uma tabela do Snowflake em um dataframe pandas.

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Specify training table location
table_name = "TRAINING_TABLE"
# Load tabel into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))


import xgboost as xgb

pandas_df = data_connector.to_pandas()
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]

clf = xgb.Classifier()
clf.fit(X, y)
Copy

Data Connector para conjunto de dados PyTorch

O código a seguir mostra como você pode usar o Data Connector para carregar dados de uma tabela Snowflake para um conjunto de dados PyTorch.

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']

for batch_idx, batch in enumerate(dataloader):
  y = batch_data.pop(label_col).squeeze()
  X = torch.stack(
      [tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
  )
Copy

Data Connector para conjunto de dados TensorFlow

Para uso com TensorFlow, use o método to_tf_dataset() para obter um Tensorflow Dataset: a iteração sobre o conjunto de dados produz tensores TensorFlow em lote. Os dados são carregados de forma contínua para máxima eficiência.

tf_ds = connector.to_tf_dataset(
    batch_size=4,
    shuffle=True,
    drop_last_batch=True
)

for batch in tf_ds:
    print(batch)
Copy

Passando um Data Connector para o treinamento distribuído otimizado das APIs do Snowflake

Para obter o melhor desempenho, você pode passar um objeto do Data Connector para o treinamento distribuído otimizado das APIs do Snowflake.

O código a seguir mostra como você pode fazer o seguinte:

  1. Carregar dados de uma tabela do Snowflake para um objeto do Data Connector.

  2. Criar uma instância do classificador XGBoost do Snowflake.

  3. Treinar o classificador usando o objeto do Data Connector.

from sklearn.datasets import make_regression
from snowflake.ml.data.data_connector import DataConnector


from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
  XGBEstimator,
  XGBScalingConfig,
)

from snowflake.snowpark.context import get_active_session

session = get_active_session()

snowflake_df = session.create_dataframe(session.table(table_name))

# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
data_connector = DataConnector.from_dataframe(snowflake_df)

snowflake_est = XGBEstimator(
  n_estimators=1,
  objective="reg:squarederror",
  scaling_config=XGBScalingConfig(use_gpu=False),
)

# If you use a data connector, input_cols and label_col must also be provided.
fit_booster = snowflake_est.fit(
  data_connector,
  input_cols = NUMERICAL_COLS,
  label_col = LABEL_COL
)
Copy

Usar fragmentação com o distribuidor PyTorch

É possível usar o Data Connector para fragmentar seus dados em vários nós em um Container Runtime. Você pode usar fragmentação para treinar modelos com o distribuidor PyTorch do Snowflake. Para obter informações sobre o distribuidor PyTorch, consulte Distribuidor PyTorch.

O código a seguir treina um modelo PyTorch no conjunto de dados de dígitos usando o Data Connector. Ele usa especificamente um nó com quatro processos em execução no nó. O código define um modelo PyTorch, uma função de treinamento e um treinador PyTorch. A função de treinamento é executada em vários nós, sendo que cada processo recebe um fragmento de dados exclusivo. Nenhuma GPUs foi usada neste exemplo, mas você pode definir o valor do parâmetro num_gpus como o número de GPUs que estiver usando.

from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector

from snowflake.ml.modeling.pytorch import (
  PyTorchTrainer,
  ScalingConfig,
  WorkerResourceConfig,
  getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create the Snowflake data. We'll start with a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)

# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)

# Define the model. This is OSS PyTorch code.
class DigitsModel(nn.Module):
    def __init__(self):
        super(DigitsModel, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(8 * 8, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Define the training function that will run across multiple nodes or devices.
# Each train_func process will receive a unique data shard.
def train_func():
    import os
    import torch
    import torch.distributed as dist
    from torch.utils.data import DataLoader
    from torch import nn
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Context provides relevant information to training process, like data shards and model directory.
    context = getContext()
    dataset_map = context.get_dataset_map()
    model_dir = context.get_model_dir()
    training_data = dataset_map["train"].get_shard().to_torch_dataset()
    train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)

    # The rest of this code is OSS pytorch code.
    dist.init_process_group()
    device = "cpu"
    label_col = '"target"'
    batch_size = 64

    model = DDP(DigitsModel())

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    # Training loop
    for epoch in range(5):
        for batch, batch_data in enumerate(train_dataloader):
            y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
            X = torch.concat(
                [tensor.to(torch.float32) for tensor in batch_data.values()],
                dim=-1,
          )  .to(device)
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if batch % 100 == 0:
                print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")

    # Save the model
    if dist.get_rank() == 0:
        torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))


pytroch_trainer = PyTorchTrainer(
    train_func=train_func,
    scaling_config=ScalingConfig(
        num_nodes=1,
        num_workers_per_node=4,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
    ),
)
response = pytroch_trainer.run(
    dataset_map=dict(
        train=sharded_data_connector,
    )
)
Copy

Havia várias APIs usadas no código anterior. Para obter mais informações, consulte Referência do Snowflake ML Container Runtime (Python).