データのロードと書き込み¶
Snowflake ML を使用して、Snowflakeテーブルとステージから機械学習ワークフローに効率的にデータをロードします。Snowflake ML は、Snowflakeの分散処理を活用した最適化されたデータロード機能を提供し、トレーニングおよび推論ワークフローのデータ取り込みを高速化します。
次を使用してデータをロードし、処理することができます。
Snowflake Notebooks:データを探索し、ML モデルを構築するためのインタラクティブな開発環境。詳細については、 Notebooks on Container Runtime for ML をご参照ください。
Snowflake ML Jobs:どのような開発環境からでも、ML ワークロードを非同期で実行できます。詳細については、 Snowflake MLのジョブ をご参照ください。
ノートブックも ML ジョブも、Container Runtime for ML 上で実行され、分散処理機能を備えた機械学習ワークロード用に最適化された事前設定済みの環境を提供します。Container Runtimeは、分散コンピューティングのオープンソースフレームワークであるRayを使用して、複数のコンピュートノードにまたがるデータを効率的に処理します。Container Runtime forML の詳細については、ML のContainer Runtime を参照してください。
SnowflakeML は、構造化データと非構造化データをロードするための異なる APIs を提供します。
構造化データ(テーブルとデータセット)
DataConnector*:SnowflakeテーブルとSnowflakeデータセットからデータをロードします。詳細については、 Snowflakeテーブルから構造化データをロードする をご参照ください。
DataSink*:データをSnowflakeテーブルに書き戻します。詳細については、 構造化データをSnowflakeテーブルに書き戻します をご参照ください。
非構造化データ(ステージ内ファイル)
DataSource APIs:Snowflakeステージから様々なファイル形式(CSV 、Parquet、画像など)のデータをロードします。詳細については、 Snowflakeステージから非構造化データをロードする をご参照ください。
以下の表は、ユースケースに適したAPI を選択するのに役立ちます。
データ型 |
データソース |
ローディング用 API |
書き込み用 API |
|---|---|---|---|
構造化 |
Snowflakeテーブル |
DataConnector |
DataSink |
構造化 |
Snowflake Datasets |
DataConnector |
DataSink |
非構造化 |
CSV ファイル(ステージ) |
DataSource に API |
N/A |
非構造化 |
Parquetファイル(ステージ) |
DataSource に API |
N/A |
非構造化 |
その他のステージングされたファイル |
DataSource に API |
N/A |
Snowflakeテーブルから構造化データをロードする¶
Snowflake DataConnector を使用して、SnowflakeテーブルおよびSnowflakeデータセットからSnowflakeノートブックまたはSnowflake ML ジョブに構造化データをロードします。DataConnector は、複数のコンピュートノードにまたがって読み込みを並列化することで、データのロードを高速化します。
DataConnector は、Snowpark DataFrames またはSnowflakeデータセットのいずれかで動作します。
Snowpark DataFrames:Snowflakeテーブルのデータに直接アクセスできます。開発中に使用するのが最適です。
Snowflakeデータセット:バージョン管理されたスキーマレベルのオブジェクト。プロダクションワークフローに最適です。詳細については、 Snowflake Datasets をご参照ください。
読み込みを並列化した後、DataConnector は、データを以下のデータ構造のいずれかに変換することができます。
pandasデータフレーム
PyTorch データセット
TensorFlow データセット
DataConnector の作成¶
SnowparkDataFrame または Snowflakeデータセットから DataConnector を作成できます。
以下のコードを使用して、Snowpark DataFrame から DataConnector を作成します。
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create DataConnector from a Snowflake table
data_connector = DataConnector.from_dataframe(session.table("example-table-name"))
次のコードを使用して、Snowflakeデータセットから DataConnector を作成します。
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector from a Snowflake Dataset
data_connector = DataConnector.from_dataset(snowflake_dataset)
DataConnector を他の形式に変換する¶
DataConnector を作成した後、さまざまな ML フレームワークで使用できるように、さまざまなデータ構造に変換することができます。
scikit-learnや他のpandas互換ライブラリで使用するために、DataConnector を pandasデータフレームに変換することができます。
次の例では、Snowflakeテーブルからpandasデータフレームにデータを読み込み、XGBoost 分類器をトレーニングします。
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
import xgboost as xgb
session = get_active_session()
# Specify training table location
table_name = "TRAINING_TABLE"
# Load table into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))
# Convert to pandas dataframe
pandas_df = data_connector.to_pandas()
# Prepare features and labels
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]
# Train classifier
clf = xgb.Classifier()
clf.fit(X, y)
PyTorch モデルやデータローダーで使用するために、DataConnector をPyTorch データセットに変換することができます。
以下の例では、SnowflakeテーブルからPyTorch データセットにデータをロードします。
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector (see previous examples)
# data_connector = DataConnector.from_dataframe(...)
# Convert to PyTorch dataset
torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']
for batch_idx, batch in enumerate(dataloader):
y = batch_data.pop(label_col).squeeze()
X = torch.stack(
[tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
)
TensorFlow モデルで使用するために、DataConnector をTensorFlow データセットに変換することができます。データの読み込みはストリーミング方式で行われ、最大限の効率を実現します。
次の例は、DataConnector をTensorFlow データセットに変換します。
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector (see previous examples)
# data_connector = DataConnector.from_dataframe(...)
# Convert to TensorFlow dataset
tf_ds = data_connector.to_tf_dataset(
batch_size=4,
shuffle=True,
drop_last_batch=True
)
for batch in tf_ds:
print(batch)
Snowflakeの分散トレーニング APIs の使用¶
最高のパフォーマンスを得るためには、DataConnector を、pandas、PyTorch 、TensorFlow のデータセットに最初に変換するのではなく、Snowflakeの最適化された分散トレーニング APIs に直接渡すことができます。
以下の例では、Snowflakeの分散XGBoost 推定量を使用して、XGBoost モデルをトレーニングしています。
from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
XGBEstimator,
XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create DataConnector from a Snowpark dataframe
snowflake_df = session.table("TRAINING_TABLE")
data_connector = DataConnector.from_dataframe(snowflake_df)
# Create Snowflake XGBoost estimator
snowflake_est = XGBEstimator(
n_estimators=1,
objective="reg:squarederror",
scaling_config=XGBScalingConfig(use_gpu=False),
)
# Train using the data connector
# When using a data connector, input_cols and label_col must be provided
fit_booster = snowflake_est.fit(
data_connector,
input_cols=NUMERICAL_COLS,
label_col=LABEL_COL
)
PyTorch ディストリビューターでシャーディングを使用する¶
SnowflakePyTorch ディストリビューターを使用した分散トレーニングのために、ShardedDataConnector を使用してデータを複数のノードにシャードすることができます。
次の例では、複数のプロセスにわたるシャードデータを使用して、数字データセットで PyTorch モデルをトレーニングしています。
from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
PyTorchTrainer,
ScalingConfig,
WorkerResourceConfig,
getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create the Snowflake data from a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)
# Create sharded data connector
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)
# Define the PyTorch model
class DigitsModel(nn.Module):
def __init__(self):
super(DigitsModel, self).__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(8 * 8, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10)
)
def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits
# Define training function that runs across multiple nodes or devices
# Each process receives a unique data shard
def train_func():
import os
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from torch import nn
from torch.nn.parallel import DistributedDataParallel as DDP
# Get context with data shards and model directory
context = getContext()
dataset_map = context.get_dataset_map()
model_dir = context.get_model_dir()
training_data = dataset_map["train"].get_shard().to_torch_dataset()
train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)
dist.init_process_group()
device = "cpu"
label_col = '"target"'
batch_size = 64
model = DDP(DigitsModel())
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
# Training loop
for epoch in range(5):
for batch, batch_data in enumerate(train_dataloader):
y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
X = torch.concat(
[tensor.to(torch.float32) for tensor in batch_data.values()],
dim=-1,
).to(device)
pred = model(X)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if batch % 100 == 0:
print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")
# Save the model
if dist.get_rank() == 0:
torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))
# Create PyTorch trainer with scaling configuration
pytorch_trainer = PyTorchTrainer(
train_func=train_func,
scaling_config=ScalingConfig(
num_nodes=1,
num_workers_per_node=4,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
),
)
# Run distributed training
response = pytorch_trainer.run(
dataset_map=dict(
train=sharded_data_connector,
)
)
Snowflakeステージから非構造化データをロードする¶
Snowflake DataSource APIs を使用して、Snowflakeステージから非構造化データを読み込みます。各ファイル形式にはデータの読み取り方法を定義する対応するデータソースクラスがあります。
以下に、データのロードに使用するファイル形式と対応する APIs を示します。
バイナリファイル:
SFStageBinaryFileDataSourceテキストファイル:
SFStageTextDataSourceCSV ファイル:
SFStageCSVDataSourceParquetファイル:
SFStageParquetDataSource画像ファイル:
SFStageImageDataSource
データの読み込みと処理¶
Snowflakeデータソースを作成するには、以下を指定する必要があります。
データの読み込み元のステージ名
ステージを持つデータベース(デフォルトは現在のセッション)
ステージを持つスキーマ(デフォルトは現在のセッション)
データソースから読み取られるフィルターファイルのパターン(オプション)
データ API またはデータコネクタは、提供されたパス内でファイルパターンに一致するすべてのファイルを検索します。
Snowflakeデータソースを定義すると、データをRayデータセットにロードできます。Rayデータセットを使って、以下のことができます。
Ray APIs でデータセットを使う
DataConnector にデータセットを渡す
必要に応じてpandasまたはPyTorch データセットに変換する。
次の例では次のようにします。
SnowflakeステージからRayデータセットにParquetファイルを読み込みます
データセットを DataConnector に変換します
import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector
data_source = SFStageParquetDataSource(
stage_location="@stage/path/",
database="DB_NAME", # optional
schema="SCHEMA_NAME", # optional
file_pattern='*.parquet', # optional
)
# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)
dc = DataConnector.from_ray_dataset(ray_ds)
構造化データをSnowflakeテーブルに書き戻します¶
Snowflake DataSink API を使用して、ノートブックまたは ML ジョブから Snowflakeテーブルに構造化データを書き戻します。変換または予測データセットをSnowflakeに書き込んで、さらなる分析や保存を行うことができます。
データシンクを定義するには、次を指定します。
ステージ名
データベース名(デフォルトは現在のセッション)
スキーマ名(デフォルトは現在のセッション)
特定のファイルに一致するファイルパターン(オプション)
以下の例では、データシンクを定義しています。
from snowflake.ml.ray.datasink import SnowflakeTableDatasink
datasink = SnowflakeTableDatasink(
table_name="table_name",
database="db_name",
schema="schema_name",
auto_create_table=True, # create table if not exists
override=True # replace vs insert to table
)
データシンクを定義した後、以下のコードを使用してRayデータセットをSnowflakeテーブルに書き込むことができます。
import ray
# Get Ray dataset from sources
ray_ds = ray.data.read_datasource(data_source)
# Setup transform operations, not executed yet
transformed_ds = ray_ds.map_batches(example_transform_batch_function)
# Start writing to Snowflake distributedly
transformed_ds.write_datasink(datasink)
ベストプラクティスと考慮事項¶
最適なパフォーマンスとリソース利用のために、以下のベストプラクティスを検討してください。
並列処理:Rayの分散性を活用したデータソースの実装を設計します。ユースケースに合わせて並列処理と同時実行の引数をカスタマイズします。各ステップでタスクごとに割り当てるリソースの数を手動で定義できます。
分割:デフォルトでは、Rayの内部ロジックはリソースとデータサイズに基づいてデータセットを分割します。ray_ds.repartition(X) を使って、ユースケースに応じてパーティション数をカスタマイズし、多数の小さなタスクと少数の大きなタスクを選択することができます。
ベストプラクティス:追加ガイダンスについては、`Rayデータユーザーガイド<https://docs.ray.io/en/latest/data/user-guide.html>`_ に従ってください。
Ray API 詳細:
次のステップ¶
データをロードした後、次のことが可能になります。