Snowpark ML FileSystem と FileSet --- 非推奨¶
Snowpark ML ライブラリには、内部であるサーバー側で暗号化されたSnowflake ステージ のためのファイルシステムに似た抽象化、 FileSystem が含まれています。具体的には、 fsspec AbstractFileSystem の実装です。ライブラリには FileSet という関連クラスも含まれており、機械学習データをSnowflakeテーブルからステージに移動し、そこから PyTorch や TensorFlow にデータを供給することができます(Snowpark ML Data Connector を参照)。
Tip
ほとんどのユーザーは、Snowflakeで不変のガバメントデータスナップショットを作成し、エンドツーエンドの機械学習ワークフローで使用するために、新しい Dataset API を使用する必要があります。
インストール¶
FileSystem と FileSet APIs は、Snowpark ML Pythonパッケージ snowflake-ml-python
の一部です。インストール方法については、 Snowflake ML をローカルで使用する をご参照ください。
ファイルシステムの作成と使用¶
Snowpark ML ファイルシステムの作成には、 Python用Snowflakeコネクタ Connection
オブジェクトまたは Snowpark Python Session
のいずれかが必要です。手順については、 Snowflakeへの接続 をご参照ください。
接続またはセッションのいずれかを取得した後、Snowpark ML SFFileSystem
インスタンスを作成し、それを介して内部ステージのデータにアクセスすることができます。
Python接続用Snowflakeコネクタの場合は、 sf_connection
引数として渡します。
import fsspec
from snowflake.ml.fileset import sfcfs
sf_fs1 = sfcfs.SFFileSystem(sf_connection=sf_connection)
Snowpark Pythonセッションの場合は、 snowpark_session
引数として渡します。
import fsspec
from snowflake.ml.fileset import sfcfs
sf_fs2 = sfcfs.SFFileSystem(snowpark_session=sp_session)
SFFileSystem
は、ファイルのローカルキャッシュなど、多くの機能を fsspec.FileSystem
から継承します。 fsspec.filesystem
ファクトリ関数を介してSnowflakeファイルシステムをインスタンス化し、 target_protocol="sfc"
を渡してSnowflake FileSystem 実装を使用することで、この機能やその他の機能を有効にすることができます。
local_cache_path = "/tmp/sf_files/"
cached_fs = fsspec.filesystem("cached", target_protocol="sfc",
target_options={"sf_connection": sf_connection,
"cache_types": "bytes",
"block_size": 32 * 2**20},
cache_storage=local_cache_path)
Snowflakeファイルシステムは、 find
、 info
、 isdir
、 isfile
、および exists
を含む、fsspec FileSystem
に対して定義されたほとんどの読み取り専用メソッドをサポートしています。
ファイルの指定¶
ステージのファイルを指定するには、パスを @database.schema.stage/file_path
の形式で使用します。
ファイルのリスト¶
ファイルシステムの ls
メソッドを使用して、ステージ内のファイルのリストを取得します。
print(*cached_fs.ls("@ML_DATASETS.public.my_models/sales_predict/"), end='\n')
ファイルの展開と読み取り¶
ステージにあるファイルを開くには、ファイルシステムの open
メソッドを使用します。その後、通常のPythonファイルと同じ方法でファイルを読み取ることができます。ファイルオブジェクトは、Pythonの with
ステートメントで使用できるコンテキストマネージャーでもあるため、不要になると自動的に閉じられます。
path = '@ML_DATASETS.public.my_models/test/data_7_7_3.snappy.parquet'
with sf_fs1.open(path, mode='rb') as f:
print(f.read(16))
fsspecファイルシステムを受け付ける他のコンポーネントで SFFileSystem
インスタンスを使用することもできます。ここでは、前のコードブロックで説明したParquetデータファイルが PyArrow の read_table
メソッドに渡されます。
import pyarrow.parquet as pq
table = pq.read_table(path, filesystem=sf_fs1)
table.take([1, 3])
ファイル(またはファイルに似たオブジェクト)を受け付けるPythonコンポーネントには、Snowflakeファイルシステムから開かれたファイルオブジェクトを渡すことができます。たとえば、ステージにgzipで圧縮されたファイルがある場合は、それを gzip.GzipFile
に fileobj
パラメーターとして渡すと、Pythonの gzip
モジュールで使用することができます。
path = "sfc://@ML_DATASETS.public.my_models/dataset.csv.gz"
with cached_fs.open(path, mode='rb', sf_connection=sf_connection) as f:
g = gzip.GzipFile(fileobj=f)
for i in range(3):
print(g.readline())
FileSet の作成および使用¶
Snowflake FileSet は、 SQL クエリの結果の不変スナップショットを、サーバー側で暗号化された内部ステージのファイル形式で表します。これらのデータファイルには、 PyTorch や TensorFlow などのツールにデータを供給するために FileSystem を介してアクセスすることができ、既存のデータガバナンスモデルの範囲内で、大規模にモデルをトレーニングすることができます。FileSet を作成するには、 FileSet.make
メソッドを使用します。
FileSet を作成するには、Snowflake Python接続またはSnowparkセッションが必要です。手順については、 Snowflakeへの接続 をご参照ください。また、 FileSet が格納される、既存の内部サーバー側の暗号化ステージ、またはそのようなステージ下のサブディレクトリへのパスを提供する必要があります。
Snowpark DataFrame から FileSet を作成するには、 DataFrame を構築 し、 snowpark_dataframe
として FileSet.make
に渡します。 DataFrame の collect
メソッドは呼び出さないでください。
# Snowpark Python equivalent of "SELECT * FROM MYDATA LIMIT 5000000"
df = snowpark_session.table('mydata').limit(5000000)
fileset_df = fileset.FileSet.make(
target_stage_loc="@ML_DATASETS.public.my_models/",
name="from_dataframe",
snowpark_dataframe=df,
shuffle=True,
)
Python接続用Snowflakeコネクタを使用して FileSet を作成するには、接続を Fileset.make
に sf_connection
として渡し、 SQL クエリを query
として渡します。
fileset_sf = fileset.FileSet.make(
target_stage_loc="@ML_DATASETS.public.my_models/",
name="from_connector",
sf_connection=sf_connection,
query="SELECT * FROM MYDATA LIMIT 5000000",
shuffle=True, # see later section about shuffling
)
注釈
shuffle
パラメーターを使用したデータのシャッフルについては、 FileSets でのデータのシャッフル をご参照ください。
FileSet にあるファイルのリストを取得するには、 files
メソッドを使用します。
print(*fileset_df.files())
FileSet のデータを PyTorch または TensorFlow に送る情報については、それぞれ FileSet の PyTorch へのフィード または FileSet の TensorFlow へのフィード をご参照ください。
FileSet の PyTorch へのフィード¶
Snowflake FileSet からは PyTorch DataPipe を取得でき、これは PyTorch DataLoader に渡すことができます。DataLoader は FileSet データを反復処理し、バッチ化された PyTorch テンソルを生成します。FileSet の to_torch_datapipe
メソッドで DataPipe を作成し、 DataPipe を PyTorch の DataLoader
に渡します。
from torch.utils.data import DataLoader
# See later sections about shuffling and batching
pipe = fileset_df.to_torch_datapipe(
batch_size=4,
shuffle=True,
drop_last_batch=True)
for batch in DataLoader(pipe, batch_size=None, num_workers=0):
print(batch)
break
FileSet の TensorFlow へのフィード¶
FileSet の to_tf_dataset
メソッドを使用して、Snowflake FileSet から TensorFlow データセットを取得できます。
import tensorflow as tf
# See following sections about shuffling and batching
ds = fileset_df.to_tf_dataset(
batch_size=4,
shuffle=True,
drop_last_batch=True)
for batch in ds:
print(batch)
break
データセットを反復処理すると、バッチ化されたテンソルが生成されます。
FileSets でのデータのシャッフル¶
多くの場合、過剰適合やその他の問題を避けるために、学習データをシャッフルすることには価値があります。シャッフルの価値については、 機械学習タスクのためにデータをシャッフルすべき理由について をご参照ください。
クエリがデータをまだ十分にシャッフルしていない場合は、 FileSet により2つの時点でデータをシャッフルすることができます。
FileSet.make
を使用して FileSet を作成するとき。クエリ内のすべての行は、 FileSet に書き込まれる前にシャッフルされます。これは高品質なグローバルシャッフルであり、大規模なデータセットではコストが上昇する可能性があります。そのため、 FileSet を具体化する際に一度だけ実行されます。
FileSet.make
にキーワード引数としてshuffle=True
を渡します。FileSet から PyTorch DataPipe または TensorFlow データセットを作成するとき。
この時点で、 FileSet のファイルの順番はランダム化され、各ファイルの行の順番もランダム化されます。これは「近似的な」グローバルシャッフルと考えることができます。真のグローバルシャッフルより品質は劣りますが、コストを大幅に低減できます。このステージでシャッフルするには、 FileSet の
to_torch_datapipe
またはto_tf_dataset
メソッドにキーワード引数としてshuffle=True
を渡します。
最良の結果を得るには、 FileSet を作成するときと、 PyTorch または TensorFlow にデータを供給するときの2回シャッフルします。
FileSets でのデータのバッチ処理¶
FileSets にはバッチ処理機能があり、 PyTorch や TensorFlow のバッチ機能と同じ働きをしますが、より効率的です。Snowflakeは、 PyTorch や TensorFlow でバッチ処理を行うのではなく、 FileSet の to_torch_datapipe
と to_tf_dataset
メソッドで batch_size
パラメーターを使用することをお勧めします。PyTorch でバッチ処理機能を無効にするには、 DataLoader
のインスタンス化時に batch_size=None
を明示的に渡す必要があります。
また、 to_torch_datapipe
または to_tf_dataset
に drop_last_batch=True
を渡すと、最後のバッチが不完全であった場合にそのバッチをドロップすることができます。