Snowflake Datasets

データセットは、機械学習ワークフロー用に特別に設計された新しいSnowflakeスキーマレベルのオブジェクトです。Snowflake Datasetsは、バージョンごとに整理されたデータのコレクションを保持します。各バージョンは、データのマテリアライズされたスナップショットを保持し、保証された不変性、効率的なデータアクセス、一般的な深層学習フレームワークとの相互運用性を備えています。

注釈

Datasetsは SQL オブジェクトですが、Snowpark ML 専用です。Snowsightデータベースオブジェクトエクスプローラには表示されず、SQLコマンドも使用しません。

このような場合は、Snowflake Datasetsを使用する必要があります。

  • 再現性のある機械学習モデルのトレーニングとテストのために、大規模なデータセットを管理し、バージョン管理する必要があります。

  • Snowflakeのスケーラブルかつセキュアなデータストレージと処理機能を活用したいと考えています。

  • 分散トレーニングやデータストリーミングには、きめ細かいファイルレベルのアクセスやデータシャッフリングが必要です。

  • 外部の機械学習フレームワークやツールと統合する必要があります。

注釈

マテリアライズドデータセットにはストレージコストがかかります。これらのコストを最小限に抑えるには、未使用のデータセットを削除します。

インストール

Dataset Python SDK はバージョン1.5.0からSnowpark ML(Pythonパッケージ snowflake-ml-python)に含まれています。インストール手順については、 Snowflake ML をローカルで使用する をご参照ください。

必要な権限

データセットの作成には、 CREATE DATASET スキーマレベルの権限が必要です。Datasetsのバージョンの追加や削除など、データセットを変更するには、Datasetに OWNERSHIP が必要です。データセットからの読み込みに必要なのは、そのデータセット(または OWNERSHIP)の USAGE 権限のみです。Snowflakeでの権限付与の詳細については、 GRANT <権限> をご参照ください。

Tip

setup_feature_store メソッドまたは 権限設定 SQL スクリプト のいずれかを使用して、Snowflake機能ストアの権限を設定すると、データセットの権限も設定されます。これらのいずれかの方法で機能ストアの権限をすでに設定している場合は、これ以上の操作は必要ありません。

Datasetsの作成および使用

Datasetsは、 snowflake.ml.dataset.create_from_dataframe 関数にSnowpark DataFrame を渡すことで作成されます。

from snowflake import snowpark
from snowflake.ml import dataset

# Create Snowpark Session
# See https://docs.snowflake.com/en/developer-guide/snowpark/python/creating-session
session = snowpark.Session.builder.configs(connection_parameters).create()

# Create a Snowpark DataFrame to serve as a data source
# In this example, we generate a random table with 100 rows and 1 column
df = session.sql(
    "select uniform(0, 10, random(1)) as x, uniform(0, 10, random(2)) as y from table(generator(rowcount => 100))"
)

# Materialize DataFrame contents into a Dataset
ds1 = dataset.create_from_dataframe(
    session,
    "my_dataset",
    "version1",
    input_dataframe=df)
Copy

Datasetsはバージョン管理されています。各バージョンは、データセットが管理するデータの、不変の、ポイントインタイムのスナップショットです。Python APIには、与えられたデータセットが使用するために選択されているかどうかを示す Dataset.selected_version プロパティが含まれています。このプロパティは、 dataset.create_from_dataframe および dataset.load_dataset ファクトリーメソッドによって自動的に設定されます。したがって、データセットを作成すると、作成されたバージョンが自動的に選択されます。 Dataset.select_version および Dataset.create_version メソッドを使用して、明示的にバージョンを切り替えることもできます。データセットからの読み込みは、アクティブな選択バージョンから読み込みます。

# Inspect currently selected version
print(ds1.selected_version) # DatasetVersion(dataset='my_dataset', version='version1')
print(ds1.selected_version.created_on) # Prints creation timestamp

# List all versions in the Dataset
print(ds1.list_versions()) # ["version1"]

# Create a new version
ds2 = ds1.create_version("version2", df)
print(ds1.selected_version.name)  # "version1"
print(ds2.selected_version.name)  # "version2"
print(ds1.list_versions())        # ["version1", "version2"]

# selected_version is immutable, meaning switching versions with
# ds1.select_version() returns a new Dataset object without
# affecting ds1.selected_version
ds3 = ds1.select_version("version2")
print(ds1.selected_version.name)  # "version1"
print(ds3.selected_version.name)  # "version2"
Copy

Datasetsからのデータの読み込み

データセットのバージョンデータは、Apache Parquetフォーマットの均等サイズのファイルとして保存されます。 Dataset クラスは、Snowflake Datasetsからデータを読み込むための FileSet と同様の API を提供し、TensorFlow および PyTorch の組み込みコネクタを含みます。APIは、カスタムフレームワークコネクタをサポートするために拡張可能です。

Datesetからの読み込みには、アクティブな選択バージョンが必要です。

TensorFlowに接続する

Datasetsを TensorFlow の tf.data.Dataset に変換し、バッチでストリーミングすることで、効率的な学習と評価を行うことができます。

import tensorflow as tf

# Convert Snowflake Dataset to TensorFlow Dataset
tf_dataset = ds1.read.to_tf_dataset(batch_size=32)

# Train a TensorFlow model
for batch in tf_dataset:
    # Extract and build tensors as needed
    input_tensor = tf.stack(list(batch.values()), axis=-1)

    # Forward pass (details not included for brevity)
    outputs = model(input_tensor)
Copy

PyTorchに接続する

Datasetsは、PyTorchDataPipesへの変換もサポートしており、効率的なトレーニングと評価のためにバッチでストリーミングできます。

import torch

# Convert Snowflake Dataset to PyTorch DataPipe
pt_datapipe = ds1.read.to_torch_datapipe(batch_size=32)

# Train a PyTorch model
for batch in pt_datapipe:
    # Extract and build tensors as needed
    input_tensor = torch.stack([torch.from_numpy(v) for v in batch.values()], dim=-1)

    # Forward pass (details not included for brevity)
    outputs = model(input_tensor)
Copy

Snowpark MLに接続する

データセットをSnowpark DataFramesに変換して、Snowpark ML Modelingと統合することもできます。変換されたSnowpark DataFrame は、Dataset作成時に提供された DataFrame と同じではなく、Datasetバージョンのマテリアライズされたデータを指しています。

from snowflake.ml.modeling.ensemble import random_forest_regressor

# Get a Snowpark DataFrame
ds_df = ds1.read.to_snowpark_dataframe()

# Note ds_df != df
ds_df.explain()
df.explain()

# Train a model in Snowpark ML
xgboost_model = random_forest_regressor.RandomForestRegressor(
    n_estimators=100,
    random_state=42,
    input_cols=["X"],
    label_cols=["Y"],
)
xgboost_model.fit(ds_df)
Copy

ファイルへ直接アクセスする

Dataset API は、 fsspec インターフェースも公開します。このインターフェースは、PyArrowや Daskなどの外部ライブラリや、 fsspec をサポートし、分散および/またはストリームベースのモデル学習を可能にするその他のパッケージとのカスタム統合を構築するために使用できます。

print(ds1.read.files()) # ['snow://dataset/my_dataset/versions/version1/data_0_0_0.snappy.parquet']

import pyarrow.parquet as pq
pd_ds = pq.ParquetDataset(ds1.read.files(), filesystem=ds1.read.filesystem())

import dask.dataframe as dd
dd_df = dd.read_parquet(ds1.read.files(), filesystem=ds1.read.filesystem())
Copy

現在の制限事項と既知の問題

  • データセット名は SQL の識別子であり、 Snowflakeの識別子要件 に従います。

  • データセットバージョンは文字列で、最大長は128文字です。一部の文字は許可されていないため、エラーメッセージが表示されます。

  • 広いスキーマ(約4,000列以上)を持つデータセットに対する一部のクエリ操作は、完全に最適化されません。今後のリリースでは改善される予定です。