You are viewing documentation about an older version (1.13.0).

snowflake.snowpark.DataFrameReader

class snowflake.snowpark.DataFrameReader(session: Session)

Bases: object

Provides methods to load data in various supported formats from a Snowflake stage to a DataFrame. The paths provided to the DataFrameReader must refer to Snowflake stages.

To use this object:

1. Access an instance of a DataFrameReader by using the Session.read property.

2. Specify any format-specific options and copy options by calling the option() or options() method. These methods return a DataFrameReader that is configured with these options. (Specifying copy options can make error handling more robust while reading, but it may affect performance.)

3. If the file format is CSV, specify the schema of the data that you plan to load by constructing a types.StructType object and passing it to the schema() method. This method returns a DataFrameReader that is configured to read data that uses the specified schema. Other file formats such as JSON, XML, Parquet, ORC, and AVRO don’t accept a schema. Schema inference is also currently supported for the CSV and JSON formats as a preview feature open to all accounts.

4. Specify the format of the data by calling the method named after the format (e.g. csv() or json()). These methods return a DataFrame that is configured to load data in the specified format.

5. Call a DataFrame method that performs an action (e.g. DataFrame.collect()) to load the data from the file.

The following examples demonstrate how to use a DataFrameReader.
>>> # Create a temp stage to run the example code.
>>> _ = session.sql("CREATE or REPLACE temp STAGE mystage").collect()
Example 1:

Loading a CSV file with a user-defined schema and skipping the first header line:

>>> from snowflake.snowpark.types import StructType, StructField, IntegerType, StringType, FloatType
>>> _ = session.file.put("tests/resources/testCSV.csv", "@mystage", auto_compress=False)
>>> # Define the schema for the data in the CSV file.
>>> user_schema = StructType([StructField("a", IntegerType()), StructField("b", StringType()), StructField("c", FloatType())])
>>> # Create a DataFrame that is configured to load data from the CSV file.
>>> df = session.read.options({"field_delimiter": ",", "skip_header": 1}).schema(user_schema).csv("@mystage/testCSV.csv")
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> df.collect()
[Row(A=2, B='two', C=2.2)]
Example 2:

Loading a gzip-compressed JSON file:

>>> _ = session.file.put("tests/resources/testJson.json", "@mystage", auto_compress=True)
>>> # Create a DataFrame that is configured to load data from the gzipped JSON file.
>>> json_df = session.read.option("compression", "gzip").json("@mystage/testJson.json.gz")
>>> # Load the data into the DataFrame and print the results.
>>> json_df.show()
-----------------------
|"$1"                 |
-----------------------
|{                    |
|  "color": "Red",    |
|  "fruit": "Apple",  |
|  "size": "Large"    |
|}                    |
-----------------------

In addition, if you want to load only a subset of files from the stage, you can use the pattern option to specify a regular expression that matches the files that you want to load.
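
To preview which staged files a pattern value would select, you can try the regular expression locally first. The sketch below assumes that the expression has to match the whole file path (hence re.fullmatch) and that Python's re syntax agrees with Snowflake's for this simple expression. The file names are taken from the examples on this page.

```python
import re

# Pattern used in Example 3.
pattern = ".*V[.]csv"

# File names that appear in the examples on this page.
staged_files = [
    "testCSV.csv",
    "testCSVheader.csv",
    "testCSVspecialFormat.csv",
    "testJson.json",
]

# Assumption: the pattern must match the entire path, so use fullmatch.
matched = [name for name in staged_files if re.fullmatch(pattern, name)]
print(matched)
```

Only names that end in "V.csv" pass this check, so testCSVheader.csv and testCSVspecialFormat.csv are filtered out.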

Example 3:

Loading only the CSV files from a stage location:

>>> from snowflake.snowpark.types import StructType, StructField, IntegerType, StringType, FloatType
>>> from snowflake.snowpark.functions import col
>>> _ = session.file.put("tests/resources/*.csv", "@mystage", auto_compress=False)
>>> # Define the schema for the data in the CSV files.
>>> user_schema = StructType([StructField("a", IntegerType()), StructField("b", StringType()), StructField("c", FloatType())])
>>> # Create a DataFrame that is configured to load data from the CSV files in the stage.
>>> csv_df = session.read.option("pattern", ".*V[.]csv").schema(user_schema).csv("@mystage").sort(col("a"))
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> csv_df.collect()
[Row(A=1, B='one', C=1.2), Row(A=2, B='two', C=2.2), Row(A=3, B='three', C=3.3), Row(A=4, B='four', C=4.4)]

Parquet, ORC, and AVRO files don’t accept a schema because the schema is inferred automatically. To disable schema inference, set the option “infer_schema” to False; you can then use $1 to access the column data as an OBJECT.

Example 4:
Loading a Parquet file and inferring the schema:
>>> from snowflake.snowpark.functions import col
>>> _ = session.file.put("tests/resources/test.parquet", "@mystage", auto_compress=False)
>>> # Create a DataFrame that uses a DataFrameReader to load data from a file in a stage.
>>> df = session.read.parquet("@mystage/test.parquet").where(col('"num"') == 2)
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> df.collect()
[Row(str='str2', num=2)]
Example 5:
Loading an ORC file and inferring the schema:
>>> from snowflake.snowpark.functions import col
>>> _ = session.file.put("tests/resources/test.orc", "@mystage", auto_compress=False)
>>> # Create a DataFrame that uses a DataFrameReader to load data from a file in a stage.
>>> df = session.read.orc("@mystage/test.orc").where(col('"num"') == 2)
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> df.collect()
[Row(str='str2', num=2)]
Example 6:
Loading an AVRO file and inferring the schema:
>>> from snowflake.snowpark.functions import col
>>> _ = session.file.put("tests/resources/test.avro", "@mystage", auto_compress=False)
>>> # Create a DataFrame that uses a DataFrameReader to load data from a file in a stage.
>>> df = session.read.avro("@mystage/test.avro").where(col('"num"') == 2)
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> df.collect()
[Row(str='str2', num=2)]
Example 7:
Loading a Parquet file without inferring the schema:
>>> from snowflake.snowpark.functions import col
>>> _ = session.file.put("tests/resources/test.parquet", "@mystage", auto_compress=False)
>>> # Create a DataFrame that uses a DataFrameReader to load data from a file in a stage.
>>> df = session.read.option("infer_schema", False).parquet("@mystage/test.parquet").where(col('$1')["num"] == 2)
>>> # Load the data into the DataFrame and print the results.
>>> df.show()
-------------------
|"$1"             |
-------------------
|{                |
|  "num": 2,      |
|  "str": "str2"  |
|}                |
-------------------

JSON and XML files don’t accept a schema either. Use $1 to access the column data as an OBJECT.
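
As a rough local analogy only (this is plain Python, not Snowpark): each loaded row exposes a single value, $1, that holds the whole parsed record, and individual fields are reached by key, as in col("$1")["fruit"]. The records below are made-up sample data, not the contents of the test files.

```python
import json

# Made-up sample records, one JSON document per line (not the real test file).
raw_lines = [
    '{"color": "Red", "fruit": "Apple", "size": "Large"}',
    '{"color": "Yellow", "fruit": "Banana", "size": "Medium"}',
]

# Each row has a single "$1" value holding the whole parsed object.
rows = [{"$1": json.loads(line)} for line in raw_lines]

# Local counterpart of .where(col("$1")["fruit"] == lit("Apple")).
apples = [row for row in rows if row["$1"]["fruit"] == "Apple"]
print(apples)
```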

Example 8:
Loading a JSON file:
>>> from snowflake.snowpark.functions import col, lit
>>> _ = session.file.put("tests/resources/testJson.json", "@mystage", auto_compress=False)
>>> # Create a DataFrame that uses a DataFrameReader to load data from a file in a stage.
>>> df = session.read.json("@mystage/testJson.json").where(col("$1")["fruit"] == lit("Apple"))
>>> # Load the data into the DataFrame and print the results.
>>> df.show()
-----------------------
|"$1"                 |
-----------------------
|{                    |
|  "color": "Red",    |
|  "fruit": "Apple",  |
|  "size": "Large"    |
|}                    |
|{                    |
|  "color": "Red",    |
|  "fruit": "Apple",  |
|  "size": "Large"    |
|}                    |
-----------------------
Example 9:
Loading an XML file:
>>> _ = session.file.put("tests/resources/test.xml", "@mystage", auto_compress=False)
>>> # Create a DataFrame that uses a DataFrameReader to load data from a file in a stage.
>>> df = session.read.xml("@mystage/test.xml")
>>> # Load the data into the DataFrame and print the results.
>>> df.show()
---------------------
|"$1"               |
---------------------
|<test>             |
|  <num>1</num>     |
|  <str>str1</str>  |
|</test>            |
|<test>             |
|  <num>2</num>     |
|  <str>str2</str>  |
|</test>            |
---------------------
Example 10:
Loading a CSV file with an already existing FILE_FORMAT:
>>> from snowflake.snowpark.types import StructType, StructField, IntegerType, StringType
>>> _ = session.sql("create file format if not exists csv_format type=csv skip_header=1 null_if='none';").collect()
>>> _ = session.file.put("tests/resources/testCSVspecialFormat.csv", "@mystage", auto_compress=False)
>>> # Define the schema for the data in the CSV files.
>>> schema = StructType([StructField("ID", IntegerType()),StructField("USERNAME", StringType()),StructField("FIRSTNAME", StringType()),StructField("LASTNAME", StringType())])
>>> # Create a DataFrame that is configured to load data from the CSV files in the stage.
>>> df = session.read.schema(schema).option("format_name", "csv_format").csv("@mystage/testCSVspecialFormat.csv")
>>> # Load the data into the DataFrame and return an array of rows containing the results.
>>> df.collect()
[Row(ID=0, USERNAME='admin', FIRSTNAME=None, LASTNAME=None), Row(ID=1, USERNAME='test_user', FIRSTNAME='test', LASTNAME='user')]
Example 11:
Querying metadata for staged files:
>>> from snowflake.snowpark.column import METADATA_FILENAME, METADATA_FILE_ROW_NUMBER
>>> # user_schema is the three-column StructType defined in Example 1 and Example 3.
>>> df = session.read.with_metadata(METADATA_FILENAME, METADATA_FILE_ROW_NUMBER.as_("ROW NUMBER")).schema(user_schema).csv("@mystage/testCSV.csv")
>>> # Load the data into the DataFrame and print the results.
>>> df.show()
--------------------------------------------------------
|"METADATA$FILENAME"  |"ROW NUMBER"  |"A"  |"B"  |"C"  |
--------------------------------------------------------
|testCSV.csv          |1             |1    |one  |1.2  |
|testCSV.csv          |2             |2    |two  |2.2  |
--------------------------------------------------------
Example 12:
Inferring the schema for CSV and JSON files (Preview Feature - Open):
>>> # Read a CSV file without a header
>>> df = session.read.option("INFER_SCHEMA", True).csv("@mystage/testCSV.csv")
>>> df.show()
----------------------
|"c1"  |"c2"  |"c3"  |
----------------------
|1     |one   |1.2   |
|2     |two   |2.2   |
----------------------
>>> # Read a CSV file with a header and parse the header
>>> _ = session.file.put("tests/resources/testCSVheader.csv", "@mystage", auto_compress=False)
>>> df = session.read.option("INFER_SCHEMA", True).option("PARSE_HEADER", True).csv("@mystage/testCSVheader.csv")
>>> df.show()
----------------------------
|"id"  |"name"  |"rating"  |
----------------------------
|1     |one     |1.2       |
|2     |two     |2.2       |
----------------------------
>>> # Read a JSON file and infer its schema
>>> df = session.read.option("INFER_SCHEMA", True).json("@mystage/testJson.json")
>>> df.show()
------------------------------
|"color"  |"fruit"  |"size"  |
------------------------------
|Red      |Apple    |Large   |
|Red      |Apple    |Large   |
------------------------------
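
The two CSV cases in Example 12 differ only in where the column names come from. The following stdlib-only sketch mimics that naming rule locally. It is not how Snowflake implements inference: column_names is a hypothetical helper, and the file contents are assumed from the outputs shown above.

```python
import csv
import io

def column_names(csv_text, parse_header):
    """Return the column names for csv_text (local sketch, not Snowpark)."""
    first_row = next(csv.reader(io.StringIO(csv_text)))
    if parse_header:
        # Use the values of the first line as column names.
        return first_row
    # No header: fall back to positional names c1, c2, ...
    return [f"c{i}" for i in range(1, len(first_row) + 1)]

# File contents assumed from the outputs shown in Example 12.
no_header = "1,one,1.2\n2,two,2.2\n"
with_header = "id,name,rating\n1,one,1.2\n2,two,2.2\n"

print(column_names(no_header, parse_header=False))   # ['c1', 'c2', 'c3']
print(column_names(with_header, parse_header=True))  # ['id', 'name', 'rating']
```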

Methods

avro(path)

Specify the path of the AVRO file(s) to load.

csv(path)

Specify the path of the CSV file(s) to load.

json(path)

Specify the path of the JSON file(s) to load.

option(key, value)

Sets the specified option in the DataFrameReader.

options(configs)

Sets multiple specified options in the DataFrameReader.

orc(path)

Specify the path of the ORC file(s) to load.

parquet(path)

Specify the path of the Parquet file(s) to load.

schema(schema)

Define the schema for CSV files that you want to read.

table(name)

Returns a Table that points to the specified table.

with_metadata(*metadata_cols)

Define the metadata columns that need to be selected from stage files.

xml(path)

Specify the path of the XML file(s) to load.