Laden und Schreiben von Daten

Verwenden Sie Snowflake ML, um Daten effizient aus Snowflake-Tabellen und -Stagingbereichen in Ihre Workflows für maschinelles Lernen zu laden. Snowflake ML bietet optimierte Funktionen zum Laden von Daten, die die verteilte Verarbeitung von Snowflake nutzen, um die Datenerfassung für Ihre Trainings- und Inferenz-Workflows zu beschleunigen.

Sie können Daten laden und verarbeiten mit:

  • Snowflake Notebooks: Interaktive Entwicklungsumgebung für die Erkundung von Daten und die Erstellung von ML-Modellen. Weitere Informationen dazu finden Sie unter Notebooks auf Container Runtime für ML.

  • Snowflake ML-Jobs: Führen Sie Ihre ML-Workloads asynchron in einer Entwicklungsumgebung aus. Weitere Informationen dazu finden Sie unter Snowflake ML Jobs.

Sowohl Notebooks als auch ML-Jobs werden in der Container Runtime für ML ausgeführt, die vorkonfigurierte Umgebungen bietet, die für Workloads für maschinellen Lernens optimiert sind, mit verteilten Verarbeitungsmöglichkeiten. Die Container Runtime verwendet Ray, ein Open-Source-Framework für verteiltes Computing, um Daten effizient über mehrere Serverknoten hinweg zu verarbeiten. Weitere Informationen zur Container Runtime für ML finden Sie unter Container Runtime für ML.

Snowflake ML bietet verschiedene APIs zum Laden strukturierter und unstrukturierter Daten:

Strukturierte Daten (Tabellen und Datensets)

Unstrukturierte Daten (Dateien in Stagingbereichen)

Die folgende Tabelle kann Ihnen bei der Auswahl der richtigen API für Ihren Anwendungsfall helfen:

Datenquellen und APIs

Datentyp

Datenquelle

API zum Laden

API zum Schreiben

Strukturiert

Snowflake-Tabellen

DataConnector

DataSink

Strukturiert

Snowflake Datasets

DataConnector

DataSink

Unstrukturiert

CSV-Dateien (Stagingbereich)

DataSource-API

N/A

Unstrukturiert

Parquet-Dateien (Stagingbereich)

DataSource-API

N/A

Unstrukturiert

Andere Stagingdateien

DataSource-API

N/A

Laden von strukturierten Daten aus Snowflake-Tabellen

Verwenden Sie den Snowflake DataConnector zum Laden strukturierter Daten aus Snowflake-Tabellen und Snowflake-Datensets in ein Snowflake Notebook oder einen Snowflake ML-Job. Der DataConnector beschleunigt das Laden von Daten, indem die Lesevorgänge über mehrere Serverknoten hinweg parallelisiert werden.

Der DataConnector kann entweder mit Snowpark DataFrames oder Snowflake-Datensets verwendet werden:

  • Snowpark DataFrames: Bieten Sie direkten Zugriff auf die Daten in Ihren Snowflake-Tabellen. Werden am besten während der Entwicklung verwendet.

  • Snowflake-Datensets: Versionierte Objekte auf Schemaebene. Werden am besten für Produktions-Workflows verwendet. Weitere Informationen dazu finden Sie unter Snowflake Datasets.

Nach der Parallelisierung der Leseoperationen kann der DataConnector die Daten in eine der folgenden Datenstrukturen umwandeln:

  • pandas-Datenframe

  • PyTorch-Datenset

  • TensorFlow-Datenset

Erstellen Sie eine DataConnector, wenn:

Sie können einen DataConnector aus einem Snowpark DataFrame oder einem Snowflake-Datenset erstellen.

Verwenden Sie den folgenden Code, um einen DataConnector aus einem Snowpark DataFrame zu erstellen:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create DataConnector from a Snowflake table
data_connector = DataConnector.from_dataframe(session.table("example-table-name"))
Copy

Verwenden Sie den folgenden Code, um einen DataConnector aus einem Snowflake-Datenset zu erstellen:

from snowflake.ml.data.data_connector import DataConnector

# Create DataConnector from a Snowflake Dataset
data_connector = DataConnector.from_dataset(snowflake_dataset)
Copy

Konvertieren von DataConnector in andere Formate

Nach dem Erstellen eines DataConnector können Sie ihn in verschiedene Datenstrukturen zur Verwendung mit verschiedenen ML-Frameworks konvertieren.

Sie können einen DataConnector zu einem pandas-Datenframe zur Verwendung mit scikit-learn und anderen pandas-kompatiblen Bibliotheken konvertieren.

Im folgenden Beispiel werden Daten aus einer Snowflake-Tabelle in einen pandas-Datenframe geladen und es wird ein ein XGBoost-Klassifizierer trainiert:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
import xgboost as xgb

session = get_active_session()

# Specify training table location
table_name = "TRAINING_TABLE"

# Load table into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))

# Convert to pandas dataframe
pandas_df = data_connector.to_pandas()

# Prepare features and labels
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]

# Train classifier
clf = xgb.Classifier()
clf.fit(X, y)
Copy

Verwenden mit verteilten Trainings-APIs von Snowflake

Für die beste Leistung können Sie einen DataConnector direkt an optimierte verteilte Training-APIs von Snowflake übergeben, anstatt diesen zuerst in Pandas-, PyTorch- oder TensorFlow-Datensets zu konvertieren.

Im folgenden Beispiel wird ein XGBoost-Modell unter Verwendung des verteilten XGBoost-Estimators von Snowflake trainiert:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
    XGBEstimator,
    XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create DataConnector from a Snowpark dataframe
snowflake_df = session.table("TRAINING_TABLE")
data_connector = DataConnector.from_dataframe(snowflake_df)

# Create Snowflake XGBoost estimator
snowflake_est = XGBEstimator(
    n_estimators=1,
    objective="reg:squarederror",
    scaling_config=XGBScalingConfig(use_gpu=False),
)

# Train using the data connector
# When using a data connector, input_cols and label_col must be provided
fit_booster = snowflake_est.fit(
    data_connector,
    input_cols=NUMERICAL_COLS,
    label_col=LABEL_COL
)
Copy

Verwenden von Sharding mit dem PyTorch Distributor

Sie können den ShardedDataConnector verwenden, um Ihre Daten für verteiltes Training mit dem Snowflake PyTorch Distributor auf mehrere Knoten zu verteilen.

Im folgenden Beispiel wird ein PyTorch-Modell für das Ziffern-Datenset unter Verwendung von Daten trainiert, die auf mehrere Prozesse verteilt sind:

from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
    PyTorchTrainer,
    ScalingConfig,
    WorkerResourceConfig,
    getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create the Snowflake data from a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)

# Create sharded data connector
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)

# Define the PyTorch model
class DigitsModel(nn.Module):
    def __init__(self):
        super(DigitsModel, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(8 * 8, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Define training function that runs across multiple nodes or devices
# Each process receives a unique data shard
def train_func():
    import os
    import torch
    import torch.distributed as dist
    from torch.utils.data import DataLoader
    from torch import nn
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Get context with data shards and model directory
    context = getContext()
    dataset_map = context.get_dataset_map()
    model_dir = context.get_model_dir()
    training_data = dataset_map["train"].get_shard().to_torch_dataset()
    train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)

    dist.init_process_group()
    device = "cpu"
    label_col = '"target"'
    batch_size = 64

    model = DDP(DigitsModel())
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    # Training loop
    for epoch in range(5):
        for batch, batch_data in enumerate(train_dataloader):
            y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
            X = torch.concat(
                [tensor.to(torch.float32) for tensor in batch_data.values()],
                dim=-1,
            ).to(device)
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if batch % 100 == 0:
                print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")

    # Save the model
    if dist.get_rank() == 0:
        torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))

# Create PyTorch trainer with scaling configuration
pytorch_trainer = PyTorchTrainer(
    train_func=train_func,
    scaling_config=ScalingConfig(
        num_nodes=1,
        num_workers_per_node=4,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
    ),
)

# Run distributed training
response = pytorch_trainer.run(
    dataset_map=dict(
        train=sharded_data_connector,
    )
)
Copy

Laden unstrukturierter Daten aus Snowflake-Stagingbereichen

Verwenden Sie die Snowflake DataSource APIs zum Lesen unstrukturierter Daten aus Snowflake-Stagingbereichen. Jedes Dateiformat hat eine entsprechende Datenquellenklasse, die definiert, wie die Daten gelesen werden sollen.

Im Folgenden werden die Dateiformate und die zugehörigen APIs angezeigt, die Sie zum Laden der Daten verwenden:

  • Binärdateien: SFStageBinaryFileDataSource

  • Textdateien: SFStageTextDataSource

  • CSV-Dateien: SFStageCSVDataSource

  • Parquet-Dateien: SFStageParquetDataSource

  • Imagedateien: SFStageImageDataSource

Laden und Verarbeiten von Daten

Wenn Sie eine Snowflake-Datenquelle erstellen, müssen Sie Folgendes bereitstellen:

  • Der Name des Stagingbereichs, aus dem Sie die Daten lesen

  • Die Datenbank, die den Stagingbereich enthält (standardmäßig die aktuelle Sitzung)

  • Das Schema, das den Stagingbereich enthält (standardmäßig die aktuelle Sitzung)

  • Das Muster für die Filterdateien, die aus der Datenquelle gelesen werden (optional)

Die Daten-API oder der Datenkonnektor ruft alle Dateien innerhalb des angegebenen Pfads ab, die mit dem Dateimuster übereinstimmen.

Nachdem Sie die Snowflake-Datenquelle definiert haben, können Sie Daten in ein Ray-Datenset laden. Mit dem Ray-Datenset können Sie Folgendes tun:

  • Das Datenset mit Ray-APIs verwenden

  • Das Datenset an DataConnector übergeben

  • Ggf. in pandas- oder PyTorch-Datensets konvertieren.

Im folgenden Beispiel wird Folgendes ausgeführt:

  • Liest Parquet-Dateien aus einem Snowflake-Stagingbereich in ein Ray-Datenset

  • Konvertiert das Datenset in einen DataConnector

import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector

data_source = SFStageParquetDataSource(
    stage_location="@stage/path/",
    database="DB_NAME", # optional
    schema="SCHEMA_NAME", # optional
    file_pattern='*.parquet', # optional
)

# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)

dc = DataConnector.from_ray_dataset(ray_ds)
Copy

Schreiben von strukturierten Daten zurück in Snowflake-Tabellen

Verwenden Sie die Snowflake DataSink API, um strukturierte Daten von Ihrem Notebook oder ML-Job zurück in eine Snowflake-Tabelle zu schreiben. Sie können transformierte oder Vorhersage-Datensets zur weiteren Analyse oder Speicherung in Snowflake schreiben.

Um eine Datensenke zu definieren, geben Sie Folgendes ein:

  • Name des Stagingbereichs

  • Name der Datenbank (standardmäßig aktuelle Sitzung)

  • Schemaname (standardmäßig aktuelle Sitzung)

  • Dateimuster zur Übereinstimmung mit bestimmten Dateien (optional)

Das folgende Beispiel definiert eine Datensenke:

from snowflake.ml.ray.datasink import SnowflakeTableDatasink
datasink = SnowflakeTableDatasink(
    table_name="table_name",
    database="db_name",
    schema="schema_name",
    auto_create_table=True, # create table if not exists
    override=True # replace vs insert to table
)
Copy

Nachdem Sie eine Datensenke definiert haben, können Sie den folgenden Code verwenden, um das Ray-Datenset in eine Snowflake-Tabelle zu schreiben.

import ray

# Get Ray dataset from sources
ray_ds = ray.data.read_datasource(data_source)

# Setup transform operations, not executed yet
transformed_ds = ray_ds.map_batches(example_transform_batch_function)

# Start writing to Snowflake distributedly
transformed_ds.write_datasink(datasink)
Copy

Best Practices und Hinweise

Um eine optimale Leistung und Auslastung der Ressourcen zu erzielen, sollten Sie die folgenden Best Practices berücksichtigen:

Parallelität: Entwerfen Sie Ihre Datenquellenimplementierungen, um die verteilte Struktur von Ray zu nutzen. Passen Sie die Argumente für Parallelität so an, dass sie besser für Ihren Anwendungsfall geeignet sind. Sie können in jedem Schritt manuell festlegen, wie viele Ressourcen Sie pro Aufgabe zuweisen.

Partitionierung: Standardmäßig partitioniert die interne Logik von Ray das Datenset auf der Grundlage von Ressourcen und Datenumfang. Sie können mit ray_ds.repartition(X) die Anzahl der Partitionen anpassen, um zwischen einer großen Anzahl kleiner Aufgaben und einer kleinen Anzahl großer Aufgaben zu wählen, je nach dem Anwendungsfall.

Best Practices: Befolgen Sie Benutzerhandbuch für Ray Data für zusätzliche Hinweise.

Ray API-Details:

Nächste Schritte

Nach dem Laden Ihrer Daten können Sie Folgendes tun: