snowflake.ml.fileset.fileset.FileSet¶
- class snowflake.ml.fileset.fileset.FileSet(*, target_stage_loc: str, name: str, sf_connection: Optional[SnowflakeConnection] = None, snowpark_session: Optional[Session] = None)¶
Bases:
object
A FileSet represents an immutable snapshot of the result of a query in the form of files.
Create a FileSet based on an existing stage directory.
It can be used to restore an existing FileSet that was not deleted before.
- Parameters:
sf_connection – A Snowflake python connection object. Mutually exclusive to snowpark_session.
snowpark_session – A Snowpark Session object. Mutually exclusive to sf_connection.
target_stage_loc – A string of the Snowflake stage path where the FileSet will be stored. It needs to be an absolute path with the form of “@{database}.{schema}.{stage}/{optional directory}/”.
name – The name of the FileSet. It is the name of the directory which holds result stage files.
- Raises:
SnowflakeMLException – An error occurred when not exactly one of sf_connection and snowpark_session is given.
Example: >>> # Create a new FileSet using Snowflake Python connection >>> conn = snowflake.connector.connect(**connection_parameters) >>> my_fileset = snowflake.ml.fileset.FileSet.make( >>> target_stage_loc=”@mydb.mychema.mystage/mydir” >>> name=”helloworld”, >>> sf_connection=conn, >>> query=”SELECT * FROM Mytable limit 1000000”, >>> ) >>> my_fileset.files() —- [‘sfc://@mydb.myschema.mystage//mydir/helloworld/data_0_0_0.snappy.parquet’]
>>> # Now we can restore the FileSet in another program as long as the FileSet is not deleted >>> conn = snowflake.connector.connect(**connection_parameters) >>> my_fileset_pointer = FileSet(sf_connection=conn, target_stage_loc="@mydb.mychema.mystage/mydir", name="helloworld") >>> my_fileset.files() ---- ['sfc://@mydb.myschema.mystage/mydir/helloworld/data_0_0_0.snappy.parquet']
Methods
- delete() None ¶
Delete the FileSet directory and all the stage files in it.
If not called, the FileSet and all its stage files will stay in Snowflake stage.
- Raises:
SnowflakeMLException – An error occurred when the FileSet cannot get deleted.
This function or method is in private preview since 0.2.0.
- files() List[str] ¶
Get the list of stage file paths in the current FileSet.
The stage file paths follows the sfc protocol.
- Returns:
A list of stage file paths
Example: >>> my_fileset = FileSet(sf_connection=conn, target_stage_loc=”@mydb.mychema.mystage”, name=”test”) >>> my_fileset.files() —- [“sfc://@mydb.myschema.mystage/test/hello_world_0_0_0.snappy.parquet”,
“sfc://@mydb.myschema.mystage/test/hello_world_0_0_1.snappy.parquet”]
This function or method is in private preview since 0.2.0.
- fileset_stage_location() str ¶
Get the stage path to the current FileSet in sfc protocol.
- Returns:
A string representing the stage path
Example: >>> my_fileset = FileSet(sf_connection=conn, target_stage_loc=”@mydb.mychema.mystage”, name=”test”) >>> my_fileset.files() —- “sfc://@mydb.myschema.mystage/test/
This function or method is in private preview since 0.2.0.
- classmethod make(*, target_stage_loc: str, name: str, snowpark_dataframe: Optional[DataFrame] = None, sf_connection: Optional[SnowflakeConnection] = None, query: str = '', shuffle: bool = False) FileSet ¶
Creates a FileSet object given a SQL query.
The result FileSet object captures the query result deterministically as stage files.
- Parameters:
target_stage_loc – A string of the Snowflake stage path where the FileSet will be stored. It needs to be an absolute path with the form of “@{database}.{schema}.{stage}/{optional directory}/”.
name – The name of the FileSet. It will become the name of the directory which holds result stage files. If there is already a FileSet with the same name in the given stage location, an exception will be raised.
snowpark_dataframe – A Snowpark Dataframe. Mutually exclusive to (sf_connection, query).
sf_connection – A Snowflake python connection object. Must be provided if query is provided.
query – A string of Snowflake SQL query to be executed. Mutually exclusive to snowpark_dataframe. Must also specify sf_connection.
shuffle – A boolean represents whether the data should be shuffled globally. Default to be false.
- Returns:
A FileSet object.
- Raises:
ValueError – An error occurred when not exactly one of sf_connection and snowpark_session is given.
FileSetExistError – An error occurred whern a FileSet with the same name exists in the given path.
FileSetError – An error occurred when the SQL query/dataframe is not able to get materialized.
- Note: During the generation of stage files, data casting will occur. The casting rules are as follows::
- Data casting:
- DecimalType(NUMBER):
If its scale is zero, cast to BIGINT
If its scale is non-zero, cast to FLOAT
DoubleType(DOUBLE): Cast to FLOAT.
ByteType(TINYINT): Cast to SMALLINT.
ShortType(SMALLINT):Cast to SMALLINT.
IntegerType(INT): Cast to INT.
LongType(BIGINT): Cast to BIGINT.
- No action:
FloatType(FLOAT): No action.
StringType(String): No action.
BinaryType(BINARY): No action.
BooleanType(BOOLEAN): No action.
- Not supported:
ArrayType(ARRAY): Not supported. A warning will be logged.
MapType(OBJECT): Not supported. A warning will be logged.
TimestampType(TIMESTAMP): Not supported. A warning will be logged.
TimeType(TIME): Not supported. A warning will be logged.
DateType(DATE): Not supported. A warning will be logged.
VariantType(VARIANT): Not supported. A warning will be logged.
>>> conn = snowflake.connector.connect(**connection_parameters) >>> my_fileset = snowflake.ml.fileset.FileSet.make( >>> target_stage_loc="@mydb.mychema.mystage/mydir" >>> name="helloworld", >>> sf_connection=conn, >>> query="SELECT * FROM mytable limit 1000000", >>> ) >>> my_fileset.files() ---- ['sfc://@mydb.myschema.mystage/helloworld/data_0_0_0.snappy.parquet']
>>> new_session = snowflake.snowpark.Session.builder.configs(connection_parameters).create() >>> df = new_session.sql("SELECT * FROM Mytable limit 1000000") >>> my_fileset = snowflake.ml.fileset.FileSet.make( >>> target_stage_loc="@mydb.mychema.mystage/mydir" >>> name="helloworld", >>> snowpark_dataframe=df, >>> ) >>> my_fileset.files() ---- ['sfc://@mydb.myschema.mystage/helloworld/data_0_0_0.snappy.parquet']
- to_snowpark_dataframe() DataFrame ¶
Convert the fileset to a snowpark dataframe.
Only parquet files that owned by the FileSet will be read and converted. The parquet files that materialized by FileSet have the name pattern “data_<query_id>_<some_sharding_order>.snappy.parquet”.
- Returns:
A Snowpark dataframe that contains the data of this FileSet.
- Note: The dataframe generated by this method might not have the same schema as the original one. Specifically,
NUMBER type with scale != 0 will become float.
- Unsupported types (see comments of
make()
) will not have any guarantee. For example, an OBJECT column may be scanned back as a STRING column.
- Unsupported types (see comments of
This function or method is in private preview since 0.2.0.
- to_tf_dataset(*, batch_size: int, shuffle: bool = False, drop_last_batch: bool = True) Any ¶
Transform the Snowflake data into a ready-to-use TensorFlow tf.data.Dataset.
- Parameters:
batch_size – It specifies the size of each data batch which will be yield in the result datapipe
shuffle – It specifies whether the data will be shuffled. If True, files will be shuffled, and rows in each file will also be shuffled.
drop_last_batch – Whether the last batch of data should be dropped. If set to be true, then the last batch will get dropped if its size is smaller than the given batch_size.
- Returns:
A tf.data.Dataset that yields batched tf.Tensors.
Examples: >>> conn = snowflake.connector.connect(**connection_parameters) >>> fileset = FileSet.make( >>> sf_connection=conn, name=”helloworld”, target_stage_loc=”@mydb.myschema.mystage” >>> query=”SELECT * FROM Mytable” >>> ) >>> dp = fileset.to_tf_dataset(batch_size=1) >>> for data in dp: >>> print(data) —- {‘_COL_1’: <tf.Tensor: shape=(1,), dtype=int64, numpy=[10]>}
This function or method is in private preview since 0.2.0.
- to_torch_datapipe(*, batch_size: int, shuffle: bool = False, drop_last_batch: bool = True) Any ¶
Transform the Snowflake data into a ready-to-use Pytorch datapipe.
Return a Pytorch datapipe which iterates on rows of data.
- Parameters:
batch_size – It specifies the size of each data batch which will be yield in the result datapipe
shuffle – It specifies whether the data will be shuffled. If True, files will be shuffled, and rows in each file will also be shuffled.
drop_last_batch – Whether the last batch of data should be dropped. If set to be true, then the last batch will get dropped if its size is smaller than the given batch_size.
- Returns:
A Pytorch iterable datapipe that yield data.
Examples: >>> conn = snowflake.connector.connect(**connection_parameters) >>> fileset = FileSet.make( >>> sf_connection=conn, name=”helloworld”, target_stage_loc=”@mydb.myschema.mystage” >>> query=”SELECT * FROM Mytable” >>> ) >>> dp = fileset.to_torch_datapipe(batch_size=1) >>> for data in dp: >>> print(data) —- {‘_COL_1’:[10]}
This function or method is in private preview since 0.2.0.
Attributes
- name¶
Get the name of the FileSet.