Snowpark ML FileSystem and FileSet — Deprecated¶
The Snowpark ML library includes FileSystem, an abstraction that is similar to a file system for an internal, server-side encrypted Snowflake stage. Specifically, it is an fsspec AbstractFileSystem implementation. The library also includes FileSet, a related class that allows you to move machine learning data from a Snowflake table to the stage, and from there to feed the data to PyTorch or TensorFlow (see Snowpark ML Data Connector).
Tip
Most users should use the newer Dataset API for creating immutable, governed data snapshots in Snowflake and using them in end-to-end machine learning workflows.
Installation¶
The FileSystem and FileSet APIs are part of the Snowpark ML Python package, snowflake-ml-python. See Using Snowflake ML Locally for installation instructions.
Creating and Using a File System¶
Creating a Snowpark ML file system requires either a Snowflake Connector for Python Connection object or a Snowpark Python Session. See Connecting to Snowflake for instructions.
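For example, a Snowpark Python session might be created from a dictionary of connection parameters; the account, user, and credential values below are placeholders that you would replace with your own:
from snowflake.snowpark import Session

# Placeholder connection parameters; substitute your own account, user, and credentials
connection_parameters = {
    "account": "<account_identifier>",
    "user": "<user_name>",
    "password": "<password>",
    "role": "<role_name>",
    "warehouse": "<warehouse_name>",
}

sp_session = Session.builder.configs(connection_parameters).create()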
After you have either a connection or a session, you can create a Snowpark ML SFFileSystem instance through which you can access data in your internal stage.
If you have a Snowflake Connector for Python connection, pass it as the sf_connection argument:
import fsspec
from snowflake.ml.fileset import sfcfs
sf_fs1 = sfcfs.SFFileSystem(sf_connection=sf_connection)
If you have a Snowpark Python session, pass it as the snowpark_session argument:
import fsspec
from snowflake.ml.fileset import sfcfs
sf_fs2 = sfcfs.SFFileSystem(snowpark_session=sp_session)
SFFileSystem inherits many features from fsspec.AbstractFileSystem, such as local caching of files. You can enable this and other features by instantiating a Snowflake file system through the fsspec.filesystem factory function, passing target_protocol="sfc" to use the Snowflake FileSystem implementation:
local_cache_path = "/tmp/sf_files/"
cached_fs = fsspec.filesystem("cached", target_protocol="sfc",
                              target_options={"sf_connection": sf_connection,
                                              "cache_types": "bytes",
                                              "block_size": 32 * 2**20},
                              cache_storage=local_cache_path)
The Snowflake file system supports most read-only methods defined for an fsspec FileSystem, including find, info, isdir, isfile, and exists.
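For example, using a stage path of the form described in the next section (the file below is the same illustrative one used later in this topic), you can check whether files exist and inspect their metadata:
path = "@ML_DATASETS.public.my_models/test/data_7_7_3.snappy.parquet"

print(sf_fs1.exists(path))                                  # True if the file is in the stage
print(sf_fs1.isdir("@ML_DATASETS.public.my_models/test/"))  # True for a stage subdirectory
print(sf_fs1.info(path))                                    # size and other metadata for the file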
Specifying Files¶
To specify files in a stage, use a path in the form @database.schema.stage/file_path.
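For example, the following path refers to a hypothetical Parquet file in the stage used throughout this topic:
# @<database>.<schema>.<stage>/<file_path>  (hypothetical file name)
path = "@ML_DATASETS.public.my_models/sales_predict/training_data.parquet"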
Listing Files¶
The file system’s ls method is used to get a list of the files in the stage:
# Print each file name on its own line
print(*cached_fs.ls("@ML_DATASETS.public.my_models/sales_predict/"), sep='\n')
Opening and Reading Files¶
You can open files in the stage by using the file system’s open method. You can then read the files by using the same methods you use with ordinary Python files. The file object is also a context manager that can be used with Python’s with statement, so it is automatically closed when it’s no longer needed.
path = '@ML_DATASETS.public.my_models/test/data_7_7_3.snappy.parquet'
with sf_fs1.open(path, mode='rb') as f:
    print(f.read(16))
You can also use the SFFileSystem instance with other components that accept fsspec file systems. Here, the Parquet data file mentioned in the previous code block is passed to PyArrow’s read_table function:
import pyarrow.parquet as pq
table = pq.read_table(path, filesystem=sf_fs1)
table.take([1, 3])
Python components that accept files (or file-like objects) can be passed a file object opened from the Snowflake file system. For example, if you have a gzip-compressed file in your stage, you can use it with Python’s gzip module by passing it to gzip.GzipFile as the fileobj parameter:
path = "sfc://@ML_DATASETS.public.my_models/dataset.csv.gz"
with cached_fs.open(path, mode='rb', sf_connection=sf_connection) as f:
g = gzip.GzipFile(fileobj=f)
for i in range(3):
print(g.readline())
Creating and Using a FileSet¶
A Snowflake FileSet represents an immutable snapshot of the result of a SQL query in the form of files in an internal
server-side encrypted stage. These files can be accessed through a FileSystem to feed data to tools such as PyTorch and
TensorFlow so that you can train models at scale and within your existing data governance model. To create a FileSet,
use the FileSet.make
method.
You need a Snowflake Python connection or a Snowpark session to create a FileSet. See Connecting to Snowflake for instructions. You must also provide the path to an existing internal server-side encrypted stage, or a subdirectory under such a stage, where the FileSet will be stored.
To create a FileSet from a Snowpark DataFrame, construct a DataFrame and pass it to FileSet.make as snowpark_dataframe; do not call the DataFrame’s collect method:
from snowflake.ml.fileset import fileset

# Snowpark Python equivalent of "SELECT * FROM MYDATA LIMIT 5000000"
df = snowpark_session.table('mydata').limit(5000000)

fileset_df = fileset.FileSet.make(
    target_stage_loc="@ML_DATASETS.public.my_models/",
    name="from_dataframe",
    snowpark_dataframe=df,
    shuffle=True,
)
To create a FileSet using a Snowflake Connector for Python connection, pass the connection to FileSet.make as sf_connection, and pass the SQL query as query:
fileset_sf = fileset.FileSet.make(
    target_stage_loc="@ML_DATASETS.public.my_models/",
    name="from_connector",
    sf_connection=sf_connection,
    query="SELECT * FROM MYDATA LIMIT 5000000",
    shuffle=True,  # see later section about shuffling
)
Note
See Shuffling Data in FileSets for information about shuffling your data by using the shuffle parameter.
Use the files method to get a list of the files in the FileSet:
print(*fileset_df.files())
For information about feeding the data in the FileSet to PyTorch or TensorFlow, see Feeding a FileSet to PyTorch or Feeding a FileSet to TensorFlow, respectively.
Feeding a FileSet to PyTorch¶
From a Snowflake FileSet, you can get a PyTorch DataPipe, which can be passed to a PyTorch DataLoader. The DataLoader
iterates over the FileSet data and yields batched PyTorch tensors. Create the DataPipe using the FileSet’s
to_torch_datapipe method, and then pass the DataPipe to PyTorch’s DataLoader:
from torch.utils.data import DataLoader
# See later sections about shuffling and batching
pipe = fileset_df.to_torch_datapipe(
    batch_size=4,
    shuffle=True,
    drop_last_batch=True)

for batch in DataLoader(pipe, batch_size=None, num_workers=0):
    print(batch)
    break
Feeding a FileSet to TensorFlow¶
You can get a TensorFlow Dataset from a Snowflake FileSet using the FileSet’s to_tf_dataset method:
import tensorflow as tf
# See following sections about shuffling and batching
ds = fileset_df.to_tf_dataset(
    batch_size=4,
    shuffle=True,
    drop_last_batch=True)

for batch in ds:
    print(batch)
    break
Iterating over the Dataset yields batched tensors.
Shuffling Data in FileSets¶
It is often valuable to shuffle the training data to avoid overfitting and other issues. For a discussion of the value of shuffling, see Why should the data be shuffled for machine learning tasks?
If your query does not already shuffle your data sufficiently, a FileSet can shuffle data at two points:
- When the FileSet is created by using FileSet.make. All rows in your query are shuffled before they are written to the FileSet. This is a high-quality global shuffle and can be expensive with large datasets. Therefore, it is performed only once, when materializing the FileSet. Pass shuffle=True as a keyword argument to FileSet.make.
- When you create a PyTorch DataPipe or a TensorFlow Dataset from a FileSet. At this point, the order of the files in the FileSet is randomized, as is the order of the rows in each file. This can be considered an “approximate” global shuffle. It is of lower quality than a true global shuffle, but it is much less expensive. To shuffle at this stage, pass shuffle=True as a keyword argument to the FileSet’s to_torch_datapipe or to_tf_dataset method.
For best results, shuffle twice: when creating the FileSet and when feeding the data to PyTorch or TensorFlow.
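A minimal sketch of this double shuffle, reusing the connection, stage location, and query from the earlier examples (the FileSet name shown here is hypothetical):
# Global shuffle of all query rows when the FileSet is materialized
fileset_shuffled = fileset.FileSet.make(
    target_stage_loc="@ML_DATASETS.public.my_models/",
    name="shuffled_training_data",  # hypothetical FileSet name
    sf_connection=sf_connection,
    query="SELECT * FROM MYDATA LIMIT 5000000",
    shuffle=True,
)

# Approximate shuffle of file and row order when feeding the data to PyTorch
pipe = fileset_shuffled.to_torch_datapipe(
    batch_size=4,
    shuffle=True,
    drop_last_batch=True)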
Batching Data in FileSets¶
FileSets have a batching feature that works the same as the batching functionality in PyTorch and TensorFlow but is more efficient. Snowflake recommends that you use the batch_size parameter in the FileSet’s to_torch_datapipe and to_tf_dataset methods instead of having PyTorch or TensorFlow do the batching. With PyTorch, to disable its batching functionality, you must explicitly pass batch_size=None when instantiating DataLoader.
You can also drop the last batch if it is incomplete by passing drop_last_batch=True to to_torch_datapipe or to to_tf_dataset.
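For example, the following sketch lets the FileSet batch the data for PyTorch (the batch size of 32 is an arbitrary example) and disables the DataLoader’s own batching:
from torch.utils.data import DataLoader

# Batch in the FileSet and drop the final batch if it has fewer than 32 rows
pipe = fileset_df.to_torch_datapipe(
    batch_size=32,  # arbitrary example batch size
    shuffle=True,
    drop_last_batch=True)

# batch_size=None disables the DataLoader's own batching
loader = DataLoader(pipe, batch_size=None, num_workers=0)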