snowflake.ml.fileset.fileset.FileSet
- class snowflake.ml.fileset.fileset.FileSet(*, target_stage_loc: str, name: str, sf_connection: Optional[SnowflakeConnection] = None, snowpark_session: Optional[Session] = None)
Bases: object
A FileSet represents an immutable snapshot of the result of a query in the form of files.
The constructor creates a FileSet object from an existing stage directory.
It can be used to restore an existing FileSet that has not been deleted.
- Args:
- sf_connection: A Snowflake Python connection object. Mutually exclusive with snowpark_session.
- snowpark_session: A Snowpark Session object. Mutually exclusive with sf_connection.
- target_stage_loc: A string of the Snowflake stage path where the FileSet will be stored.
It needs to be an absolute path of the form "@{database}.{schema}.{stage}/{optional directory}/".
- name: The name of the FileSet. It is the name of the directory that holds the result stage files.
- Raises:
SnowflakeMLException: Raised when not exactly one of sf_connection and snowpark_session is given.
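The argument rules above (exactly one connection object, and an absolute stage path) can be sketched as a small pre-check. This is only an illustration: `check_fileset_args` is a hypothetical helper, the regular expression is an assumed simplification that accepts plain unquoted identifiers only, and the sketch raises `ValueError` rather than the library's `SnowflakeMLException`.

```python
import re
from typing import Optional

# Assumed shape of an absolute stage path: "@{database}.{schema}.{stage}"
# followed by an optional directory. Quoted identifiers are not handled.
_STAGE_PATH = re.compile(r"^@(\w+)\.(\w+)\.(\w+)(/.*)?$")


def check_fileset_args(
    target_stage_loc: str,
    sf_connection: Optional[object] = None,
    snowpark_session: Optional[object] = None,
) -> None:
    """Hypothetical pre-check mirroring the documented constructor rules."""
    # Exactly one of the two connection arguments must be supplied.
    given = [arg is not None for arg in (sf_connection, snowpark_session)]
    if sum(given) != 1:
        raise ValueError("Provide exactly one of sf_connection and snowpark_session.")
    # The stage location must be absolute: database, schema, and stage.
    if _STAGE_PATH.match(target_stage_loc) is None:
        raise ValueError(f"Not an absolute stage path: {target_stage_loc!r}")
```

Running such a check before constructing a FileSet surfaces a malformed path or an ambiguous connection choice early, before any call reaches Snowflake.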
Example:
>>> # Create a new FileSet using a Snowflake Python connection
>>> conn = snowflake.connector.connect(**connection_parameters)
>>> my_fileset = snowflake.ml.fileset.FileSet.make(
...     target_stage_loc="@mydb.myschema.mystage/mydir",
...     name="helloworld",
...     sf_connection=conn,
...     query="SELECT * FROM Mytable limit 1000000",
... )
>>> my_fileset.files()
----
['sfc://@mydb.myschema.mystage/mydir/helloworld/data_0_0_0.snappy.parquet']

>>> # Now we can restore the FileSet in another program as long as the FileSet is not deleted
>>> conn = snowflake.connector.connect(**connection_parameters)
>>> my_fileset_pointer = FileSet(sf_connection=conn, target_stage_loc="@mydb.myschema.mystage/mydir", name="helloworld")
>>> my_fileset_pointer.files()
----
['sfc://@mydb.myschema.mystage/mydir/helloworld/data_0_0_0.snappy.parquet']
Methods
- delete() -> None
Delete the FileSet directory and all the stage files in it.
If not called, the FileSet and all its stage files will remain in the Snowflake stage.
- Raises:
SnowflakeMLException: Raised when the FileSet cannot be deleted.
This function or method is in private preview since 0.2.0.
- files() -> List[str]
Get the list of stage file paths in the current FileSet.
The stage file paths follow the sfc protocol.
- Returns:
A list of stage file paths
Example:
>>> my_fileset = FileSet(sf_connection=conn, target_stage_loc="@mydb.myschema.mystage", name="test")
>>> my_fileset.files()
----
["sfc://@mydb.myschema.mystage/test/hello_world_0_0_0.snappy.parquet",
 "sfc://@mydb.myschema.mystage/test/hello_world_0_0_1.snappy.parquet"]
This function or method is in private preview since 0.2.0.
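To work with the returned paths, it can help to split them into stage, directory, and file name. The sketch below assumes the path shape shown in the example (`sfc://@db.schema.stage/dir/file`); `StageFile` and `parse_sfc_path` are hypothetical names and are not part of the library.

```python
from typing import NamedTuple


class StageFile(NamedTuple):
    stage: str      # e.g. "@mydb.myschema.mystage"
    directory: str  # path between the stage and the file name; may be empty
    filename: str


def parse_sfc_path(path: str) -> StageFile:
    """Hypothetical helper: split an 'sfc://@db.schema.stage/dir/file' path."""
    prefix = "sfc://"
    if not path.startswith(prefix):
        raise ValueError(f"Not an sfc path: {path!r}")
    # Everything up to the first "/" is the fully qualified stage name.
    stage, _, rest = path[len(prefix):].partition("/")
    # The last path segment is the file name; the remainder is the directory.
    directory, _, filename = rest.rpartition("/")
    return StageFile(stage, directory, filename)
```

For instance, the first path in the example above would split into the stage `@mydb.myschema.mystage`, the directory `test`, and the file name `hello_world_0_0_0.snappy.parquet`.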
- fileset_stage_location() -> str
Get the stage path to the current FileSet in the sfc protocol.
- Returns:
A string representing the stage path
Example:
>>> my_fileset = FileSet(sf_connection=conn, target_stage_loc="@mydb.myschema.mystage", name="test")
>>> my_fileset.fileset_stage_location()
----
"sfc://@mydb.myschema.mystage/test/"
This function or method is in private preview since 0.2.0.
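Judging from the example, the returned location appears to be the stage path and the FileSet name joined under the `sfc://` scheme. A minimal sketch of that composition follows; `build_fileset_location` is a hypothetical helper, and the library's actual path handling is an assumption inferred from the example output only.

```python
def build_fileset_location(target_stage_loc: str, name: str) -> str:
    """Hypothetical helper: join a stage path and a FileSet name into an sfc path."""
    # Drop any trailing slash so the joined path has exactly one separator.
    base = target_stage_loc.rstrip("/")
    return f"sfc://{base}/{name}/"
```

Stripping the trailing slash first keeps the result free of doubled separators when the stage location is given with an optional directory and a closing `/`.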
- classmethod make(*, target_stage_loc: str, name: str, snowpark_dataframe: Optional[DataFrame] = None, sf_connection: Optional[SnowflakeConnection] = None, query: str = '', shuffle: bool = False) -> FileSet
Creates a FileSet object given a SQL query or a Snowpark DataFrame.
The result FileSet object captures the query result deterministically as stage files.
- Args:
- target_stage_loc: A string of the Snowflake stage path where the FileSet will be stored.
It needs to be an absolute path with the form of “@{database}.{schema}.{stage}/{optional directory}/”.
- name: The name of the FileSet. It will become the name of the directory which holds result stage files.
If there is already a FileSet with the same name in the given stage location, an exception will be raised.
- snowpark_dataframe: A Snowpark DataFrame. Mutually exclusive with (sf_connection, query).
- sf_connection: A Snowflake Python connection object. Must be provided if query is provided.
- query: A string of the Snowflake SQL query to be executed. Mutually exclusive with snowpark_dataframe.
Must also specify sf_connection.
- shuffle: A boolean indicating whether the data should be shuffled globally. Defaults to False.
- Returns:
A FileSet object.
- Raises:
ValueError: Raised when not exactly one of sf_connection and snowpark_dataframe is given.
FileSetExistError: Raised when a FileSet with the same name already exists in the given path.
FileSetError: Raised when the SQL query or DataFrame cannot be materialized.
- Note: During the generation of stage files, data casting will occur. The casting rules are as follows:
- Data casting:
- DecimalType(NUMBER):
If its scale is zero, cast to BIGINT
If its scale is non-zero, cast to FLOAT
DoubleType(DOUBLE): Cast to FLOAT.
ByteType(TINYINT): Cast to SMALLINT.
ShortType(SMALLINT): Cast to SMALLINT.
IntegerType(INT): Cast to INT.
LongType(BIGINT): Cast to BIGINT.
- No action:
FloatType(FLOAT): No action.
StringType(STRING): No action.
BinaryType(BINARY): No action.
BooleanType(BOOLEAN): No action.
- Not supported:
ArrayType(ARRAY): Not supported. A warning will be logged.
MapType(OBJECT): Not supported. A warning will be logged.
TimestampType(TIMESTAMP): Not supported. A warning will be logged.
TimeType(TIME): Not supported. A warning will be logged.
DateType(DATE): Not supported. A warning will be logged.
VariantType(VARIANT): Not supported. A warning will be logged.
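The casting rules above can be summarized as a lookup. The sketch below only restates the table; `cast_target` is a hypothetical helper, not a library function, and it returns `None` for the unsupported types instead of logging a warning.

```python
from typing import Optional

# Target types copied from the casting table above. "No action" types map
# to their own Snowflake type.
_SIMPLE_CASTS = {
    "DoubleType": "FLOAT",
    "ByteType": "SMALLINT",
    "ShortType": "SMALLINT",
    "IntegerType": "INT",
    "LongType": "BIGINT",
    "FloatType": "FLOAT",
    "StringType": "STRING",
    "BinaryType": "BINARY",
    "BooleanType": "BOOLEAN",
}

_UNSUPPORTED = {
    "ArrayType", "MapType", "TimestampType",
    "TimeType", "DateType", "VariantType",
}


def cast_target(type_name: str, scale: int = 0) -> Optional[str]:
    """Hypothetical lookup: stage-file type for a Snowpark type name."""
    if type_name == "DecimalType":
        # NUMBER columns depend on scale: integers vs. fractional values.
        return "BIGINT" if scale == 0 else "FLOAT"
    if type_name in _UNSUPPORTED:
        return None  # the table says a warning is logged for these types
    return _SIMPLE_CASTS[type_name]
```

A lookup like this can be useful for predicting the schema of the materialized files before calling make(), for example to spot NUMBER columns with a non-zero scale that will lose exactness as FLOAT.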
Example:
>>> # Create a FileSet from a SQL query
>>> conn = snowflake.connector.connect(**connection_parameters)
>>> my_fileset = snowflake.ml.fileset.FileSet.make(
...     target_stage_loc="@mydb.myschema.mystage/mydir",
...     name="helloworld",
...     sf_connection=conn,
...     query="SELECT * FROM mytable limit 1000000",
... )
>>> my_fileset.files()
----
['sfc://@mydb.myschema.mystage/mydir/helloworld/data_0_0_0.snappy.parquet']

>>> # Create a FileSet from a Snowpark DataFrame
>>> new_session = snowflake.snowpark.Session.builder.configs(connection_parameters).create()
>>> df = new_session.sql("SELECT * FROM Mytable limit 1000000")
>>> my_fileset = snowflake.ml.fileset.FileSet.make(
...     target_stage_loc="@mydb.myschema.mystage/mydir",
...     name="helloworld",
...     snowpark_dataframe=df,
... )
>>> my_fileset.files()
----
['sfc://@mydb.myschema.mystage/mydir/helloworld/data_0_0_0.snappy.parquet']
- to_snowpark_dataframe() -> DataFrame
Convert the FileSet to a Snowpark DataFrame.
Only parquet files that are owned by the FileSet will be read and converted. The parquet files materialized by the FileSet have the name pattern "data_<query_id>_<some_sharding_order>.snappy.parquet".
- Returns:
A Snowpark dataframe that contains the data of this FileSet.
- Note: The DataFrame generated by this method might not have the same schema as the original one. Specifically,
- NUMBER type with scale != 0 will become float.
- Unsupported types (see comments of make()) will not have any guarantee. For example, an OBJECT column may be scanned back as a STRING column.
This function or method is in private preview since 0.2.0.
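The ownership rule for to_snowpark_dataframe() (only files matching the "data_<query_id>_<some_sharding_order>.snappy.parquet" pattern are read) can be approximated with a file name filter. The regular expression here is a deliberately loose assumption, since the exact format of the query ID and sharding order is not specified on this page; `owned_parquet_files` is a hypothetical helper.

```python
import re
from typing import Iterable, List

# Loose, assumed pattern: a base name that starts with "data_" and ends
# with ".snappy.parquet". The middle part is not constrained further.
_OWNED_FILE = re.compile(r"^data_.+\.snappy\.parquet$")


def owned_parquet_files(paths: Iterable[str]) -> List[str]:
    """Hypothetical filter: keep paths whose base name matches the pattern."""
    # Compare against the last path segment only, not the full stage path.
    return [p for p in paths if _OWNED_FILE.match(p.rsplit("/", 1)[-1])]
```

Files that other processes place in the same stage directory under different names would be skipped by a filter of this kind.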
- to_tf_dataset(*, batch_size: int, shuffle: bool = False, drop_last_batch: bool = True) -> Any
Transform the Snowflake data into a ready-to-use TensorFlow tf.data.Dataset.
- Args:
- batch_size: The size of each data batch that will be
yielded by the resulting dataset.
- shuffle: Whether the data will be shuffled. If True, files will be shuffled, and
rows in each file will also be shuffled.
- drop_last_batch: Whether the last batch of data should be dropped. If True,
the last batch is dropped when its size is smaller than the given batch_size.
- Returns:
A tf.data.Dataset that yields batched tf.Tensors.
Examples:
>>> conn = snowflake.connector.connect(**connection_parameters)
>>> fileset = FileSet.make(
...     sf_connection=conn, name="helloworld", target_stage_loc="@mydb.myschema.mystage",
...     query="SELECT * FROM Mytable"
... )
>>> dp = fileset.to_tf_dataset(batch_size=1)
>>> for data in dp:
...     print(data)
----
{'_COL_1': <tf.Tensor: shape=(1,), dtype=int64, numpy=[10]>}
This function or method is in private preview since 0.2.0.
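The effect of batch_size and drop_last_batch can be illustrated without TensorFlow. The sketch below is a plain-Python model of the documented semantics, not the library's implementation; `batch` is a hypothetical helper and assumes batch_size is at least 1.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batch(rows: Iterable[T], batch_size: int, drop_last_batch: bool = True) -> Iterator[List[T]]:
    """Hypothetical model of batching with an optional short final batch."""
    current: List[T] = []
    for row in rows:
        current.append(row)
        if len(current) == batch_size:
            yield current
            current = []
    # A leftover batch is smaller than batch_size; emit it only on request.
    if current and not drop_last_batch:
        yield current
```

With five rows and batch_size=2, the default drops the single leftover row, while drop_last_batch=False yields it as a final batch of size 1. Keeping the default gives every batch the same shape, which is often what a training loop expects.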
- to_torch_datapipe(*, batch_size: int, shuffle: bool = False, drop_last_batch: bool = True) -> Any
Transform the Snowflake data into a ready-to-use PyTorch datapipe.
Return a PyTorch datapipe which iterates on rows of data.
- Args:
- batch_size: The size of each data batch that will be
yielded by the resulting datapipe.
- shuffle: It specifies whether the data will be shuffled. If True, files will be shuffled, and
rows in each file will also be shuffled.
- drop_last_batch: Whether the last batch of data should be dropped. If True,
the last batch is dropped when its size is smaller than the given batch_size.
- Returns:
A PyTorch iterable datapipe that yields data.
Examples:
>>> conn = snowflake.connector.connect(**connection_parameters)
>>> fileset = FileSet.make(
...     sf_connection=conn, name="helloworld", target_stage_loc="@mydb.myschema.mystage",
...     query="SELECT * FROM Mytable"
... )
>>> dp = fileset.to_torch_datapipe(batch_size=1)
>>> for data in dp:
...     print(data)
----
{'_COL_1': [10]}
This function or method is in private preview since 0.2.0.
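The shuffle option is described as two-level: files are shuffled, and rows within each file are shuffled. A plain-Python model of that description follows. It is a sketch under assumptions: `shuffled_rows` is a hypothetical helper, the `seed` parameter is added here for reproducibility only, and the library's actual shuffling strategy may differ.

```python
import random
from typing import Dict, Iterator, List, Optional


def shuffled_rows(
    files: Dict[str, List[int]],
    shuffle: bool,
    seed: Optional[int] = None,
) -> Iterator[int]:
    """Hypothetical model: shuffle file order, then rows inside each file."""
    rng = random.Random(seed)
    names = list(files)
    if shuffle:
        rng.shuffle(names)  # first level: order of the files
    for name in names:
        rows = list(files[name])  # copy so the caller's data is untouched
        if shuffle:
            rng.shuffle(rows)  # second level: rows within this file
        yield from rows
```

In this model, rows from one file stay contiguous in the output, so the result is not a uniform shuffle over all rows. If a global shuffle matters, the shuffle argument of make() is the documented option for that.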
Attributes
- name
Get the name of the FileSet.