You are viewing documentation about an older version (1.2.0). View latest version

snowflake.ml.fileset.fileset.FileSet

class snowflake.ml.fileset.fileset.FileSet(*, target_stage_loc: str, name: str, sf_connection: Optional[SnowflakeConnection] = None, snowpark_session: Optional[Session] = None)

Bases: object

A FileSet represents an immutable snapshot of the result of a query in the form of files.

Create a FileSet based on an existing stage directory.

It can be used to restore an existing FileSet that was not deleted before.

Args:

sf_connection: A Snowflake python connection object. Mutually exclusive to snowpark_session. snowpark_session: A Snowpark Session object. Mutually exclusive to sf_connection. target_stage_loc: A string of the Snowflake stage path where the FileSet will be stored.

It needs to be an absolute path with the form of “@{database}.{schema}.{stage}/{optional directory}/”.

name: The name of the FileSet. It is the name of the directory which holds result stage files.

Raises:

SnowflakeMLException: An error occurred when not exactly one of sf_connection and snowpark_session is given.

Example: >>> # Create a new FileSet using Snowflake Python connection >>> conn = snowflake.connector.connect(**connection_parameters) >>> my_fileset = snowflake.ml.fileset.FileSet.make( >>> target_stage_loc=”@mydb.mychema.mystage/mydir” >>> name=”helloworld”, >>> sf_connection=conn, >>> query=”SELECT * FROM Mytable limit 1000000”, >>> ) >>> my_fileset.files() —- [‘sfc://@mydb.myschema.mystage//mydir/helloworld/data_0_0_0.snappy.parquet’]

>>> # Now we can restore the FileSet in another program as long as the FileSet is not deleted
>>> conn = snowflake.connector.connect(**connection_parameters)
>>> my_fileset_pointer = FileSet(sf_connection=conn,
                                 target_stage_loc="@mydb.mychema.mystage/mydir",
                                 name="helloworld")
>>> my_fileset.files()
----
['sfc://@mydb.myschema.mystage/mydir/helloworld/data_0_0_0.snappy.parquet']

Methods

delete() None

Delete the FileSet directory and all the stage files in it.

If not called, the FileSet and all its stage files will stay in Snowflake stage.

Raises:

SnowflakeMLException: An error occurred when the FileSet cannot get deleted.

This function or method is in private preview since 0.2.0.

files() List[str]

Get the list of stage file paths in the current FileSet.

The stage file paths follows the sfc protocol.

Returns:

A list of stage file paths

Example: >>> my_fileset = FileSet(sf_connection=conn, target_stage_loc=”@mydb.mychema.mystage”, name=”test”) >>> my_fileset.files() —- [“sfc://@mydb.myschema.mystage/test/hello_world_0_0_0.snappy.parquet”,

“sfc://@mydb.myschema.mystage/test/hello_world_0_0_1.snappy.parquet”]

This function or method is in private preview since 0.2.0.

fileset_stage_location() str

Get the stage path to the current FileSet in sfc protocol.

Returns:

A string representing the stage path

Example: >>> my_fileset = FileSet(sf_connection=conn, target_stage_loc=”@mydb.mychema.mystage”, name=”test”) >>> my_fileset.files() —- “sfc://@mydb.myschema.mystage/test/

This function or method is in private preview since 0.2.0.

classmethod make(*, target_stage_loc: str, name: str, snowpark_dataframe: Optional[DataFrame] = None, sf_connection: Optional[SnowflakeConnection] = None, query: str = '', shuffle: bool = False) FileSet

Creates a FileSet object given a SQL query.

The result FileSet object captures the query result deterministically as stage files.

Args:
target_stage_loc: A string of the Snowflake stage path where the FileSet will be stored.

It needs to be an absolute path with the form of “@{database}.{schema}.{stage}/{optional directory}/”.

name: The name of the FileSet. It will become the name of the directory which holds result stage files.

If there is already a FileSet with the same name in the given stage location, an exception will be raised.

snowpark_dataframe: A Snowpark Dataframe. Mutually exclusive to (sf_connection, query). sf_connection: A Snowflake python connection object. Must be provided if query is provided. query: A string of Snowflake SQL query to be executed. Mutually exclusive to snowpark_dataframe. Must

also specify sf_connection.

shuffle: A boolean represents whether the data should be shuffled globally. Default to be false.

Returns:

A FileSet object.

Raises:

ValueError: An error occurred when not exactly one of sf_connection and snowpark_session is given. FileSetExistError: An error occurred whern a FileSet with the same name exists in the given path. FileSetError: An error occurred when the SQL query/dataframe is not able to get materialized.

# noqa: DAR401

Note: During the generation of stage files, data casting will occur. The casting rules are as follows::
  • Data casting:
    • DecimalType(NUMBER):
      • If its scale is zero, cast to BIGINT

      • If its scale is non-zero, cast to FLOAT

    • DoubleType(DOUBLE): Cast to FLOAT.

    • ByteType(TINYINT): Cast to SMALLINT.

    • ShortType(SMALLINT):Cast to SMALLINT.

    • IntegerType(INT): Cast to INT.

    • LongType(BIGINT): Cast to BIGINT.

  • No action:
    • FloatType(FLOAT): No action.

    • StringType(String): No action.

    • BinaryType(BINARY): No action.

    • BooleanType(BOOLEAN): No action.

  • Not supported:
    • ArrayType(ARRAY): Not supported. A warning will be logged.

    • MapType(OBJECT): Not supported. A warning will be logged.

    • TimestampType(TIMESTAMP): Not supported. A warning will be logged.

    • TimeType(TIME): Not supported. A warning will be logged.

    • DateType(DATE): Not supported. A warning will be logged.

    • VariantType(VARIANT): Not supported. A warning will be logged.

>>> conn = snowflake.connector.connect(**connection_parameters)
>>> my_fileset = snowflake.ml.fileset.FileSet.make(
>>>     target_stage_loc="@mydb.mychema.mystage/mydir"
>>>     name="helloworld",
>>>     sf_connection=conn,
>>>     query="SELECT * FROM mytable limit 1000000",
>>> )
>>> my_fileset.files()
----
['sfc://@mydb.myschema.mystage/helloworld/data_0_0_0.snappy.parquet']
>>> new_session = snowflake.snowpark.Session.builder.configs(connection_parameters).create()
>>> df = new_session.sql("SELECT * FROM Mytable limit 1000000")
>>> my_fileset = snowflake.ml.fileset.FileSet.make(
>>>     target_stage_loc="@mydb.mychema.mystage/mydir"
>>>     name="helloworld",
>>>     snowpark_dataframe=df,
>>> )
>>> my_fileset.files()
----
['sfc://@mydb.myschema.mystage/helloworld/data_0_0_0.snappy.parquet']
to_snowpark_dataframe() DataFrame

Convert the fileset to a snowpark dataframe.

Only parquet files that owned by the FileSet will be read and converted. The parquet files that materialized by FileSet have the name pattern “data_<query_id>_<some_sharding_order>.snappy.parquet”.

Returns:

A Snowpark dataframe that contains the data of this FileSet.

Note: The dataframe generated by this method might not have the same schema as the original one. Specifically,
  • NUMBER type with scale != 0 will become float.

  • Unsupported types (see comments of make()) will not have any guarantee.

    For example, an OBJECT column may be scanned back as a STRING column.

This function or method is in private preview since 0.2.0.

to_tf_dataset(*, batch_size: int, shuffle: bool = False, drop_last_batch: bool = True) Any

Transform the Snowflake data into a ready-to-use TensorFlow tf.data.Dataset.

Args:
batch_size: It specifies the size of each data batch which will be

yield in the result datapipe

shuffle: It specifies whether the data will be shuffled. If True, files will be shuffled, and

rows in each file will also be shuffled.

drop_last_batch: Whether the last batch of data should be dropped. If set to be true,

then the last batch will get dropped if its size is smaller than the given batch_size.

Returns:

A tf.data.Dataset that yields batched tf.Tensors.

Examples: >>> conn = snowflake.connector.connect(**connection_parameters) >>> fileset = FileSet.make( >>> sf_connection=conn, name=”helloworld”, target_stage_loc=”@mydb.myschema.mystage” >>> query=”SELECT * FROM Mytable” >>> ) >>> dp = fileset.to_tf_dataset(batch_size=1) >>> for data in dp: >>> print(data) —- {‘_COL_1’: <tf.Tensor: shape=(1,), dtype=int64, numpy=[10]>}

This function or method is in private preview since 0.2.0.

to_torch_datapipe(*, batch_size: int, shuffle: bool = False, drop_last_batch: bool = True) Any

Transform the Snowflake data into a ready-to-use Pytorch datapipe.

Return a Pytorch datapipe which iterates on rows of data.

Args:
batch_size: It specifies the size of each data batch which will be

yield in the result datapipe

shuffle: It specifies whether the data will be shuffled. If True, files will be shuffled, and

rows in each file will also be shuffled.

drop_last_batch: Whether the last batch of data should be dropped. If set to be true,

then the last batch will get dropped if its size is smaller than the given batch_size.

Returns:

A Pytorch iterable datapipe that yield data.

Examples: >>> conn = snowflake.connector.connect(**connection_parameters) >>> fileset = FileSet.make( >>> sf_connection=conn, name=”helloworld”, target_stage_loc=”@mydb.myschema.mystage” >>> query=”SELECT * FROM Mytable” >>> ) >>> dp = fileset.to_torch_datapipe(batch_size=1) >>> for data in dp: >>> print(data) —- {‘_COL_1’:[10]}

This function or method is in private preview since 0.2.0.

Attributes

name

Get the name of the FileSet.