snowflake.snowpark.DataFrameReader

class snowflake.snowpark.DataFrameReader(session: Session)[source]

Bases: object

Provides methods to load data in various supported formats from a Snowflake stage to a DataFrame. The paths provided to the DataFrameReader must refer to Snowflake stages.

To use this object:

1. Access an instance of a DataFrameReader by using the Session.read property.

2. Specify any format-specific options and copy options by calling the option() or options() method. These methods return a DataFrameReader that is configured with these options. (Note that although specifying copy options can make error handling more robust during the reading process, it may have an effect on performance.)

3. Specify the schema of the data that you plan to load by constructing a types.StructType object and passing it to the schema() method if the file format is CSV. Other file formats such as JSON, XML, Parquet, ORC, and AVRO don’t accept a schema. This method returns a DataFrameReader that is configured to read data that uses the specified schema. Currently, inferring schema is also supported for CSV and JSON formats as a preview feature open to all accounts.

4. Specify the format of the data by calling the method named after the format (e.g. csv(), json(), etc.). These methods return a DataFrame that is configured to load data in the specified format.

5. Call a DataFrame method that performs an action (e.g. DataFrame.collect()) to load the data from the file.

The following examples demonstrate how to use a DataFrameReader.
>>> # Create a temp stage to run the example code.
>>> _ = session.sql("CREATE or REPLACE temp STAGE mystage").collect()
Copy
Example 1:

Loading the first two columns of a CSV file and skipping the first header line:

>>> from snowflake.snowpark.types import StructType, StructField, IntegerType, StringType, FloatType
>>> _ = session.file.put("tests/resources/testCSV.csv", "@mystage", auto_compress=False)
>>> # Define the schema for the data in the CSV file.
>>> user_schema = StructType([StructField("a", IntegerType()), StructField("b", StringType()), StructField("c", FloatType())])
>>> # Create a DataFrame that is configured to load data from the CSV file.
>>> df = session.read.options({"field_delimiter": ",", "skip_header": 1}).schema(user_schema).csv("@mystage/testCSV.csv")
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> df.collect()
[Row(A=2, B='two', C=2.2)]
Copy
Example 2:

Loading a gzip compressed json file:

>>> _ = session.file.put("tests/resources/testJson.json", "@mystage", auto_compress=True)
>>> # Create a DataFrame that is configured to load data from the gzipped JSON file.
>>> json_df = session.read.option("compression", "gzip").json("@mystage/testJson.json.gz")
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> json_df.show()
-----------------------
|"$1"                 |
-----------------------
|{                    |
|  "color": "Red",    |
|  "fruit": "Apple",  |
|  "size": "Large"    |
|}                    |
-----------------------
Copy

In addition, if you want to load only a subset of files from the stage, you can use the pattern option to specify a regular expression that matches the files that you want to load.

Example 3:

Loading only the CSV files from a stage location:

>>> from snowflake.snowpark.types import StructType, StructField, IntegerType, StringType
>>> from snowflake.snowpark.functions import col
>>> _ = session.file.put("tests/resources/*.csv", "@mystage", auto_compress=False)
>>> # Define the schema for the data in the CSV files.
>>> user_schema = StructType([StructField("a", IntegerType()), StructField("b", StringType()), StructField("c", FloatType())])
>>> # Create a DataFrame that is configured to load data from the CSV files in the stage.
>>> csv_df = session.read.option("pattern", ".*V[.]csv").schema(user_schema).csv("@mystage").sort(col("a"))
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> csv_df.collect()
[Row(A=1, B='one', C=1.2), Row(A=2, B='two', C=2.2), Row(A=3, B='three', C=3.3), Row(A=4, B='four', C=4.4)]
Copy

To load Parquet, ORC and AVRO files, no schema is accepted because the schema will be automatically inferred. Inferring the schema can be disabled by setting option “infer_schema” to False. Then you can use $1 to access the column data as an OBJECT.

Example 4:
Loading a Parquet file with inferring the schema.
>>> from snowflake.snowpark.functions import col
>>> _ = session.file.put("tests/resources/test.parquet", "@mystage", auto_compress=False)
>>> # Create a DataFrame that uses a DataFrameReader to load data from a file in a stage.
>>> df = session.read.parquet("@mystage/test.parquet").where(col('"num"') == 2)
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> df.collect()
[Row(str='str2', num=2)]
Copy
Example 5:
Loading an ORC file and infer the schema:
>>> from snowflake.snowpark.functions import col
>>> _ = session.file.put("tests/resources/test.orc", "@mystage", auto_compress=False)
>>> # Create a DataFrame that uses a DataFrameReader to load data from a file in a stage.
>>> df = session.read.orc("@mystage/test.orc").where(col('"num"') == 2)
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> df.collect()
[Row(str='str2', num=2)]
Copy
Example 6:
Loading an AVRO file and infer the schema:
>>> from snowflake.snowpark.functions import col
>>> _ = session.file.put("tests/resources/test.avro", "@mystage", auto_compress=False)
>>> # Create a DataFrame that uses a DataFrameReader to load data from a file in a stage.
>>> df = session.read.avro("@mystage/test.avro").where(col('"num"') == 2)
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> df.collect()
[Row(str='str2', num=2)]
Copy
Example 7:
Loading a Parquet file without inferring the schema:
>>> from snowflake.snowpark.functions import col
>>> _ = session.file.put("tests/resources/test.parquet", "@mystage", auto_compress=False)
>>> # Create a DataFrame that uses a DataFrameReader to load data from a file in a stage.
>>> df = session.read.option("infer_schema", False).parquet("@mystage/test.parquet").where(col('$1')["num"] == 2)
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> df.show()
-------------------
|"$1"             |
-------------------
|{                |
|  "num": 2,      |
|  "str": "str2"  |
|}                |
-------------------
Copy

Loading JSON and XML files doesn’t support schema either. You also need to use $1 to access the column data as an OBJECT.

Example 8:
Loading a JSON file:
>>> from snowflake.snowpark.functions import col, lit
>>> _ = session.file.put("tests/resources/testJson.json", "@mystage", auto_compress=False)
>>> # Create a DataFrame that uses a DataFrameReader to load data from a file in a stage.
>>> df = session.read.json("@mystage/testJson.json").where(col("$1")["fruit"] == lit("Apple"))
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> df.show()
-----------------------
|"$1"                 |
-----------------------
|{                    |
|  "color": "Red",    |
|  "fruit": "Apple",  |
|  "size": "Large"    |
|}                    |
|{                    |
|  "color": "Red",    |
|  "fruit": "Apple",  |
|  "size": "Large"    |
|}                    |
-----------------------
Copy
Example 9:
Loading an XML file:
>>> _ = session.file.put("tests/resources/test.xml", "@mystage", auto_compress=False)
>>> # Create a DataFrame that uses a DataFrameReader to load data from a file in a stage.
>>> df = session.read.xml("@mystage/test.xml")
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> df.show()
---------------------
|"$1"               |
---------------------
|<test>             |
|  <num>1</num>     |
|  <str>str1</str>  |
|</test>            |
|<test>             |
|  <num>2</num>     |
|  <str>str2</str>  |
|</test>            |
---------------------
Copy
Example 10:
Loading a CSV file with an already existing FILE_FORMAT:
>>> from snowflake.snowpark.types import StructType, StructField, IntegerType, StringType
>>> _ = session.sql("create file format if not exists csv_format type=csv skip_header=1 null_if='none';").collect()
>>> _ = session.file.put("tests/resources/testCSVspecialFormat.csv", "@mystage", auto_compress=False)
>>> # Define the schema for the data in the CSV files.
>>> schema = StructType([StructField("ID", IntegerType()),StructField("USERNAME", StringType()),StructField("FIRSTNAME", StringType()),StructField("LASTNAME", StringType())])
>>> # Create a DataFrame that is configured to load data from the CSV files in the stage.
>>> df = session.read.schema(schema).option("format_name", "csv_format").csv("@mystage/testCSVspecialFormat.csv")
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> df.collect()
[Row(ID=0, USERNAME='admin', FIRSTNAME=None, LASTNAME=None), Row(ID=1, USERNAME='test_user', FIRSTNAME='test', LASTNAME='user')]
Copy
Example 11:
Querying metadata for staged files:
>>> from snowflake.snowpark.column import METADATA_FILENAME, METADATA_FILE_ROW_NUMBER
>>> df = session.read.with_metadata(METADATA_FILENAME, METADATA_FILE_ROW_NUMBER.as_("ROW NUMBER")).schema(user_schema).csv("@mystage/testCSV.csv")
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> df.show()
--------------------------------------------------------
|"METADATA$FILENAME"  |"ROW NUMBER"  |"A"  |"B"  |"C"  |
--------------------------------------------------------
|testCSV.csv          |1             |1    |one  |1.2  |
|testCSV.csv          |2             |2    |two  |2.2  |
--------------------------------------------------------
Copy
Example 12:
Inferring schema for csv and json files (Preview Feature - Open):
>>> # Read a csv file without a header
>>> df = session.read.option("INFER_SCHEMA", True).csv("@mystage/testCSV.csv")
>>> df.show()
----------------------
|"c1"  |"c2"  |"c3"  |
----------------------
|1     |one   |1.2   |
|2     |two   |2.2   |
----------------------
Copy
>>> # Read a csv file with header and parse the header
>>> _ = session.file.put("tests/resources/testCSVheader.csv", "@mystage", auto_compress=False)
>>> df = session.read.option("INFER_SCHEMA", True).option("PARSE_HEADER", True).csv("@mystage/testCSVheader.csv")
>>> df.show()
----------------------------
|"id"  |"name"  |"rating"  |
----------------------------
|1     |one     |1.2       |
|2     |two     |2.2       |
----------------------------
Copy
>>> df = session.read.option("INFER_SCHEMA", True).json("@mystage/testJson.json")
>>> df.show()
------------------------------
|"color"  |"fruit"  |"size"  |
------------------------------
|Red      |Apple    |Large   |
|Red      |Apple    |Large   |
------------------------------
Copy

Methods

avro(path)

Specify the path of the AVRO file(s) to load.

csv(path)

Specify the path of the CSV file(s) to load.

dbapi(create_connection, *[, table, query, ...])

Reads data from a database table using a DBAPI connection with optional partitioning, parallel processing, and query customization. By default, the function reads the entire table at a time without a query timeout. There are several ways to break data into small pieces and speed up ingestion, you can also combine them to acquire optimal performance: 1.Use column, lower_bound, upper_bound and num_partitions at the same time when you need to split large tables into smaller partitions for parallel processing. These must all be specified together, otherwise error will be raised. 2.Set max_workers to a proper positive integer. This defines the maximum number of processes and threads used for parallel execution. 3.Adjusting fetch_size can optimize performance by reducing the number of round trips to the database. 4.Use predicates to defining WHERE conditions for partitions, predicates will be ignored if column is specified to generate partition. 5.Set custom_schema to avoid snowpark infer schema, custom_schema must have a matched column name with table in external data source. You can also use session_init_statement to perform any SQL that you want to execute on external data source before fetching data. :param create_connection: a function that return a dbapi connection :param table: Specifies the name of the table in the external data source. This parameter cannot be set simultaneously with the query parameter. :param query: A valid SQL query to be used in the FROM clause. This parameter cannot be set simultaneously with the table parameter. :param column: column name used to create partition, the column type must be numeric like int type or float type, or Date type. :param lower_bound: lower bound of partition, decide the stride of partition along with upper_bound, this parameter does not filter out data. :param upper_bound: upper bound of partition, decide the stride of partition along with lower_bound, this parameter does not filter out data. :param num_partitions: number of partitions to create when reading in parallel from multiple processes and threads. :param max_workers: number of processes and threads used for parallelism. :param query_timeout: timeout(seconds) for each query, default value is 0, which means never timeout. :param fetch_size: batch size when fetching from external data source, which determine how many rows fetched per round trip. This improve performace for drivers that have a low default fetch size. :param custom_schema: a custom snowflake table schema to read data from external data source, the column names should be identical to corresponded column names external data source. This can be a schema string, for example: "id INTEGER, int_col INTEGER, text_col STRING", or StructType, for example: StructType([StructField("ID", IntegerType(), False)]) :param predicates: a list of expressions suitable for inclusion in WHERE clauses, each defines a partition :param session_init_statement: session initiation statements for external data source, this statement will be executed before fetch data from external data source, for example: "insert into test_table values (1, 'sample_data')" will insert data into test_table before fetch data from it.

format(format)

Specify the format of the file(s) to load.

json(path)

Specify the path of the JSON file(s) to load.

load([path])

Specify the path of the file(s) to load.

option(key, value)

Sets the specified option in the DataFrameReader.

options([configs])

Sets multiple specified options in the DataFrameReader.

orc(path)

Specify the path of the ORC file(s) to load.

parquet(path)

Specify the path of the PARQUET file(s) to load.

schema(schema)

Define the schema for CSV files that you want to read.

table(name)

Returns a Table that points to the specified table.

with_metadata(*metadata_cols)

Define the metadata columns that need to be selected from stage files.

xml(path)

Specify the path of the XML file(s) to load.