Laden und Schreiben von Daten¶
Verwenden Sie Snowflake ML, um Daten effizient aus Snowflake-Tabellen und -Stagingbereichen in Ihre Workflows für maschinelles Lernen zu laden. Snowflake ML bietet optimierte Funktionen zum Laden von Daten, die die verteilte Verarbeitung von Snowflake nutzen, um die Datenerfassung für Ihre Trainings- und Inferenz-Workflows zu beschleunigen.
Sie können Daten laden und verarbeiten mit:
Snowflake Notebooks: Interaktive Entwicklungsumgebung für die Erkundung von Daten und die Erstellung von ML-Modellen. Weitere Informationen dazu finden Sie unter Notebooks auf Container Runtime für ML.
Snowflake ML-Jobs: Führen Sie Ihre ML-Workloads asynchron in einer Entwicklungsumgebung aus. Weitere Informationen dazu finden Sie unter Snowflake ML Jobs.
Sowohl Notebooks als auch ML-Jobs werden in der Container Runtime für ML ausgeführt, die vorkonfigurierte Umgebungen bietet, die für Workloads für maschinellen Lernens optimiert sind, mit verteilten Verarbeitungsmöglichkeiten. Die Container Runtime verwendet Ray, ein Open-Source-Framework für verteiltes Computing, um Daten effizient über mehrere Serverknoten hinweg zu verarbeiten. Weitere Informationen zur Container Runtime für ML finden Sie unter Container Runtime für ML.
Snowflake ML bietet verschiedene APIs zum Laden strukturierter und unstrukturierter Daten:
Strukturierte Daten (Tabellen und Datensets)
DataConnector: Laden von Daten aus Snowflake-Tabellen und Snowflake-Datensets. Weitere Informationen dazu finden Sie unter Laden von strukturierten Daten aus Snowflake-Tabellen.
DataSink: Zurückschreiben von Daten in Snowflake-Tabellen. Weitere Informationen dazu finden Sie unter Schreiben von strukturierten Daten zurück in Snowflake-Tabellen.
Unstrukturierte Daten (Dateien in Stagingbereichen)
DataSource APIs: Laden von Daten aus verschiedenen Dateiformaten (CSV, Parquet, Images und mehr) aus Snowflake-Stagingbereichen. Weitere Informationen dazu finden Sie unter Laden unstrukturierter Daten aus Snowflake-Stagingbereichen.
Die folgende Tabelle kann Ihnen bei der Auswahl der richtigen API für Ihren Anwendungsfall helfen:
Datentyp |
Datenquelle |
API zum Laden |
API zum Schreiben |
|---|---|---|---|
Strukturiert |
Snowflake-Tabellen |
DataConnector |
DataSink |
Strukturiert |
Snowflake Datasets |
DataConnector |
DataSink |
Unstrukturiert |
CSV-Dateien (Stagingbereich) |
DataSource-API |
N/A |
Unstrukturiert |
Parquet-Dateien (Stagingbereich) |
DataSource-API |
N/A |
Unstrukturiert |
Andere Stagingdateien |
DataSource-API |
N/A |
Laden von strukturierten Daten aus Snowflake-Tabellen¶
Verwenden Sie den Snowflake DataConnector zum Laden strukturierter Daten aus Snowflake-Tabellen und Snowflake-Datensets in ein Snowflake Notebook oder einen Snowflake ML-Job. Der DataConnector beschleunigt das Laden von Daten, indem die Lesevorgänge über mehrere Serverknoten hinweg parallelisiert werden.
Der DataConnector kann entweder mit Snowpark DataFrames oder Snowflake-Datensets verwendet werden:
Snowpark DataFrames: Bieten Sie direkten Zugriff auf die Daten in Ihren Snowflake-Tabellen. Werden am besten während der Entwicklung verwendet.
Snowflake-Datensets: Versionierte Objekte auf Schemaebene. Werden am besten für Produktions-Workflows verwendet. Weitere Informationen dazu finden Sie unter Snowflake Datasets.
Nach der Parallelisierung der Leseoperationen kann der DataConnector die Daten in eine der folgenden Datenstrukturen umwandeln:
pandas-Datenframe
PyTorch-Datenset
TensorFlow-Datenset
Erstellen Sie eine DataConnector, wenn:¶
Sie können einen DataConnector aus einem Snowpark DataFrame oder einem Snowflake-Datenset erstellen.
Verwenden Sie den folgenden Code, um einen DataConnector aus einem Snowpark DataFrame zu erstellen:
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create DataConnector from a Snowflake table
data_connector = DataConnector.from_dataframe(session.table("example-table-name"))
Verwenden Sie den folgenden Code, um einen DataConnector aus einem Snowflake-Datenset zu erstellen:
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector from a Snowflake Dataset
data_connector = DataConnector.from_dataset(snowflake_dataset)
Konvertieren von DataConnector in andere Formate¶
Nach dem Erstellen eines DataConnector können Sie ihn in verschiedene Datenstrukturen zur Verwendung mit verschiedenen ML-Frameworks konvertieren.
Sie können einen DataConnector zu einem pandas-Datenframe zur Verwendung mit scikit-learn und anderen pandas-kompatiblen Bibliotheken konvertieren.
Im folgenden Beispiel werden Daten aus einer Snowflake-Tabelle in einen pandas-Datenframe geladen und es wird ein ein XGBoost-Klassifizierer trainiert:
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
import xgboost as xgb
session = get_active_session()
# Specify training table location
table_name = "TRAINING_TABLE"
# Load table into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))
# Convert to pandas dataframe
pandas_df = data_connector.to_pandas()
# Prepare features and labels
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]
# Train classifier
clf = xgb.Classifier()
clf.fit(X, y)
Sie können einen DataConnector in ein PyTorch-Datenset zur Verwendung mit PyTorch-Modellen und Datenladeprogrammen konvertieren.
Im folgenden Beispiel werden Daten aus einer Snowflake-Tabelle in ein PyTorch-Datenset geladen:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector (see previous examples)
# data_connector = DataConnector.from_dataframe(...)
# Convert to PyTorch dataset
torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']
for batch_idx, batch in enumerate(dataloader):
y = batch_data.pop(label_col).squeeze()
X = torch.stack(
[tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
)
Sie können einen DataConnector in ein TensorFlow-Datenset zur Verwendung mit TensorFlow-Modellen konvertieren. Das Laden von Daten erfolgt in einem Streaming-Verfahren für maximale Effizienz.
Im folgenden Beispiel wird ein DataConnector in ein TensorFlow-Datenset konvertiert:
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector (see previous examples)
# data_connector = DataConnector.from_dataframe(...)
# Convert to TensorFlow dataset
tf_ds = data_connector.to_tf_dataset(
batch_size=4,
shuffle=True,
drop_last_batch=True
)
for batch in tf_ds:
print(batch)
Verwenden mit verteilten Trainings-APIs von Snowflake¶
Für die beste Leistung können Sie einen DataConnector direkt an optimierte verteilte Training-APIs von Snowflake übergeben, anstatt diesen zuerst in Pandas-, PyTorch- oder TensorFlow-Datensets zu konvertieren.
Im folgenden Beispiel wird ein XGBoost-Modell unter Verwendung des verteilten XGBoost-Estimators von Snowflake trainiert:
from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
XGBEstimator,
XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create DataConnector from a Snowpark dataframe
snowflake_df = session.table("TRAINING_TABLE")
data_connector = DataConnector.from_dataframe(snowflake_df)
# Create Snowflake XGBoost estimator
snowflake_est = XGBEstimator(
n_estimators=1,
objective="reg:squarederror",
scaling_config=XGBScalingConfig(use_gpu=False),
)
# Train using the data connector
# When using a data connector, input_cols and label_col must be provided
fit_booster = snowflake_est.fit(
data_connector,
input_cols=NUMERICAL_COLS,
label_col=LABEL_COL
)
Verwenden von Sharding mit dem PyTorch Distributor¶
Sie können den ShardedDataConnector verwenden, um Ihre Daten für verteiltes Training mit dem Snowflake PyTorch Distributor auf mehrere Knoten zu verteilen.
Im folgenden Beispiel wird ein PyTorch-Modell für das Ziffern-Datenset unter Verwendung von Daten trainiert, die auf mehrere Prozesse verteilt sind:
from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
PyTorchTrainer,
ScalingConfig,
WorkerResourceConfig,
getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create the Snowflake data from a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)
# Create sharded data connector
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)
# Define the PyTorch model
class DigitsModel(nn.Module):
def __init__(self):
super(DigitsModel, self).__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(8 * 8, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10)
)
def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits
# Define training function that runs across multiple nodes or devices
# Each process receives a unique data shard
def train_func():
import os
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from torch import nn
from torch.nn.parallel import DistributedDataParallel as DDP
# Get context with data shards and model directory
context = getContext()
dataset_map = context.get_dataset_map()
model_dir = context.get_model_dir()
training_data = dataset_map["train"].get_shard().to_torch_dataset()
train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)
dist.init_process_group()
device = "cpu"
label_col = '"target"'
batch_size = 64
model = DDP(DigitsModel())
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
# Training loop
for epoch in range(5):
for batch, batch_data in enumerate(train_dataloader):
y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
X = torch.concat(
[tensor.to(torch.float32) for tensor in batch_data.values()],
dim=-1,
).to(device)
pred = model(X)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if batch % 100 == 0:
print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")
# Save the model
if dist.get_rank() == 0:
torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))
# Create PyTorch trainer with scaling configuration
pytorch_trainer = PyTorchTrainer(
train_func=train_func,
scaling_config=ScalingConfig(
num_nodes=1,
num_workers_per_node=4,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
),
)
# Run distributed training
response = pytorch_trainer.run(
dataset_map=dict(
train=sharded_data_connector,
)
)
Laden unstrukturierter Daten aus Snowflake-Stagingbereichen¶
Verwenden Sie die Snowflake DataSource APIs zum Lesen unstrukturierter Daten aus Snowflake-Stagingbereichen. Jedes Dateiformat hat eine entsprechende Datenquellenklasse, die definiert, wie die Daten gelesen werden sollen.
Im Folgenden werden die Dateiformate und die zugehörigen APIs angezeigt, die Sie zum Laden der Daten verwenden:
Binärdateien:
SFStageBinaryFileDataSourceTextdateien:
SFStageTextDataSourceCSV-Dateien:
SFStageCSVDataSourceParquet-Dateien:
SFStageParquetDataSourceImagedateien:
SFStageImageDataSource
Laden und Verarbeiten von Daten¶
Wenn Sie eine Snowflake-Datenquelle erstellen, müssen Sie Folgendes bereitstellen:
Der Name des Stagingbereichs, aus dem Sie die Daten lesen
Die Datenbank, die den Stagingbereich enthält (standardmäßig die aktuelle Sitzung)
Das Schema, das den Stagingbereich enthält (standardmäßig die aktuelle Sitzung)
Das Muster für die Filterdateien, die aus der Datenquelle gelesen werden (optional)
Die Daten-API oder der Datenkonnektor ruft alle Dateien innerhalb des angegebenen Pfads ab, die mit dem Dateimuster übereinstimmen.
Nachdem Sie die Snowflake-Datenquelle definiert haben, können Sie Daten in ein Ray-Datenset laden. Mit dem Ray-Datenset können Sie Folgendes tun:
Das Datenset mit Ray-APIs verwenden
Das Datenset an DataConnector übergeben
Ggf. in pandas- oder PyTorch-Datensets konvertieren.
Im folgenden Beispiel wird Folgendes ausgeführt:
Liest Parquet-Dateien aus einem Snowflake-Stagingbereich in ein Ray-Datenset
Konvertiert das Datenset in einen DataConnector
import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector
data_source = SFStageParquetDataSource(
stage_location="@stage/path/",
database="DB_NAME", # optional
schema="SCHEMA_NAME", # optional
file_pattern='*.parquet', # optional
)
# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)
dc = DataConnector.from_ray_dataset(ray_ds)
Schreiben von strukturierten Daten zurück in Snowflake-Tabellen¶
Verwenden Sie die Snowflake DataSink API, um strukturierte Daten von Ihrem Notebook oder ML-Job zurück in eine Snowflake-Tabelle zu schreiben. Sie können transformierte oder Vorhersage-Datensets zur weiteren Analyse oder Speicherung in Snowflake schreiben.
Um eine Datensenke zu definieren, geben Sie Folgendes ein:
Name des Stagingbereichs
Name der Datenbank (standardmäßig aktuelle Sitzung)
Schemaname (standardmäßig aktuelle Sitzung)
Dateimuster zur Übereinstimmung mit bestimmten Dateien (optional)
Das folgende Beispiel definiert eine Datensenke:
from snowflake.ml.ray.datasink import SnowflakeTableDatasink
datasink = SnowflakeTableDatasink(
table_name="table_name",
database="db_name",
schema="schema_name",
auto_create_table=True, # create table if not exists
override=True # replace vs insert to table
)
Nachdem Sie eine Datensenke definiert haben, können Sie den folgenden Code verwenden, um das Ray-Datenset in eine Snowflake-Tabelle zu schreiben.
import ray
# Get Ray dataset from sources
ray_ds = ray.data.read_datasource(data_source)
# Setup transform operations, not executed yet
transformed_ds = ray_ds.map_batches(example_transform_batch_function)
# Start writing to Snowflake distributedly
transformed_ds.write_datasink(datasink)
Best Practices und Hinweise¶
Um eine optimale Leistung und Auslastung der Ressourcen zu erzielen, sollten Sie die folgenden Best Practices berücksichtigen:
Parallelität: Entwerfen Sie Ihre Datenquellenimplementierungen, um die verteilte Struktur von Ray zu nutzen. Passen Sie die Argumente für Parallelität so an, dass sie besser für Ihren Anwendungsfall geeignet sind. Sie können in jedem Schritt manuell festlegen, wie viele Ressourcen Sie pro Aufgabe zuweisen.
Partitionierung: Standardmäßig partitioniert die interne Logik von Ray das Datenset auf der Grundlage von Ressourcen und Datenumfang. Sie können mit ray_ds.repartition(X) die Anzahl der Partitionen anpassen, um zwischen einer großen Anzahl kleiner Aufgaben und einer kleinen Anzahl großer Aufgaben zu wählen, je nach dem Anwendungsfall.
Best Practices: Befolgen Sie Benutzerhandbuch für Ray Data für zusätzliche Hinweise.
Ray API-Details:
Nächste Schritte¶
Nach dem Laden Ihrer Daten können Sie Folgendes tun:
Den Feature Store für das Feature-Management verwenden