snowflake.ml.fileset.fileset.FileSet

class snowflake.ml.fileset.fileset.FileSet(*, target_stage_loc: str, name: str, sf_connection: Optional[SnowflakeConnection] = None, snowpark_session: Optional[Session] = None)

Bases: object

A FileSet represents an immutable snapshot of the result of a query in the form of files.

Create a FileSet based on an existing stage directory.

It can be used to restore an existing FileSet that was not deleted before.

Parameters:
  • sf_connection – A Snowflake python connection object. Mutually exclusive to snowpark_session.

  • snowpark_session – A Snowpark Session object. Mutually exclusive to sf_connection.

  • target_stage_loc – A string of the Snowflake stage path where the FileSet will be stored. It needs to be an absolute path with the form of “@{database}.{schema}.{stage}/{optional directory}/”.

  • name – The name of the FileSet. It is the name of the directory which holds result stage files.

Raises:

SnowflakeMLException – An error occurred when not exactly one of sf_connection and snowpark_session is given.

Example:

>>> # Create a new FileSet using Snowflake Python connection
>>> conn = snowflake.connector.connect(**connection_parameters)
>>> my_fileset = snowflake.ml.fileset.FileSet.make(
>>>     target_stage_loc="@mydb.mychema.mystage/mydir"
>>>     name="helloworld",
>>>     sf_connection=conn,
>>>     query="SELECT * FROM Mytable limit 1000000",
>>> )
>>> my_fileset.files()
['sfc://@mydb.myschema.mystage//mydir/helloworld/data_0_0_0.snappy.parquet']
Copy
>>> # Now we can restore the FileSet in another program as long as the FileSet is not deleted
>>> conn = snowflake.connector.connect(**connection_parameters)
>>> my_fileset_pointer = FileSet(sf_connection=conn,
                                target_stage_loc="@mydb.mychema.mystage/mydir",
                                name="helloworld")
>>> my_fileset.files()
['sfc://@mydb.myschema.mystage/mydir/helloworld/data_0_0_0.snappy.parquet']
Copy

Methods

delete() None

Delete the FileSet directory and all the stage files in it.

If not called, the FileSet and all its stage files will stay in Snowflake stage.

Raises:

SnowflakeMLException – An error occurred when the FileSet cannot get deleted.

This function or method is in private preview since 0.2.0.

files() List[str]

Get the list of stage file paths in the current FileSet.

The stage file paths follows the sfc protocol.

Returns:

A list of stage file paths

Example:

>>> my_fileset = FileSet(sf_connection=conn, target_stage_loc="@mydb.mychema.mystage", name="test")
>>> my_fileset.files()
["sfc://@mydb.myschema.mystage/test/hello_world_0_0_0.snappy.parquet",
 "sfc://@mydb.myschema.mystage/test/hello_world_0_0_1.snappy.parquet"]
Copy

This function or method is in private preview since 0.2.0.

fileset_stage_location() str

Get the stage path to the current FileSet in sfc protocol.

Returns:

A string representing the stage path

Example:

>>> my_fileset = FileSet(sf_connection=conn, target_stage_loc="@mydb.mychema.mystage", name="test")
>>> my_fileset.files()
"sfc://@mydb.myschema.mystage/test/
Copy

This function or method is in private preview since 0.2.0.

classmethod make(*, target_stage_loc: str, name: str, snowpark_dataframe: Optional[DataFrame] = None, sf_connection: Optional[SnowflakeConnection] = None, query: str = '', shuffle: bool = False) FileSet

Creates a FileSet object given a SQL query.

The result FileSet object captures the query result deterministically as stage files.

Parameters:
  • target_stage_loc – A string of the Snowflake stage path where the FileSet will be stored. It needs to be an absolute path with the form of “@{database}.{schema}.{stage}/{optional directory}/”.

  • name – The name of the FileSet. It will become the name of the directory which holds result stage files. If there is already a FileSet with the same name in the given stage location, an exception will be raised.

  • snowpark_dataframe – A Snowpark Dataframe. Mutually exclusive to (sf_connection, query).

  • sf_connection – A Snowflake python connection object. Must be provided if query is provided.

  • query – A string of Snowflake SQL query to be executed. Mutually exclusive to snowpark_dataframe. Must also specify sf_connection.

  • shuffle – A boolean represents whether the data should be shuffled globally. Default to be false.

Returns:

A FileSet object.

Raises:
  • ValueError – An error occurred when not exactly one of sf_connection and snowpark_session is given.

  • FileSetExistError – An error occurred whern a FileSet with the same name exists in the given path.

  • FileSetError – An error occurred when the SQL query/dataframe is not able to get materialized.

Note: During the generation of stage files, data casting will occur. The casting rules are as follows:

  • Data casting:

    • DecimalType(NUMBER):

      • If its scale is zero, cast to BIGINT

      • If its scale is non-zero, cast to FLOAT

    • DoubleType(DOUBLE): Cast to FLOAT.

    • ByteType(TINYINT): Cast to SMALLINT.

    • ShortType(SMALLINT):Cast to SMALLINT.

    • IntegerType(INT): Cast to INT.

    • LongType(BIGINT): Cast to BIGINT.

  • No action:

    • FloatType(FLOAT): No action.

    • StringType(String): No action.

    • BinaryType(BINARY): No action.

    • BooleanType(BOOLEAN): No action.

  • Not supported:

    • ArrayType(ARRAY): Not supported. A warning will be logged.

    • MapType(OBJECT): Not supported. A warning will be logged.

    • TimestampType(TIMESTAMP): Not supported. A warning will be logged.

    • TimeType(TIME): Not supported. A warning will be logged.

    • DateType(DATE): Not supported. A warning will be logged.

    • VariantType(VARIANT): Not supported. A warning will be logged.

Example 1: Create a FileSet with Snowflake Python connection

>>> conn = snowflake.connector.connect(**connection_parameters)
>>> my_fileset = snowflake.ml.fileset.FileSet.make(
>>>     target_stage_loc="@mydb.mychema.mystage/mydir"
>>>     name="helloworld",
>>>     sf_connection=conn,
>>>     query="SELECT * FROM mytable limit 1000000",
>>> )
>>> my_fileset.files()
['sfc://@mydb.myschema.mystage/helloworld/data_0_0_0.snappy.parquet']
Copy

Example 2: Create a FileSet with a Snowpark dataframe

>>> new_session = snowflake.snowpark.Session.builder.configs(connection_parameters).create()
>>> df = new_session.sql("SELECT * FROM Mytable limit 1000000")
>>> my_fileset = snowflake.ml.fileset.FileSet.make(
>>>     target_stage_loc="@mydb.mychema.mystage/mydir"
>>>     name="helloworld",
>>>     snowpark_dataframe=df,
>>> )
>>> my_fileset.files()
['sfc://@mydb.myschema.mystage/helloworld/data_0_0_0.snappy.parquet']
Copy
to_snowpark_dataframe() DataFrame

Convert the fileset to a snowpark dataframe.

Only parquet files owned by the FileSet will be read and converted. The parquet files that materialized by FileSet have the name pattern “data_<query_id>_<some_sharding_order>.snappy.parquet”.

Returns:

A Snowpark dataframe that contains the data of this FileSet.

Note: The dataframe generated by this method might not have the same schema as the original one. Specifically,

  • NUMBER type with scale != 0 will become float.

  • Unsupported types (see comments of make()) will not have any guarantee. For example, an OBJECT column may be scanned back as a STRING column.

This function or method is in private preview since 0.2.0.

to_tf_dataset(*, batch_size: int, shuffle: bool = False, drop_last_batch: bool = True) Any

Transform the Snowflake data into a ready-to-use TensorFlow tf.data.Dataset.

Parameters:
  • batch_size – It specifies the size of each data batch which will be yield in the result datapipe

  • shuffle – It specifies whether the data will be shuffled. If True, files will be shuffled, and rows in each file will also be shuffled.

  • drop_last_batch – Whether the last batch of data should be dropped. If set to be true, then the last batch will get dropped if its size is smaller than the given batch_size.

Returns:

A tf.data.Dataset that yields batched tf.Tensors.

Examples:

>>> conn = snowflake.connector.connect(**connection_parameters)
>>> fileset = FileSet.make(
>>>     sf_connection=conn, name="helloworld", target_stage_loc="@mydb.myschema.mystage"
>>>     query="SELECT * FROM Mytable"
>>> )
>>> dp = fileset.to_tf_dataset(batch_size=1)
>>> for data in dp:
>>>     print(data)
{'_COL_1': <tf.Tensor: shape=(1,), dtype=int64, numpy=[10]>}
Copy

This function or method is in private preview since 0.2.0.

to_torch_datapipe(*, batch_size: int, shuffle: bool = False, drop_last_batch: bool = True) Any

Transform the Snowflake data into a ready-to-use Pytorch datapipe.

Return a Pytorch datapipe which iterates on rows of data.

Parameters:
  • batch_size – It specifies the size of each data batch which will be yield in the result datapipe.

  • shuffle – It specifies whether the data will be shuffled. If True, files will be shuffled, and rows in each file will also be shuffled.

  • drop_last_batch – Whether the last batch of data should be dropped. If set to be true, then the last batch will get dropped if its size is smaller than the given batch_size.

Returns:

A Pytorch iterable datapipe that yields data.

Examples:

>>> conn = snowflake.connector.connect(**connection_parameters)
>>> fileset = FileSet.make(
>>>     sf_connection=conn, name="helloworld", target_stage_loc="@mydb.myschema.mystage"
>>>     query="SELECT * FROM Mytable"
>>> )
>>> dp = fileset.to_torch_datapipe(batch_size=1)
>>> for data in dp:
>>>     print(data)
{'_COL_1':[10]}
Copy

This function or method is in private preview since 0.2.0.

Attributes

name

Get the name of the FileSet.