Snowflake ML Data Connector¶
Use the Snowflake ML Data Connector to ingest data from Snowflake tables or stages into a container runtime instance (such as a notebook session or ML job). The data connector uses the container runtime’s distributed processing to speed up data loading and improve the efficiency of running ML pipelines in Snowflake Notebooks or ML Jobs. You can run Python-based ML workflows in Snowflake using the data you’ve loaded. For example, you can scale ML pipelines using open source packages. For more information about the container runtime, see Container Runtime for ML.
You can use the data connector to load data from any Snowflake data source, such as a table or stage, into a pandas dataframe. That pandas dataframe can then be used with your open source ML workflows in Snowflake. The data connector also provides the functionality to create torch and tensorflow datasets.
In addition to using open source workflows, you can also use Snowflake’s distributed APIs to train and tune models at scale.
The data connector is optimized to work within the container environment. Outside of the container runtime, the data connector uses an Apache Arrow-based data exchange format to move data between Snowflake and your environment. The same code works both inside and outside of Snowflake.
You can use the Data Connector with either a Snowpark DataFrame or a Snowflake Dataset. Snowpark DataFrames provide direct access to the data in your Snowflake tables. They’re best used during development.
Snowflake Datasets are versioned schema-level objects. They’re best used for production workflows. For more information about datasets, see Snowflake Datasets.
You can use the following code to bring your Snowpark DataFrame into the container runtime:
connector = DataConnector.from_dataframe(snowpark_df)
You can use the following code to bring your Snowflake Dataset into the container runtime:
connector = DataConnector.from_dataset(snowflake_dataset)
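In a production workflow, you might first materialize a versioned Dataset from a Snowpark DataFrame and then attach it to a connector. The following is a minimal sketch: the dataset name and version are placeholders, snowpark_df is any Snowpark DataFrame you have already created, and it assumes the create_from_dataframe helper described in Snowflake Datasets.
from snowflake.ml import dataset
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Materialize a versioned, schema-level Dataset from an existing Snowpark DataFrame.
snowflake_dataset = dataset.create_from_dataframe(
    session,
    "MY_DATASET",   # placeholder dataset name
    "v1",           # placeholder version
    input_dataframe=snowpark_df,
)
# Attach the Dataset to a data connector for use in the container runtime.
connector = DataConnector.from_dataset(snowflake_dataset)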
The data connector uses distributed processing to accelerate data loading into open source data objects, such as pandas dataframes, PyTorch datasets, or TensorFlow datasets.
If to_pandas isn’t loading data into the dataframe quickly enough, you can use the data connector to speed up the process.
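For example, if you already have a Snowpark DataFrame, you can swap its to_pandas call for the connector-backed one. The following minimal sketch assumes snowpark_df is any Snowpark DataFrame you have already created:
from snowflake.ml.data.data_connector import DataConnector
# Conversion through the Snowpark client.
pandas_df = snowpark_df.to_pandas()
# Equivalent conversion through the data connector, which uses the container
# runtime's distributed processing to load the data.
pandas_df = DataConnector.from_dataframe(snowpark_df).to_pandas()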
Passing data connectors directly into Snowflake’s distributed workflows improves the run time of workflows that use the data. For more information, see Snowflake ML Container Runtime Reference (Python).
Note
This topic assumes that the Snowpark ML module is installed. If it isn’t, see Using Snowflake ML Locally.
Data Connector to pandas dataframe¶
You can use the following code to load data from a Snowflake table into a pandas dataframe.
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Specify training table location
table_name = "TRAINING_TABLE"
# Load table into a DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))
import xgboost as xgb
pandas_df = data_connector.to_pandas()
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]
clf = xgb.XGBClassifier()
clf.fit(X, y)
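Because to_pandas returns an ordinary pandas dataframe, the result plugs directly into open source tooling. For example, here is a minimal sketch of holding out a validation split with scikit-learn before training; the 80/20 ratio is arbitrary:
from sklearn.model_selection import train_test_split
# Hold out a validation set from the loaded dataframe.
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
clf = xgb.XGBClassifier()
clf.fit(X_train, y_train)
print(clf.score(X_val, y_val))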
Data Connector to PyTorch dataset¶
The following code shows how you can use the data connector to load data from a Snowflake table into a PyTorch dataset.
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
# The connector batches the data itself (batch_size=32), so pass batch_size=None
# to the DataLoader to avoid re-batching the already batched records.
torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']
for batch_idx, batch in enumerate(dataloader):
    # Separate the label from the feature columns in each batch dictionary.
    y = batch.pop(label_col).squeeze()
    X = torch.stack(
        [tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
    )
Data Connector to TensorFlow dataset¶
For usage with TensorFlow, use the to_tf_dataset() method to get a TensorFlow Dataset. Iterating over the Dataset yields batched TensorFlow tensors. Data is loaded in a streaming fashion for maximum efficiency:
tf_ds = connector.to_tf_dataset(
batch_size=4,
shuffle=True,
drop_last_batch=True
)
for batch in tf_ds:
    print(batch)
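Like the PyTorch dataset, the TensorFlow batches can be split into a label and features by column name. The following is a minimal sketch, assuming batches are dictionaries keyed by column name and using the same TARGET, FEATURE1, and FEATURE2 columns as above:
import tensorflow as tf
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']
for batch in tf_ds:
    # Separate the label tensor from the feature tensors in each batch.
    y = tf.squeeze(batch.pop(label_col))
    X = tf.stack(
        [tf.squeeze(tensor) for key, tensor in batch.items() if key in feature_cols],
        axis=-1,
    )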
Passing a data connector to Snowflake’s optimized distributed training APIs¶
For best performance, you can pass a data connector object to Snowflake’s optimized distributed training APIs.
The following code shows how you can do the following:
Load data from a Snowflake table into a data connector object.
Create an instance of Snowflake’s XGBoost classifier.
Train the classifier using the data connector object.
from sklearn.datasets import make_regression
from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
XGBEstimator,
XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session
session = get_active_session()
snowflake_df = session.table(table_name)
# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
data_connector = DataConnector.from_dataframe(snowflake_df)
snowflake_est = XGBEstimator(
n_estimators=1,
objective="reg:squarederror",
scaling_config=XGBScalingConfig(use_gpu=False),
)
# If you use a data connector, input_cols and label_col must also be provided.
fit_booster = snowflake_est.fit(
data_connector,
input_cols = NUMERICAL_COLS,
label_col = LABEL_COL
)
Using sharding with the PyTorch distributor¶
You can use the data connector to shard your data across multiple nodes in a container runtime. You can use sharding to train models with the Snowflake PyTorch distributor. For information about the PyTorch distributor, see PyTorch Distributor.
The following code trains a PyTorch model on the digits dataset using the data connector.
It specifies one node with four processes running on that node.
The code defines a PyTorch model, a training function, and a PyTorch trainer.
The training function runs across multiple nodes with each process receiving a unique data shard.
No GPUs are used in this example, but you can set the num_gpus parameter to the number of GPUs you’re using.
from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
PyTorchTrainer,
ScalingConfig,
WorkerResourceConfig,
getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create the Snowflake data. We'll start with a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)
# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)
# Define the model. This is OSS PyTorch code.
class DigitsModel(nn.Module):
    def __init__(self):
        super(DigitsModel, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(8 * 8, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits
# Define the training function that will run across multiple nodes or devices.
# Each train_func process will receive a unique data shard.
def train_func():
    import os
    import torch
    import torch.distributed as dist
    from torch.utils.data import DataLoader
    from torch import nn
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Context provides relevant information to the training process, such as data shards and the model directory.
    context = getContext()
    dataset_map = context.get_dataset_map()
    model_dir = context.get_model_dir()

    device = "cpu"
    label_col = '"target"'
    batch_size = 64

    # Each process receives its own shard of the training data.
    training_data = dataset_map["train"].get_shard().to_torch_dataset()
    train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)

    # The rest of this code is OSS PyTorch code.
    dist.init_process_group()

    model = DDP(DigitsModel())
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    # Training loop
    for epoch in range(5):
        for batch, batch_data in enumerate(train_dataloader):
            y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
            X = torch.concat(
                [tensor.to(torch.float32) for tensor in batch_data.values()],
                dim=-1,
            ).to(device)
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if batch % 100 == 0:
                print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")

    # Save the model
    if dist.get_rank() == 0:
        torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))
pytorch_trainer = PyTorchTrainer(
train_func=train_func,
scaling_config=ScalingConfig(
num_nodes=1,
num_workers_per_node=4,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
),
)
response = pytorch_trainer.run(
dataset_map=dict(
train=sharded_data_connector,
)
)
Multiple APIs are used in the preceding code. For more information, see Snowflake ML Container Runtime Reference (Python).