Snowpark ML FileSystem と FileSet --- 非推奨

Snowpark ML ライブラリには、内部であるサーバー側で暗号化されたSnowflake ステージ のためのファイルシステムに似た抽象化、 FileSystem が含まれています。具体的には、 fsspec AbstractFileSystem の実装です。ライブラリには FileSet という関連クラスも含まれており、機械学習データをSnowflakeテーブルからステージに移動し、そこから PyTorch や TensorFlow にデータを供給することができます(Snowpark ML Data Connector を参照)。

Tip

ほとんどのユーザーは、Snowflakeで不変のガバメントデータスナップショットを作成し、エンドツーエンドの機械学習ワークフローで使用するために、新しい Dataset API を使用する必要があります。

インストール

FileSystem と FileSet APIs は、Snowpark ML Pythonパッケージ snowflake-ml-python の一部です。インストール方法については、 Snowflake ML をローカルで使用する をご参照ください。

ファイルシステムの作成と使用

Snowpark ML ファイルシステムの作成には、 Python用Snowflakeコネクタ Connection オブジェクトまたは Snowpark Python Session のいずれかが必要です。手順については、 Snowflakeへの接続 をご参照ください。

接続またはセッションのいずれかを取得した後、Snowpark ML SFFileSystem インスタンスを作成し、それを介して内部ステージのデータにアクセスすることができます。

Python接続用Snowflakeコネクタの場合は、 sf_connection 引数として渡します。

import fsspec
from snowflake.ml.fileset import sfcfs

sf_fs1 = sfcfs.SFFileSystem(sf_connection=sf_connection)
Copy

Snowpark Pythonセッションの場合は、 snowpark_session 引数として渡します。

import fsspec
from snowflake.ml.fileset import sfcfs

sf_fs2 = sfcfs.SFFileSystem(snowpark_session=sp_session)
Copy

SFFileSystem は、ファイルのローカルキャッシュなど、多くの機能を fsspec.FileSystem から継承します。 fsspec.filesystem ファクトリ関数を介してSnowflakeファイルシステムをインスタンス化し、 target_protocol="sfc" を渡してSnowflake FileSystem 実装を使用することで、この機能やその他の機能を有効にすることができます。

local_cache_path = "/tmp/sf_files/"
cached_fs = fsspec.filesystem("cached", target_protocol="sfc",
                    target_options={"sf_connection": sf_connection,
                                    "cache_types": "bytes",
                                    "block_size": 32 * 2**20},
                    cache_storage=local_cache_path)
Copy

Snowflakeファイルシステムは、 findinfoisdirisfile、および exists を含む、fsspec FileSystem に対して定義されたほとんどの読み取り専用メソッドをサポートしています。

ファイルの指定

ステージのファイルを指定するには、パスを @database.schema.stage/file_path の形式で使用します。

ファイルのリスト

ファイルシステムの ls メソッドを使用して、ステージ内のファイルのリストを取得します。

print(*cached_fs.ls("@ML_DATASETS.public.my_models/sales_predict/"), end='\n')
Copy

ファイルの展開と読み取り

ステージにあるファイルを開くには、ファイルシステムの open メソッドを使用します。その後、通常のPythonファイルと同じ方法でファイルを読み取ることができます。ファイルオブジェクトは、Pythonの with ステートメントで使用できるコンテキストマネージャーでもあるため、不要になると自動的に閉じられます。

path = '@ML_DATASETS.public.my_models/test/data_7_7_3.snappy.parquet'

with sf_fs1.open(path, mode='rb') as f:
    print(f.read(16))
Copy

fsspecファイルシステムを受け付ける他のコンポーネントで SFFileSystem インスタンスを使用することもできます。ここでは、前のコードブロックで説明したParquetデータファイルが PyArrow の read_table メソッドに渡されます。

import pyarrow.parquet as pq

table = pq.read_table(path, filesystem=sf_fs1)
table.take([1, 3])
Copy

ファイル(またはファイルに似たオブジェクト)を受け付けるPythonコンポーネントには、Snowflakeファイルシステムから開かれたファイルオブジェクトを渡すことができます。たとえば、ステージにgzipで圧縮されたファイルがある場合は、それを gzip.GzipFilefileobj パラメーターとして渡すと、Pythonの gzip モジュールで使用することができます。

path = "sfc://@ML_DATASETS.public.my_models/dataset.csv.gz"

with cached_fs.open(path, mode='rb', sf_connection=sf_connection) as f:
    g = gzip.GzipFile(fileobj=f)
    for i in range(3):
        print(g.readline())
Copy

FileSet の作成および使用

Snowflake FileSet は、 SQL クエリの結果の不変スナップショットを、サーバー側で暗号化された内部ステージのファイル形式で表します。これらのデータファイルには、 PyTorch や TensorFlow などのツールにデータを供給するために FileSystem を介してアクセスすることができ、既存のデータガバナンスモデルの範囲内で、大規模にモデルをトレーニングすることができます。FileSet を作成するには、 FileSet.make メソッドを使用します。

FileSet を作成するには、Snowflake Python接続またはSnowparkセッションが必要です。手順については、 Snowflakeへの接続 をご参照ください。また、 FileSet が格納される、既存の内部サーバー側の暗号化ステージ、またはそのようなステージ下のサブディレクトリへのパスを提供する必要があります。

Snowpark DataFrame から FileSet を作成するには、 DataFrame を構築 し、 snowpark_dataframe として FileSet.make に渡します。 DataFrame の collect メソッドは呼び出さないでください。

# Snowpark Python equivalent of "SELECT * FROM MYDATA LIMIT 5000000"
df = snowpark_session.table('mydata').limit(5000000)
fileset_df = fileset.FileSet.make(
    target_stage_loc="@ML_DATASETS.public.my_models/",
    name="from_dataframe",
    snowpark_dataframe=df,
    shuffle=True,
)
Copy

Python接続用Snowflakeコネクタを使用して FileSet を作成するには、接続を Fileset.makesf_connection として渡し、 SQL クエリを query として渡します。

fileset_sf = fileset.FileSet.make(
    target_stage_loc="@ML_DATASETS.public.my_models/",
    name="from_connector",
    sf_connection=sf_connection,
    query="SELECT * FROM MYDATA LIMIT 5000000",
    shuffle=True,           # see later section about shuffling
)
Copy

注釈

shuffle パラメーターを使用したデータのシャッフルについては、 FileSets でのデータのシャッフル をご参照ください。

FileSet にあるファイルのリストを取得するには、 files メソッドを使用します。

print(*fileset_df.files())
Copy

FileSet のデータを PyTorch または TensorFlow に送る情報については、それぞれ FileSet の PyTorch へのフィード または FileSet の TensorFlow へのフィード をご参照ください。

FileSet の PyTorch へのフィード

Snowflake FileSet からは PyTorch DataPipe を取得でき、これは PyTorch DataLoader に渡すことができます。DataLoader は FileSet データを反復処理し、バッチ化された PyTorch テンソルを生成します。FileSet の to_torch_datapipe メソッドで DataPipe を作成し、 DataPipe を PyTorch の DataLoader に渡します。

from torch.utils.data import DataLoader

# See later sections about shuffling and batching
pipe = fileset_df.to_torch_datapipe(
    batch_size=4,
    shuffle=True,
    drop_last_batch=True)

for batch in DataLoader(pipe, batch_size=None, num_workers=0):
    print(batch)
    break
Copy

FileSet の TensorFlow へのフィード

FileSet の to_tf_dataset メソッドを使用して、Snowflake FileSet から TensorFlow データセットを取得できます。

import tensorflow as tf

# See following sections about shuffling and batching
ds = fileset_df.to_tf_dataset(
    batch_size=4,
    shuffle=True,
    drop_last_batch=True)

for batch in ds:
    print(batch)
    break
Copy

データセットを反復処理すると、バッチ化されたテンソルが生成されます。

FileSets でのデータのシャッフル

多くの場合、過剰適合やその他の問題を避けるために、学習データをシャッフルすることには価値があります。シャッフルの価値については、 機械学習タスクのためにデータをシャッフルすべき理由について をご参照ください。

クエリがデータをまだ十分にシャッフルしていない場合は、 FileSet により2つの時点でデータをシャッフルすることができます。

  • FileSet.make を使用して FileSet を作成するとき。

    クエリ内のすべての行は、 FileSet に書き込まれる前にシャッフルされます。これは高品質なグローバルシャッフルであり、大規模なデータセットではコストが上昇する可能性があります。そのため、 FileSet を具体化する際に一度だけ実行されます。 FileSet.make にキーワード引数として shuffle=True を渡します。

  • FileSet から PyTorch DataPipe または TensorFlow データセットを作成するとき。

    この時点で、 FileSet のファイルの順番はランダム化され、各ファイルの行の順番もランダム化されます。これは「近似的な」グローバルシャッフルと考えることができます。真のグローバルシャッフルより品質は劣りますが、コストを大幅に低減できます。このステージでシャッフルするには、 FileSet の to_torch_datapipe または to_tf_dataset メソッドにキーワード引数として shuffle=True を渡します。

最良の結果を得るには、 FileSet を作成するときと、 PyTorch または TensorFlow にデータを供給するときの2回シャッフルします。

FileSets でのデータのバッチ処理

FileSets にはバッチ処理機能があり、 PyTorch や TensorFlow のバッチ機能と同じ働きをしますが、より効率的です。Snowflakeは、 PyTorch や TensorFlow でバッチ処理を行うのではなく、 FileSet の to_torch_datapipeto_tf_dataset メソッドで batch_size パラメーターを使用することをお勧めします。PyTorch でバッチ処理機能を無効にするには、 DataLoader のインスタンス化時に batch_size=None を明示的に渡す必要があります。

また、 to_torch_datapipe または to_tf_datasetdrop_last_batch=True を渡すと、最後のバッチが不完全であった場合にそのバッチをドロップすることができます。