Snowflake ML Data Connector

Use the Snowflake ML Data Connector to ingest data from Snowflake tables or stages into a container runtime instance (such as a notebook session or ML job). The data connector uses the container runtime’s distributed processing to speed up data loading and improve the efficiency of running ML pipelines in Snowflake Notebooks or ML Jobs. You can run Python-based ML workflows in Snowflake using the data you’ve loaded. For example, you can scale ML pipelines using open source packages. For more information about the container runtime, see Container Runtime for ML.

You can use the data connector to load data from any Snowflake data source, such as a table or stage, into a pandas dataframe. That pandas dataframe can then be used with your open source ML workflows in Snowflake. The data connector can also create PyTorch and TensorFlow datasets.

In addition to using open source workflows, you can also use Snowflake’s distributed APIs to train and tune models at scale.

The data connector is optimized to work within the container environment. Outside of the container runtime, the data connector uses an Apache Arrow-based data exchange format to move data between Snowflake and your workload. The same code works both inside and outside of Snowflake.

You can use the Data Connector with either a Snowpark DataFrame or a Snowflake Dataset. Snowpark DataFrames provide direct access to the data in your Snowflake tables. They’re best used during development.

Snowflake Datasets are versioned schema-level objects. They’re best used for production workflows. For more information about datasets, see Snowflake Datasets.

You can use the following code to bring your Snowpark DataFrame into the container runtime:

connector = DataConnector.from_dataframe(snowpark_df)
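
Here, snowpark_df can be any Snowpark DataFrame. The following is a minimal sketch, assuming a placeholder table name, of creating one from an existing table:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# "MY_TRAINING_TABLE" is a placeholder; use your own table name.
snowpark_df = session.table("MY_TRAINING_TABLE")
connector = DataConnector.from_dataframe(snowpark_df)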

You can use the following code to bring your Snowflake Dataset into the container runtime:

connector = DataConnector.from_dataset(snowflake_dataset)
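
Here, snowflake_dataset is a Snowflake Dataset object. The following is a minimal sketch that loads an existing Dataset version with the load_dataset helper from snowflake.ml.dataset; the dataset name and version are placeholders:

from snowflake.ml import dataset
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# "MY_DATASET" and "v1" are placeholders for an existing Dataset and version.
snowflake_dataset = dataset.load_dataset(session, "MY_DATASET", "v1")
connector = DataConnector.from_dataset(snowflake_dataset)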

The data connector uses distributed processing to accelerate data loading into open source data objects, such as pandas dataframes, PyTorch datasets, or TensorFlow datasets. If to_pandas isn’t loading data into the dataframe quickly enough, you can use the data connector to speed up the process.

Passing a data connector directly into Snowflake's distributed workflows improves run times when those workflows use the data. For more information, see Snowflake ML Container Runtime Reference (Python).

Note

This topic assumes that the Snowpark ML module is installed. If it isn’t, see Using Snowflake ML Locally.

Data Connector to pandas dataframe

You can use the following code to load data from a Snowflake table into a pandas dataframe.

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Specify training table location
table_name = "TRAINING_TABLE"
# Load table into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))


import xgboost as xgb

pandas_df = data_connector.to_pandas()
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]

clf = xgb.XGBClassifier()
clf.fit(X, y)
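
After fitting, the classifier behaves like any scikit-learn style estimator. For example, as a quick sanity check on the training features:

# Predict class labels and probabilities for the training data.
preds = clf.predict(X)
probs = clf.predict_proba(X)
print(preds[:5], probs[:5])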

Data Connector to PyTorch dataset

The following code shows how you can use the data connector to load data from a Snowflake table into a PyTorch dataset.

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']

for batch_idx, batch in enumerate(dataloader):
  y = batch.pop(label_col).squeeze()
  X = torch.stack(
      [tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
  )
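
The loop above only assembles the feature and label tensors. The following is a minimal training sketch that builds on it, assuming the placeholder feature columns and TARGET label from the snippet contain numeric data:

import torch
import torch.nn as nn

model = nn.Sequential(nn.Linear(len(feature_cols), 1))
loss_fn = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

for batch_idx, batch in enumerate(dataloader):
    y = batch.pop(label_col).squeeze().to(torch.float32)
    # Stack the feature tensors along dim=1 so X has shape (batch_size, num_features).
    X = torch.stack(
        [tensor.squeeze() for key, tensor in batch.items() if key in feature_cols],
        dim=1,
    ).to(torch.float32)

    optimizer.zero_grad()
    loss = loss_fn(model(X).squeeze(), y)
    loss.backward()
    optimizer.step()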

Data Connector to TensorFlow dataset

To use the data connector with TensorFlow, call the to_tf_dataset() method to get a TensorFlow Dataset. Iterating over the Dataset yields batched TensorFlow tensors. Data is loaded in a streaming fashion for maximum efficiency.

tf_ds = connector.to_tf_dataset(
    batch_size=4,
    shuffle=True,
    drop_last_batch=True
)

for batch in tf_ds:
    print(batch)
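
To train a Keras model on this dataset, you can first map the dict-style batches to (features, label) tuples. The following is a minimal sketch, assuming each batch is a dict mapping column names to tensors (as with the PyTorch dataset above) and using placeholder feature columns FEATURE1 and FEATURE2 with a numeric TARGET label:

import tensorflow as tf

feature_cols = ["FEATURE1", "FEATURE2"]
label_col = "TARGET"

def to_xy(batch):
    # Concatenate the per-column tensors into a single feature matrix.
    X = tf.concat([tf.cast(batch[col], tf.float32) for col in feature_cols], axis=-1)
    y = tf.cast(batch[label_col], tf.float32)
    return X, y

train_ds = tf_ds.map(to_xy)

model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
model.compile(optimizer="sgd", loss="mse")
model.fit(train_ds, epochs=1)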

Passing a data connector to Snowflake’s optimized distributed training APIs

For best performance, you can pass a data connector object to Snowflake’s optimized distributed training APIs.

The following code shows how you can do the following:

  1. Load data from a Snowflake table into a data connector object.

  2. Create an instance of Snowflake’s XGBoost classifier.

  3. Train the classifier using the data connector object.

from snowflake.ml.data.data_connector import DataConnector


from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
  XGBEstimator,
  XGBScalingConfig,
)

from snowflake.snowpark.context import get_active_session

session = get_active_session()

# table_name refers to the training table defined earlier.
snowflake_df = session.table(table_name)

# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
data_connector = DataConnector.from_dataframe(snowflake_df)

snowflake_est = XGBEstimator(
  n_estimators=1,
  objective="reg:squarederror",
  scaling_config=XGBScalingConfig(use_gpu=False),
)

# If you use a data connector, input_cols and label_col must also be provided.
fit_booster = snowflake_est.fit(
  data_connector,
  input_cols = NUMERICAL_COLS,
  label_col = LABEL_COL
)

Using sharding with the PyTorch distributor

You can use the data connector to shard your data across multiple nodes in a container runtime. You can use sharding to train models with the Snowflake PyTorch distributor. For information about the PyTorch distributor, see PyTorch Distributor.

The following code trains a PyTorch model on the digits dataset using the data connector. It specifies one node with four worker processes running on that node. The code defines a PyTorch model, a training function, and a PyTorch trainer. The training function runs in each worker process, and each process receives a unique data shard. No GPUs are used in this example, but you can set the num_gpus parameter to the number of GPUs you're using.

from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector

from snowflake.ml.modeling.pytorch import (
  PyTorchTrainer,
  ScalingConfig,
  WorkerResourceConfig,
  getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create the Snowflake data. We'll start with a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)

# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)

# Define the model. This is OSS PyTorch code.
class DigitsModel(nn.Module):
    def __init__(self):
        super(DigitsModel, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(8 * 8, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Define the training function that will run across multiple nodes or devices.
# Each train_func process will receive a unique data shard.
def train_func():
    import os
    import torch
    import torch.distributed as dist
    from torch.utils.data import DataLoader
    from torch import nn
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Context provides relevant information to training process, like data shards and model directory.
    context = getContext()
    dataset_map = context.get_dataset_map()
    model_dir = context.get_model_dir()
    training_data = dataset_map["train"].get_shard().to_torch_dataset()

    # The rest of this code is OSS PyTorch code.
    dist.init_process_group()
    device = "cpu"
    label_col = '"target"'
    batch_size = 64

    train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)

    model = DDP(DigitsModel())

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    # Training loop
    for epoch in range(5):
        for batch, batch_data in enumerate(train_dataloader):
            y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
            X = torch.concat(
                [tensor.to(torch.float32) for tensor in batch_data.values()],
                dim=-1,
            ).to(device)
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if batch % 100 == 0:
                print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")

    # Save the model
    if dist.get_rank() == 0:
        torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))


pytorch_trainer = PyTorchTrainer(
    train_func=train_func,
    scaling_config=ScalingConfig(
        num_nodes=1,
        num_workers_per_node=4,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
    ),
)
response = pytorch_trainer.run(
    dataset_map=dict(
        train=sharded_data_connector,
    )
)

Multiple APIs are used in the preceding code. For more information, see Snowflake ML Container Runtime Reference (Python).