データのロードと書き込み

Snowflake ML を使用して、Snowflakeテーブルとステージから機械学習ワークフローに効率的にデータをロードします。Snowflake ML は、Snowflakeの分散処理を活用した最適化されたデータロード機能を提供し、トレーニングおよび推論ワークフローのデータ取り込みを高速化します。

次を使用してデータをロードし、処理することができます。

  • Snowflake Notebooks:データを探索し、ML モデルを構築するためのインタラクティブな開発環境。詳細については、 Notebooks on Container Runtime for ML をご参照ください。

  • Snowflake ML Jobs:どのような開発環境からでも、ML ワークロードを非同期で実行できます。詳細については、 Snowflake MLのジョブ をご参照ください。

ノートブックも ML ジョブも、Container Runtime for ML 上で実行され、分散処理機能を備えた機械学習ワークロード用に最適化された事前設定済みの環境を提供します。Container Runtimeは、分散コンピューティングのオープンソースフレームワークであるRayを使用して、複数のコンピュートノードにまたがるデータを効率的に処理します。Container Runtime forML の詳細については、ML のContainer Runtime を参照してください。

SnowflakeML は、構造化データと非構造化データをロードするための異なる APIs を提供します。

構造化データ(テーブルとデータセット)

非構造化データ(ステージ内ファイル)

以下の表は、ユースケースに適したAPI を選択するのに役立ちます。

データソースと APIs

データ型

データソース

ローディング用 API

書き込み用 API

構造化

Snowflakeテーブル

DataConnector

DataSink

構造化

Snowflake Datasets

DataConnector

DataSink

非構造化

CSV ファイル(ステージ)

DataSource に API

N/A

非構造化

Parquetファイル(ステージ)

DataSource に API

N/A

非構造化

その他のステージングされたファイル

DataSource に API

N/A

Snowflakeテーブルから構造化データをロードする

Snowflake DataConnector を使用して、SnowflakeテーブルおよびSnowflakeデータセットからSnowflakeノートブックまたはSnowflake ML ジョブに構造化データをロードします。DataConnector は、複数のコンピュートノードにまたがって読み込みを並列化することで、データのロードを高速化します。

DataConnector は、Snowpark DataFrames またはSnowflakeデータセットのいずれかで動作します。

  • Snowpark DataFrames:Snowflakeテーブルのデータに直接アクセスできます。開発中に使用するのが最適です。

  • Snowflakeデータセット:バージョン管理されたスキーマレベルのオブジェクト。プロダクションワークフローに最適です。詳細については、 Snowflake Datasets をご参照ください。

読み込みを並列化した後、DataConnector は、データを以下のデータ構造のいずれかに変換することができます。

  • pandasデータフレーム

  • PyTorch データセット

  • TensorFlow データセット

DataConnector の作成

SnowparkDataFrame または Snowflakeデータセットから DataConnector を作成できます。

以下のコードを使用して、Snowpark DataFrame から DataConnector を作成します。

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create DataConnector from a Snowflake table
data_connector = DataConnector.from_dataframe(session.table("example-table-name"))
Copy

次のコードを使用して、Snowflakeデータセットから DataConnector を作成します。

from snowflake.ml.data.data_connector import DataConnector

# Create DataConnector from a Snowflake Dataset
data_connector = DataConnector.from_dataset(snowflake_dataset)
Copy

DataConnector を他の形式に変換する

DataConnector を作成した後、さまざまな ML フレームワークで使用できるように、さまざまなデータ構造に変換することができます。

scikit-learnや他のpandas互換ライブラリで使用するために、DataConnector を pandasデータフレームに変換することができます。

次の例では、Snowflakeテーブルからpandasデータフレームにデータを読み込み、XGBoost 分類器をトレーニングします。

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
import xgboost as xgb

session = get_active_session()

# Specify training table location
table_name = "TRAINING_TABLE"

# Load table into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))

# Convert to pandas dataframe
pandas_df = data_connector.to_pandas()

# Prepare features and labels
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]

# Train classifier
clf = xgb.Classifier()
clf.fit(X, y)
Copy

Snowflakeの分散トレーニング APIs の使用

最高のパフォーマンスを得るためには、DataConnector を、pandas、PyTorch 、TensorFlow のデータセットに最初に変換するのではなく、Snowflakeの最適化された分散トレーニング APIs に直接渡すことができます。

以下の例では、Snowflakeの分散XGBoost 推定量を使用して、XGBoost モデルをトレーニングしています。

from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
    XGBEstimator,
    XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create DataConnector from a Snowpark dataframe
snowflake_df = session.table("TRAINING_TABLE")
data_connector = DataConnector.from_dataframe(snowflake_df)

# Create Snowflake XGBoost estimator
snowflake_est = XGBEstimator(
    n_estimators=1,
    objective="reg:squarederror",
    scaling_config=XGBScalingConfig(use_gpu=False),
)

# Train using the data connector
# When using a data connector, input_cols and label_col must be provided
fit_booster = snowflake_est.fit(
    data_connector,
    input_cols=NUMERICAL_COLS,
    label_col=LABEL_COL
)
Copy

PyTorch ディストリビューターでシャーディングを使用する

SnowflakePyTorch ディストリビューターを使用した分散トレーニングのために、ShardedDataConnector を使用してデータを複数のノードにシャードすることができます。

次の例では、複数のプロセスにわたるシャードデータを使用して、数字データセットで PyTorch モデルをトレーニングしています。

from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
    PyTorchTrainer,
    ScalingConfig,
    WorkerResourceConfig,
    getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create the Snowflake data from a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)

# Create sharded data connector
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)

# Define the PyTorch model
class DigitsModel(nn.Module):
    def __init__(self):
        super(DigitsModel, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(8 * 8, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Define training function that runs across multiple nodes or devices
# Each process receives a unique data shard
def train_func():
    import os
    import torch
    import torch.distributed as dist
    from torch.utils.data import DataLoader
    from torch import nn
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Get context with data shards and model directory
    context = getContext()
    dataset_map = context.get_dataset_map()
    model_dir = context.get_model_dir()
    training_data = dataset_map["train"].get_shard().to_torch_dataset()
    train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)

    dist.init_process_group()
    device = "cpu"
    label_col = '"target"'
    batch_size = 64

    model = DDP(DigitsModel())
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    # Training loop
    for epoch in range(5):
        for batch, batch_data in enumerate(train_dataloader):
            y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
            X = torch.concat(
                [tensor.to(torch.float32) for tensor in batch_data.values()],
                dim=-1,
            ).to(device)
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if batch % 100 == 0:
                print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")

    # Save the model
    if dist.get_rank() == 0:
        torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))

# Create PyTorch trainer with scaling configuration
pytorch_trainer = PyTorchTrainer(
    train_func=train_func,
    scaling_config=ScalingConfig(
        num_nodes=1,
        num_workers_per_node=4,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
    ),
)

# Run distributed training
response = pytorch_trainer.run(
    dataset_map=dict(
        train=sharded_data_connector,
    )
)
Copy

Snowflakeステージから非構造化データをロードする

Snowflake DataSource APIs を使用して、Snowflakeステージから非構造化データを読み込みます。各ファイル形式にはデータの読み取り方法を定義する対応するデータソースクラスがあります。

以下に、データのロードに使用するファイル形式と対応する APIs を示します。

  • バイナリファイル:SFStageBinaryFileDataSource

  • テキストファイル:SFStageTextDataSource

  • CSV ファイル: SFStageCSVDataSource

  • Parquetファイル: SFStageParquetDataSource

  • 画像ファイル: SFStageImageDataSource

データの読み込みと処理

Snowflakeデータソースを作成するには、以下を指定する必要があります。

  • データの読み込み元のステージ名

  • ステージを持つデータベース(デフォルトは現在のセッション)

  • ステージを持つスキーマ(デフォルトは現在のセッション)

  • データソースから読み取られるフィルターファイルのパターン(オプション)

データ API またはデータコネクタは、提供されたパス内でファイルパターンに一致するすべてのファイルを検索します。

Snowflakeデータソースを定義すると、データをRayデータセットにロードできます。Rayデータセットを使って、以下のことができます。

  • Ray APIs でデータセットを使う

  • DataConnector にデータセットを渡す

  • 必要に応じてpandasまたはPyTorch データセットに変換する。

次の例では次のようにします。

  • SnowflakeステージからRayデータセットにParquetファイルを読み込みます

  • データセットを DataConnector に変換します

import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector

data_source = SFStageParquetDataSource(
    stage_location="@stage/path/",
    database="DB_NAME", # optional
    schema="SCHEMA_NAME", # optional
    file_pattern='*.parquet', # optional
)

# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)

dc = DataConnector.from_ray_dataset(ray_ds)
Copy

構造化データをSnowflakeテーブルに書き戻します

Snowflake DataSink API を使用して、ノートブックまたは ML ジョブから Snowflakeテーブルに構造化データを書き戻します。変換または予測データセットをSnowflakeに書き込んで、さらなる分析や保存を行うことができます。

データシンクを定義するには、次を指定します。

  • ステージ名

  • データベース名(デフォルトは現在のセッション)

  • スキーマ名(デフォルトは現在のセッション)

  • 特定のファイルに一致するファイルパターン(オプション)

以下の例では、データシンクを定義しています。

from snowflake.ml.ray.datasink import SnowflakeTableDatasink
datasink = SnowflakeTableDatasink(
    table_name="table_name",
    database="db_name",
    schema="schema_name",
    auto_create_table=True, # create table if not exists
    override=True # replace vs insert to table
)
Copy

データシンクを定義した後、以下のコードを使用してRayデータセットをSnowflakeテーブルに書き込むことができます。

import ray

# Get Ray dataset from sources
ray_ds = ray.data.read_datasource(data_source)

# Setup transform operations, not executed yet
transformed_ds = ray_ds.map_batches(example_transform_batch_function)

# Start writing to Snowflake distributedly
transformed_ds.write_datasink(datasink)
Copy

ベストプラクティスと考慮事項

最適なパフォーマンスとリソース利用のために、以下のベストプラクティスを検討してください。

並列処理:Rayの分散性を活用したデータソースの実装を設計します。ユースケースに合わせて並列処理と同時実行の引数をカスタマイズします。各ステップでタスクごとに割り当てるリソースの数を手動で定義できます。

分割:デフォルトでは、Rayの内部ロジックはリソースとデータサイズに基づいてデータセットを分割します。ray_ds.repartition(X) を使って、ユースケースに応じてパーティション数をカスタマイズし、多数の小さなタスクと少数の大きなタスクを選択することができます。

ベストプラクティス:追加ガイダンスについては、`Rayデータユーザーガイド<https://docs.ray.io/en/latest/data/user-guide.html>`_ に従ってください。

Ray API 詳細:

次のステップ

データをロードした後、次のことが可能になります。