Load and write data

Use Snowflake ML to load data efficiently from Snowflake tables and stages into your machine learning workflows. Its data loading APIs take advantage of Snowflake's distributed processing to accelerate ingestion for training and inference.

You can load and process data using:

  • Snowflake Notebooks: Interactive development environment for exploring data and building ML models. For more information, see Notebooks on Container Runtime for ML.

  • Snowflake ML Jobs: Run your ML workloads asynchronously from any development environment. For more information, see Snowflake ML Jobs.

Both Notebooks and ML Jobs run on the Container Runtime for ML, which provides preconfigured environments optimized for machine learning workloads with distributed processing capabilities. The Container Runtime uses Ray, an open-source framework for distributed computing, to efficiently process data across multiple compute nodes. For more information about the Container Runtime for ML, see Container Runtime for ML.

Snowflake ML provides different APIs for loading structured and unstructured data:

  • Structured data (tables and datasets)

  • Unstructured data (files in stages)

The following table can help you choose the right API for your use case:

Data Sources and APIs

Data Type    | Data Source           | API for Loading | API for Writing
------------ | --------------------- | --------------- | ---------------
Structured   | Snowflake Tables      | DataConnector   | DataSink
Structured   | Snowflake Datasets    | DataConnector   | DataSink
Unstructured | CSV Files (Stage)     | DataSource API  | N/A
Unstructured | Parquet Files (Stage) | DataSource API  | N/A
Unstructured | Other Staged Files    | DataSource API  | N/A

Load structured data from Snowflake tables

Use the Snowflake DataConnector to load structured data from Snowflake tables and Snowflake Datasets into a Snowflake Notebook or Snowflake ML Job. The DataConnector accelerates data loading by parallelizing the reads across multiple compute nodes.

The DataConnector works with either Snowpark DataFrames or Snowflake Datasets:

  • Snowpark DataFrames: Provide direct access to the data in your Snowflake tables. Best used during development.

  • Snowflake Datasets: Versioned schema-level objects. Best used for production workflows. For more information, see Snowflake Datasets.

After parallelizing the reads, the DataConnector can convert the data into one of the following data structures:

  • pandas dataframe

  • PyTorch dataset

  • TensorFlow dataset

Create a DataConnector

You can create a DataConnector from a Snowpark DataFrame or a Snowflake Dataset.

Use the following code to create a DataConnector from a Snowpark DataFrame:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create DataConnector from a Snowflake table
data_connector = DataConnector.from_dataframe(session.table("EXAMPLE_TABLE_NAME"))

Use the following code to create a DataConnector from a Snowflake Dataset:

from snowflake.ml.data.data_connector import DataConnector

# Create DataConnector from a Snowflake Dataset
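# snowflake_dataset is a placeholder for an existing Snowflake Dataset object,
# for example one created or loaded with the snowflake.ml.dataset APIs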
data_connector = DataConnector.from_dataset(snowflake_dataset)

Convert DataConnector to other formats

After creating a DataConnector, you can convert it to different data structures for use with various ML frameworks.

You can convert a DataConnector to a pandas dataframe for use with scikit-learn and other pandas-compatible libraries.

The following example loads data from a Snowflake table into a pandas dataframe and trains an XGBoost classifier:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
import xgboost as xgb

session = get_active_session()

# Specify training table location
table_name = "TRAINING_TABLE"

# Load table into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))

# Convert to pandas dataframe
pandas_df = data_connector.to_pandas()

# Prepare features and labels
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]

# Train classifier
clf = xgb.XGBClassifier()
clf.fit(X, y)
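
In addition to pandas, the DataConnector can produce PyTorch and TensorFlow datasets, as listed earlier. The following is a minimal sketch; the to_torch_dataset and to_tf_dataset method names and the batch_size and shuffle parameters shown here are assumptions, so check the DataConnector API reference for the exact signatures:

# Sketch only: convert the same DataConnector into framework-native datasets
# (method names and parameters are assumed; see the DataConnector API reference)

# PyTorch: an iterable dataset that you can wrap in a torch DataLoader
torch_ds = data_connector.to_torch_dataset(batch_size=32, shuffle=True)

# TensorFlow: a tf.data.Dataset for use with Keras or a custom training loop
tf_ds = data_connector.to_tf_dataset(batch_size=32, shuffle=True)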

Use with Snowflake’s distributed training APIs

For best performance, you can pass a DataConnector directly to Snowflake’s optimized distributed training APIs instead of converting to pandas, PyTorch, or TensorFlow datasets first.

The following example trains an XGBoost model using Snowflake’s distributed XGBoost estimator:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
    XGBEstimator,
    XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create DataConnector from a Snowpark dataframe
snowflake_df = session.table("TRAINING_TABLE")
data_connector = DataConnector.from_dataframe(snowflake_df)

# Create Snowflake XGBoost estimator
snowflake_est = XGBEstimator(
    n_estimators=1,
    objective="reg:squarederror",
    scaling_config=XGBScalingConfig(use_gpu=False),
)

# Train using the data connector
# When using a data connector, input_cols and label_col must be provided
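# NUMERICAL_COLS and LABEL_COL are placeholders for your list of feature
# column names and the name of your label column, respectively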
fit_booster = snowflake_est.fit(
    data_connector,
    input_cols=NUMERICAL_COLS,
    label_col=LABEL_COL
)

Use sharding with PyTorch distributor

You can use the ShardedDataConnector to shard your data across multiple nodes for distributed training with the Snowflake PyTorch distributor.

The following example trains a PyTorch model on the digits dataset using sharded data across multiple processes:

from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
    PyTorchTrainer,
    ScalingConfig,
    WorkerResourceConfig,
    getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Load the sklearn digits dataset and create a Snowpark DataFrame from it
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)

# Create sharded data connector
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)

# Define the PyTorch model
class DigitsModel(nn.Module):
    def __init__(self):
        super(DigitsModel, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(8 * 8, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Define training function that runs across multiple nodes or devices
# Each process receives a unique data shard
def train_func():
    import os
    import torch
    import torch.distributed as dist
    from torch.utils.data import DataLoader
    from torch import nn
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Get context with data shards and model directory
    context = getContext()
    dataset_map = context.get_dataset_map()
    model_dir = context.get_model_dir()
    # Training configuration; batch_size is defined before the DataLoader uses it
    device = "cpu"
    label_col = '"target"'
    batch_size = 64

    # Each worker process receives its own shard of the training data
    training_data = dataset_map["train"].get_shard().to_torch_dataset()
    train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)

    dist.init_process_group()

    model = DDP(DigitsModel())
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    # Training loop
    for epoch in range(5):
        for batch, batch_data in enumerate(train_dataloader):
            y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
            X = torch.concat(
                [tensor.to(torch.float32) for tensor in batch_data.values()],
                dim=-1,
            ).to(device)
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if batch % 100 == 0:
                print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")

    # Save the model
    if dist.get_rank() == 0:
        torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))

# Create PyTorch trainer with scaling configuration
pytorch_trainer = PyTorchTrainer(
    train_func=train_func,
    scaling_config=ScalingConfig(
        num_nodes=1,
        num_workers_per_node=4,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
    ),
)

# Run distributed training
response = pytorch_trainer.run(
    dataset_map=dict(
        train=sharded_data_connector,
    )
)

Load unstructured data from Snowflake stages

Use the Snowflake DataSource APIs to read unstructured data from Snowflake stages. Each file format has a corresponding datasource class that defines how to read the data.

The following list shows each file format and its corresponding datasource class:

  • Binary files: SFStageBinaryFileDataSource

  • Text files: SFStageTextDataSource

  • CSV files: SFStageCSVDataSource

  • Parquet files: SFStageParquetDataSource

  • Image files: SFStageImageDataSource

Load and process data

When you create a Snowflake Datasource, you must provide the following:

  • The name of the stage from which you’re reading the data

  • The database that has the stage (defaults to current session)

  • The schema that has the stage (defaults to current session)

  • A pattern to filter the files read from the datasource (optional)

The datasource retrieves all files within the provided path that match the file pattern.
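
For example, a CSV datasource might be defined as follows. This is a sketch only: the import path and the constructor arguments are assumed to mirror the SFStageParquetDataSource example shown later in this section, so verify them against the datasource API reference.

from snowflake.ml.ray.datasource import SFStageCSVDataSource  # assumed import path

csv_data_source = SFStageCSVDataSource(
    stage_location="@stage/path/",  # stage and path that contain the CSV files
    database="DB_NAME",             # optional; defaults to the current session
    schema="SCHEMA_NAME",           # optional; defaults to the current session
    file_pattern="*.csv",           # optional; filter files by pattern
)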

After you define the Snowflake Datasource, you can load data into a Ray dataset. With the Ray dataset, you can do the following:

  • Use the dataset with Ray APIs

  • Pass the dataset to DataConnector

  • Convert to pandas or PyTorch datasets if needed.

The following example performs these steps:

  • Reads Parquet files from a Snowflake stage into a Ray dataset

  • Converts the dataset to a DataConnector

import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector

data_source = SFStageParquetDataSource(
    stage_location="@stage/path/",
    database="DB_NAME", # optional
    schema="SCHEMA_NAME", # optional
    file_pattern='*.parquet', # optional
)

# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)

dc = DataConnector.from_ray_dataset(ray_ds)

Write structured data back to Snowflake tables

Use the Snowflake DataSink API to write structured data from your Notebook or ML Job back to a Snowflake table. You can write transformed or prediction datasets to Snowflake for further analysis or storage.

To define a data sink, provide the following:

  • Table name

  • Database name (defaults to current session)

  • Schema name (defaults to current session)

  • Whether to create the table if it doesn't exist (auto_create_table)

  • Whether to replace the table contents instead of inserting into the table (override)

The following example defines a data sink:

from snowflake.ml.ray.datasink import SnowflakeTableDatasink
datasink = SnowflakeTableDatasink(
    table_name="table_name",
    database="db_name",
    schema="schema_name",
    auto_create_table=True,  # create the table if it does not already exist
    override=True,  # replace the table contents instead of inserting into the table
)

After you define a data sink, use the following code to write a Ray dataset to the Snowflake table:

import ray

# Get Ray dataset from sources
ray_ds = ray.data.read_datasource(data_source)

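# Hypothetical batch transform, shown here for illustration only; replace it
# with your own logic. Batches arrive as dictionaries of column arrays.
def example_transform_batch_function(batch):
    return batch
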
# Set up the transform operations (lazy; not executed yet)
transformed_ds = ray_ds.map_batches(example_transform_batch_function)

# Write to Snowflake in a distributed manner
transformed_ds.write_datasink(datasink)

Best practices and considerations

For optimal performance and resource utilization, consider the following best practices:

Parallelism: Design your data source implementations to take advantage of Ray's distributed execution. Customize the parallelism and concurrency arguments to suit your use case, and define how many resources to allocate per task in each step.

Partitioning: By default, Ray partitions the dataset based on available resources and data size. Use ray_ds.repartition(X) to customize the number of partitions and choose between many small tasks and fewer large tasks, depending on your use case.
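
For example, here is a minimal sketch of these knobs using standard Ray Data calls; the specific numbers are placeholders to tune for your workload, and example_transform_batch_function is the hypothetical transform from the write example above:

# Control partitioning: split the dataset into 16 blocks (roughly 16 tasks)
ray_ds = ray_ds.repartition(16)

# Control per-task resources: request 1 CPU for each map_batches task.
# The concurrency argument mentioned above can also be passed to cap how many
# tasks run at once; see the Ray Data documentation for details.
transformed_ds = ray_ds.map_batches(
    example_transform_batch_function,
    num_cpus=1,
)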

Best practices: Follow the Ray Data User Guide for additional guidance.


Next steps

After loading your data, you can:

  • Train models with Snowflake's distributed training APIs or your own frameworks in Notebooks and ML Jobs

  • Write transformed data or predictions back to Snowflake tables with the DataSink API