Snowpark ML FileSystem および FileSet

注釈

Snowpark ML 1.5.0では、 データセット を導入しました。これは、機械学習アプリケーションで使用するために設計された、バージョン管理された不変のスナップショットです。ほとんどのユースケースでは、このトピックで説明した FileSet API よりも優れています。FileSet API はプレビュー機能であり、一般公開はされないが、現時点でもサポートされています。

Snowpark ML ライブラリには、内部であるサーバー側で暗号化されたSnowflake ステージ のためのファイルシステムに似た抽象化、 FileSystem が含まれています。具体的には、 fsspec AbstractFileSystem の実装です。ライブラリには FileSet という関連クラスも含まれており、機械学習データをSnowflakeテーブルからステージに移動し、そこから PyTorch や TensorFlow にデータを供給することができます(Snowpark ML フレームワークコネクタ を参照)。

Tip

ほとんどのユーザーは、Snowflakeで不変のガバメントデータスナップショットを作成し、エンドツーエンドの機械学習ワークフローで使用するために、新しい Dataset API を使用する必要があります。

インストール

FileSystem と FileSet APIs は、Snowpark ML Pythonパッケージ snowflake-ml-python の一部です。インストール方法については、 Snowflake ML をローカルで使用する をご参照ください。

ファイルシステムの作成と使用

Snowpark ML ファイルシステムの作成には、 Python用Snowflakeコネクタ Connection オブジェクトまたは Snowpark Python Session のいずれかが必要です。手順については、 Snowflakeへの接続 をご参照ください。

接続またはセッションのいずれかを取得した後、Snowpark ML SFFileSystem インスタンスを作成し、それを介して内部ステージのデータにアクセスすることができます。

Python接続用Snowflakeコネクタの場合は、 sf_connection 引数として渡します。

import fsspec
from snowflake.ml.fileset import sfcfs

sf_fs1 = sfcfs.SFFileSystem(sf_connection=sf_connection)
Copy

Snowpark Pythonセッションの場合は、 snowpark_session 引数として渡します。

import fsspec
from snowflake.ml.fileset import sfcfs

sf_fs2 = sfcfs.SFFileSystem(snowpark_session=sp_session)
Copy

SFFileSystem は、ファイルのローカルキャッシュなど、多くの機能を fsspec.FileSystem から継承します。 fsspec.filesystem ファクトリ関数を介してSnowflakeファイルシステムをインスタンス化し、 target_protocol="sfc" を渡してSnowflake FileSystem 実装を使用することで、この機能やその他の機能を有効にすることができます。

local_cache_path = "/tmp/sf_files/"
cached_fs = fsspec.filesystem("cached", target_protocol="sfc",
                    target_options={"sf_connection": sf_connection,
                                    "cache_types": "bytes",
                                    "block_size": 32 * 2**20},
                    cache_storage=local_cache_path)
Copy

Snowflakeファイルシステムは、 findinfoisdirisfile、および exists を含む、fsspec FileSystem に対して定義されたほとんどの読み取り専用メソッドをサポートしています。

ファイルの指定

ステージのファイルを指定するには、パスを @database.schema.stage/file_path の形式で使用します。

ファイルのリスト

ファイルシステムの ls メソッドを使用して、ステージ内のファイルのリストを取得します。

print(*cached_fs.ls("@ML_DATASETS.public.my_models/sales_predict/"), end='\n')
Copy

ファイルの展開と読み取り

ステージにあるファイルを開くには、ファイルシステムの open メソッドを使用します。その後、通常のPythonファイルと同じ方法でファイルを読み取ることができます。ファイルオブジェクトは、Pythonの with ステートメントで使用できるコンテキストマネージャーでもあるため、不要になると自動的に閉じられます。

path = '@ML_DATASETS.public.my_models/test/data_7_7_3.snappy.parquet'

with sf_fs1.open(path, mode='rb') as f:
    print(f.read(16))
Copy

fsspecファイルシステムを受け付ける他のコンポーネントで SFFileSystem インスタンスを使用することもできます。ここでは、前のコードブロックで説明したParquetデータファイルが PyArrow の read_table メソッドに渡されます。

import pyarrow.parquet as pq

table = pq.read_table(path, filesystem=sf_fs1)
table.take([1, 3])
Copy

ファイル(またはファイルに似たオブジェクト)を受け付けるPythonコンポーネントには、Snowflakeファイルシステムから開かれたファイルオブジェクトを渡すことができます。たとえば、ステージにgzipで圧縮されたファイルがある場合は、それを gzip.GzipFilefileobj パラメーターとして渡すと、Pythonの gzip モジュールで使用することができます。

path = "sfc://@ML_DATASETS.public.my_models/dataset.csv.gz"

with cached_fs.open(path, mode='rb', sf_connection=sf_connection) as f:
    g = gzip.GzipFile(fileobj=f)
    for i in range(3):
        print(g.readline())
Copy

FileSet の作成および使用

Snowflake FileSet は、 SQL クエリの結果の不変スナップショットを、サーバー側で暗号化された内部ステージのファイル形式で表します。これらのデータファイルには、 PyTorch や TensorFlow などのツールにデータを供給するために FileSystem を介してアクセスすることができ、既存のデータガバナンスモデルの範囲内で、大規模にモデルをトレーニングすることができます。FileSet を作成するには、 FileSet.make メソッドを使用します。

FileSet を作成するには、Snowflake Python接続またはSnowparkセッションが必要です。手順については、 Snowflakeへの接続 をご参照ください。また、 FileSet が格納される、既存の内部サーバー側の暗号化ステージ、またはそのようなステージ下のサブディレクトリへのパスを提供する必要があります。

Snowpark DataFrame から FileSet を作成するには、 DataFrame を構築 し、 snowpark_dataframe として FileSet.make に渡します。 DataFrame の collect メソッドは呼び出さないでください。

# Snowpark Python equivalent of "SELECT * FROM MYDATA LIMIT 5000000"
df = snowpark_session.table('mydata').limit(5000000)
fileset_df = fileset.FileSet.make(
    target_stage_loc="@ML_DATASETS.public.my_models/",
    name="from_dataframe",
    snowpark_dataframe=df,
    shuffle=True,
)
Copy

Python接続用Snowflakeコネクタを使用して FileSet を作成するには、接続を Fileset.makesf_connection として渡し、 SQL クエリを query として渡します。

fileset_sf = fileset.FileSet.make(
    target_stage_loc="@ML_DATASETS.public.my_models/",
    name="from_connector",
    sf_connection=sf_connection,
    query="SELECT * FROM MYDATA LIMIT 5000000",
    shuffle=True,           # see later section about shuffling
)
Copy

注釈

shuffle パラメーターを使用したデータのシャッフルについては、 FileSets でのデータのシャッフル をご参照ください。

FileSet にあるファイルのリストを取得するには、 files メソッドを使用します。

print(*fileset_df.files())
Copy

FileSet のデータを PyTorch または TensorFlow にフィードする方法については、 Snowpark ML フレームワークコネクタ をご参照ください。