Working with DataFrames in Snowpark

In Snowpark, the main way in which you query and process data is through a DataFrame. This topic explains how to work with DataFrames.

To retrieve and manipulate data, you use the DataFrame class. A DataFrame represents a relational dataset that is evaluated lazily: it only executes when a specific action is triggered. In a sense, a DataFrame is like a query that needs to be evaluated in order to retrieve data.

To retrieve data into a DataFrame:

  1. Construct a DataFrame, specifying the source of the data for the dataset.

    For example, you can create a DataFrame to hold data from a table, an external CSV file, or the execution of a SQL statement.

  2. Specify how the dataset in the DataFrame should be transformed.

    For example, you can specify which columns should be selected, how the rows should be filtered, how the results should be sorted and grouped, etc.

  3. Execute the statement to retrieve the data into the DataFrame.

    In order to retrieve the data into the DataFrame, you must invoke a method that performs an action (for example, the collect() method).
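The three steps above can be sketched as follows. This is a minimal sketch, not a complete program: it assumes that a Session object named session already exists and that the sample_product_data table described in the next section has been created. The variable names are illustrative.

```scala
import com.snowflake.snowpark.functions._

// Assumption: "session" is an existing Session object, and the
// "sample_product_data" table has already been created.

// Step 1: Construct a DataFrame. No data is retrieved yet.
val dfProducts = session.table("sample_product_data")

// Step 2: Specify the transformations. Still no data is retrieved.
val dfTopLevel = dfProducts.filter(col("parent_id") === 0).select(col("id"), col("name"))

// Step 3: Perform an action. This sends the SQL statement to the
// server and returns the results as an Array of Row objects.
val rows = dfTopLevel.collect()
rows.foreach(println)
```

Only the call to collect() in step 3 causes a query to run; steps 1 and 2 just describe the query.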

The next sections explain these steps in more detail.

Setting up the Examples for this Section

Some of the examples in this section use a DataFrame to query a table named sample_product_data. If you want to run these examples, you can create this table and fill it with data by executing the following SQL statements:

CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
INSERT INTO sample_product_data VALUES
    (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
    (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
    (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
    (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
    (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
    (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
    (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
    (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
    (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
    (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
    (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
    (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);

To verify that the table was created, run:

SELECT * FROM sample_product_data;

Constructing a DataFrame

To construct a DataFrame, you can use methods in the Session class. Each of the following methods constructs a DataFrame from a different type of data source:

  • To create a DataFrame from data in a table, view, or stream, call the table method:

    // Create a DataFrame from the data in the "sample_product_data" table.
    val dfTable = session.table("sample_product_data")
    
    // To print out the first 10 rows, call:
    //   dfTable.show()
    

    Note

    The session.table method returns an Updatable object. Updatable extends DataFrame and provides additional methods for working with data in the table (e.g. methods for updating and deleting data). See Updating, Deleting, and Merging Rows in a Table.

  • To create a DataFrame from a sequence of values, call the createDataFrame method:

    // Create a DataFrame containing a sequence of values.
    // In the DataFrame, name the columns "i" and "s".
    val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
    
  • To create a DataFrame containing a range of values, call the range method:

    // Create a DataFrame from a range
    val dfRange = session.range(1, 10, 2)
    
  • To create a DataFrame for a file in a stage, call read to get a DataFrameReader object. In the DataFrameReader object, call the method corresponding to the format of the data in the file:

    // Create a DataFrame from data in a stage.
    val dfJson = session.read.json("@mystage2/data1.json")
    
  • To create a DataFrame to hold the results of a SQL query, call the sql method:

    // Create a DataFrame from a SQL query
    val dfSql = session.sql("SELECT name FROM sample_product_data")
    

    Note: Although you can use this method to execute SELECT statements that retrieve data from tables and staged files, you should use the table and read methods instead. Methods like table and read can provide better syntax highlighting, error highlighting, and intelligent code completion in development tools.

Specifying How the Dataset Should Be Transformed

To specify which columns should be selected and how the results should be filtered, sorted, grouped, etc., call the DataFrame methods that transform the dataset. To identify columns in these methods, use the col function or an expression that evaluates to a column. (See Specifying Columns and Expressions.)

For example:

  • To specify which rows should be returned, call the filter method:

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame for the rows with the ID 1
    // in the "sample_product_data" table.
    //
    // This example uses the === operator of the Column object to perform an
    // equality check.
    val df = session.table("sample_product_data").filter(col("id") === 1)
    df.show()
    
  • To specify the columns that should be selected, call the select method:

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame that contains the id, name, and serial_number
    // columns in the "sample_product_data" table.
    val df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number"))
    df.show()
    

Each method returns a new DataFrame object that has been transformed. (The method does not affect the original DataFrame object.) This means that if you want to apply multiple transformations, you can chain method calls, calling each subsequent transformation method on the new DataFrame object returned by the previous method call.

Note that these transformation methods do not retrieve data from the Snowflake database. (The action methods described in Performing an Action to Evaluate a DataFrame perform the data retrieval.) The transformation methods simply specify how the SQL statement should be constructed.

Specifying Columns and Expressions

When calling these transformation methods, you might need to specify columns or expressions that use columns. For example, when calling the select method, you need to specify the columns that should be selected.

To refer to a column, create a Column object by calling the col function in the com.snowflake.snowpark.functions object.

// Import the col function from the functions object.
import com.snowflake.snowpark.functions._

val dfProductInfo = session.table("sample_product_data").select(col("id"), col("name"))
dfProductInfo.show()

Note

To create a Column object for a literal, see Using Literals as Column Objects.

When specifying a filter, projection, join condition, etc., you can use Column objects in an expression. For example:

  • You can use Column objects with the filter method to specify a filter condition:

    // Specify the equivalent of "WHERE id = 20"
    // in an SQL SELECT statement.
    df.filter(col("id") === 20)
    
    // Specify the equivalent of "WHERE a + b < 10"
    // in an SQL SELECT statement.
    df.filter((col("a") + col("b")) < 10)
    
  • You can use Column objects with the select method to define an alias:

    // Specify the equivalent of "SELECT b * 10 AS c"
    // in an SQL SELECT statement.
    df.select((col("b") * 10) as "c")
    
  • You can use Column objects with the join method to define a join condition:

    // Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y"
    // in an SQL SELECT statement.
    dfX.join(dfY, col("a_in_X") === col("b_in_Y"))
    

Referring to Columns in Different DataFrames

When two DataFrame objects have columns with the same name (for example, when you join the DataFrames on that column), use the DataFrame.col method of each DataFrame object to refer to the column in that object (for example, df1.col("name") and df2.col("name")).

The following example demonstrates how to use the DataFrame.col method to refer to a column in a specific DataFrame. The example joins two DataFrame objects that both have a column named key. The example uses the Column.as method to change the names of the columns in the newly created DataFrame.

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))

Using the apply Method to Refer to a Column

As an alternative to the DataFrame.col method, you can use the DataFrame.apply method to refer to a column in a specific DataFrame. Like the DataFrame.col method, the DataFrame.apply method accepts a column name as input and returns a Column object.

Note that when an object has an apply method in Scala, you can call the apply method by calling the object as if it were a function. For example, to call df.apply("column_name"), you can simply write df("column_name"). The following calls are equivalent:

  • df.col("<column_name>")

  • df.apply("<column_name>")

  • df("<column_name>")

The following example is the same as the previous example but uses the DataFrame.apply method to refer to the columns in a join operation:

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))
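The apply shorthand is a general Scala feature rather than something specific to Snowpark. The following sketch illustrates it in plain Scala with a hypothetical ToyFrame class, which is not part of the Snowpark API and returns a String label instead of a Column object:

```scala
// A hypothetical stand-in for a DataFrame, used only to illustrate
// how Scala treats the apply method. This is not a Snowpark class.
final class ToyFrame(val name: String) {
  // Returns a label for the column instead of a real Column object.
  def col(columnName: String): String = s"$name.$columnName"

  // Delegates to col, so both methods return the same value.
  def apply(columnName: String): String = col(columnName)
}

val toy = new ToyFrame("dfLhs")

// All three calls return the same value.
val viaCol = toy.col("key")
val viaApply = toy.apply("key")
val viaShorthand = toy("key") // Scala rewrites this to toy.apply("key").

println(Seq(viaCol, viaApply, viaShorthand).mkString(", "))
```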

Using Shorthand For a Column Object

As an alternative to using the col function, you can refer to a column in one of these ways:

  • Use a dollar sign in front of the quoted column name ($"column_name").

  • Use an apostrophe (a single quote) in front of the unquoted column name ('column_name).

To do this, import the names from the implicits object after you create a Session object:

val session = Session.builder.configFile("/path/to/properties").create

// Import this after you create the session.
import session.implicits._

// Use the $ (dollar sign) shorthand.
val dfDollar = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)

// Use ' (apostrophe) shorthand.
val dfApostrophe = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)

Using Double Quotes Around Object Identifiers (Table Names, Column Names, etc.)

The names of databases, schemas, tables, and stages that you specify must conform to the Snowflake identifier requirements. When you specify a name, Snowflake considers the name to be in upper case. For example, the following calls are equivalent:

// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))

If the name does not conform to the identifier requirements, you must use double quotes (") around the name. Use a backslash (\) to escape the double quote character within a Scala string literal. For example, the following table name does not start with a letter or an underscore, so you must use double quotes around the name:

val df = session.table("\"10tablename\"")

Note that when specifying the name of a column, you don’t need to use double quotes around the name. The Snowpark library automatically encloses the column name in double quotes for you if the name does not comply with the identifier requirements:

// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\""))

// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))

If you have already added double quotes around a column name, the library does not insert additional double quotes around the name.

In some cases, the column name might contain double quote characters:

describe table quoted;
+------------------------+ ...
| name                   | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted"   | ...
+------------------------+ ...

As explained in Identifier Requirements, for each double quote character within a double-quoted identifier, you must use two double quote characters (e.g. "name_with_""air""_quotes" and """column_name_quoted"""):

val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()
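Writing the escaped names by hand is error-prone, because the Scala backslash escapes and the SQL doubled quotes are easy to confuse. As a sketch, a small helper can apply the doubling rule for you. The quoteIdentifier function below is hypothetical (it is not part of the Snowpark API) and uses only plain Scala string operations:

```scala
// Hypothetical helper (not part of the Snowpark API): wraps a raw
// column name in double quotes and doubles each embedded double quote.
def quoteIdentifier(rawName: String): String =
  "\"" + rawName.replace("\"", "\"\"") + "\""

// The two column names from the "quoted" table shown above.
val quotedAir = quoteIdentifier("name_with_\"air\"_quotes")
val quotedName = quoteIdentifier("\"column_name_quoted\"")

println(quotedAir)  // "name_with_""air""_quotes"
println(quotedName) // """column_name_quoted"""
```

The resulting strings are the same as the string literals passed to select in the example above, so a call such as dfTable.select(quotedAir) would be equivalent.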

Keep in mind that when an identifier is enclosed in double quotes (whether you explicitly added the quotes or the library added the quotes for you), Snowflake treats the identifier as case-sensitive:

// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))

Using Literals as Column Objects

To use a literal in a method that passes in a Column object, create a Column object for the literal by passing the literal to the lit function in the com.snowflake.snowpark.functions object. For example:

// Import for the lit and col functions.
import com.snowflake.snowpark.functions._

// Show the first 10 rows in which num_items is greater than 5.
// Use `lit(5)` to create a Column object for the literal 5.
df.filter(col("num_items").gt(lit(5))).show()

If the literal is a floating point or double value in Scala (e.g. 0.05 is treated as a Double by default), the Snowpark library generates SQL that implicitly casts the value to the corresponding Snowpark data type (e.g. 0.05::DOUBLE). This can produce an approximate value that differs from the exact number specified.

For example, the following code displays no matching rows, even though the filter (which matches values less than or equal to 0.05) should match the row in the DataFrame:

// Create a DataFrame that contains the value 0.05.
val df = session.sql("select 0.05 :: Numeric(5, 2) as a")

// Applying this filter results in no matching rows in the DataFrame.
df.filter(col("a") <= lit(0.06) - lit(0.01)).show()

The problem is that lit(0.06) and lit(0.01) produce approximate values for 0.06 and 0.01, not the exact values.

To avoid this problem, you can use one of the following approaches:

  • Option 1: Cast the literal to the Snowpark type that you want to use. For example, to use a NUMBER with a precision of 5 and a scale of 2:

    df.filter(col("a") <= lit(0.06).cast(new DecimalType(5, 2)) - lit(0.01).cast(new DecimalType(5, 2))).show()
    
  • Option 2: Cast the value to the type that you want to use before passing the value to the lit function. For example, if you want to use the BigDecimal type:

    df.filter(col("a") <= lit(BigDecimal(0.06)) - lit(BigDecimal(0.01))).show()
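The underlying effect can be reproduced in plain Scala without a Snowpark session. The following sketch is an analogy for the comparison made on the server, not a trace of the SQL that the library generates. It compares Double arithmetic with BigDecimal arithmetic on values built from decimal strings:

```scala
// Plain Scala; no Snowpark session is needed.

// Double arithmetic: 0.06 and 0.01 are stored as binary approximations,
// so the difference is slightly less than the Double nearest to 0.05.
val doubleDiff: Double = 0.06 - 0.01
val doubleMatches: Boolean = 0.05 <= doubleDiff

// BigDecimal arithmetic on values built from decimal strings is exact.
val decimalDiff: BigDecimal = BigDecimal("0.06") - BigDecimal("0.01")
val decimalMatches: Boolean = BigDecimal("0.05") <= decimalDiff

println(s"Double:     0.05 <= $doubleDiff is $doubleMatches")
println(s"BigDecimal: 0.05 <= $decimalDiff is $decimalMatches")
```

The Double comparison is false and the BigDecimal comparison is true, which mirrors the reason the original filter returns no rows and the filters in Option 1 and Option 2 return the expected row.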
    

Casting a Column Object to a Specific Type

To cast a Column object to a specific type, call the Column.cast method, and pass in a type object from the com.snowflake.snowpark.types package. For example, to cast a literal as a NUMBER with a precision of 5 and a scale of 2:

// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class.
import com.snowflake.snowpark.types._

val decimalValue = lit(0.05).cast(new DecimalType(5,2))

Chaining Method Calls

Because each method that transforms a DataFrame object returns a new DataFrame object that has the transformation applied, you can chain method calls to produce a new DataFrame that is transformed in additional ways.

The following example returns a DataFrame that is configured to:

  • Query the sample_product_data table.

  • Return the row with id = 1.

  • Select the name and serial_number columns.

val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()

In this example:

  • session.table("sample_product_data") returns a DataFrame for the sample_product_data table.

    Although the DataFrame does not yet contain the data from the table, the object does contain the definitions of the columns in the table.

  • filter(col("id") === 1) returns a DataFrame for the sample_product_data table that is set up to return the row with id = 1.

    Note again that the DataFrame does not yet contain the matching row from the table. The matching row is not retrieved until you call an action method.

  • select(col("name"), col("serial_number")) returns a DataFrame that contains the name and serial_number columns for the row in the sample_product_data table that has id = 1.

When you chain method calls, keep in mind that the order of calls is important. Each method call returns a DataFrame that has been transformed. Make sure that subsequent calls work with the transformed DataFrame.

For example, in the code below, the select method returns a DataFrame that just contains two columns: name and serial_number. The filter method call on this DataFrame fails because it uses the id column, which is not in the transformed DataFrame.

// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") === 1)

In contrast, the following code executes successfully because the filter() method is called on a DataFrame that contains all of the columns in the sample_product_data table (including the id column):

// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()

Keep in mind that you might need to make the select and filter method calls in a different order than you would use the equivalent keywords (SELECT and WHERE) in a SQL statement.

Limiting the Number of Rows in a DataFrame

To limit the number of rows in a DataFrame, you can use the DataFrame.limit transformation method.

The Snowpark API also provides action methods for retrieving and printing out a limited number of rows:

  • the DataFrame.first action method (to execute the query and return the first n rows)

  • the DataFrame.show action method (to execute the query and print the first n rows)

These methods effectively add a LIMIT clause to the SQL statement that is executed.

As explained in the usage notes for LIMIT, the results are non-deterministic unless you specify a sort order (ORDER BY) in conjunction with LIMIT.

To keep the ORDER BY clause with the LIMIT clause (e.g. so that ORDER BY is not in a separate subquery), you must call the method that limits results on the DataFrame returned by the sort method.

For example, if you are chaining method calls:

// Limit the number of rows to 5, sorted by parent_id.
val dfSubset = df.sort(col("parent_id")).limit(5)

// Return the first 5 rows, sorted by parent_id.
val arrayOfRows = df.sort(col("parent_id")).first(5)

// Print the first 5 rows, sorted by parent_id.
df.sort(col("parent_id")).show(5)

Retrieving Column Definitions

To retrieve the definition of the columns in the dataset for the DataFrame, call the schema method. This method returns a StructType object that contains an Array of StructField objects. Each StructField object contains the definition of a column.

// Get the StructType object that describes the columns in the
// underlying rowset.
val tableSchema = session.table("sample_product_data").schema
println("Schema for sample_product_data: " + tableSchema)
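To inspect the individual column definitions, you can iterate over the StructField objects. The following is a sketch that assumes an existing Session object named session, and that StructType exposes a fields member and StructField exposes name, dataType, and nullable members. Check the API reference for your Snowpark version before relying on these names.

```scala
// Assumption: "session" is an existing Session object.
val productSchema = session.table("sample_product_data").schema

// Print the name, data type, and nullability of each column.
productSchema.fields.foreach { field =>
  println(s"${field.name}: ${field.dataType}, nullable = ${field.nullable}")
}
```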

In the returned StructType object, the column names are always normalized. Unquoted identifiers are returned in uppercase, and quoted identifiers are returned in the exact case in which they were defined.

The following example creates a DataFrame containing the columns named ID and 3rd. For the column name 3rd, the Snowpark library automatically encloses the name in double quotes ("3rd") because the name does not comply with the requirements for an identifier.

The example calls the schema method and then calls the names method on the returned StructType object to get an ArraySeq of column names. The names are normalized in the StructType returned by the schema method.

// Create a DataFrame containing the "id" and "3rd" columns.
val dfSelectedColumns = session.table("sample_product_data").select(col("id"), col("3rd"))
// Print out the names of the columns in the schema. This prints out:
//   ArraySeq(ID, "3rd")
println(dfSelectedColumns.schema.names.toSeq)

Joining DataFrames

To join DataFrame objects, call the DataFrame.join method.

The following sections explain how to use DataFrames to perform a join.

Setting up the Sample Data for the Joins

The examples in the next sections use sample data that you can set up by executing the following SQL statements:

CREATE TABLE sample_a (
  id_a INTEGER,
  name_a VARCHAR
);
INSERT INTO sample_a (id_a, name_a) VALUES
  (10, 'A1'),
  (40, 'A2'),
  (80, 'A3'),
  (90, 'A4')
;
CREATE TABLE sample_b (
  id_b INTEGER,
  name_b VARCHAR,
  id_a INTEGER
);
INSERT INTO sample_b (id_b, name_b, id_a) VALUES
  (4000, 'B1', 40),
  (4001, 'B2', 10),
  (9000, 'B3', 80),
  (9099, 'B4', NULL)
;
CREATE TABLE sample_c (
  id_c INTEGER,
  name_c VARCHAR,
  id_a INTEGER,
  id_b INTEGER
);
INSERT INTO sample_c (id_c, name_c, id_a, id_b) VALUES
  (1012, 'C1', 10, NULL),
  (1040, 'C2', 40, 4000),
  (1041, 'C3', 40, 4001)
;

Specifying the Columns for the Join

With the DataFrame.join method, you can specify the columns to use in one of the following ways:

  • Specify a Column expression that describes the join condition.

  • Specify one or more columns that should be used as the common columns in the join.

The following example performs an inner join on the column named id_a:

// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()

Note that the example uses the DataFrame.col method to specify the condition to use for the join. See Specifying Columns and Expressions for more about this method.

This prints the following output:

----------------------------------------------------------------
|"l_zxlC_ID_A"  |"NAME_A"  |"ID_B"  |"NAME_B"  |"r_RI6I_ID_A"  |
----------------------------------------------------------------
|10             |A1        |4001    |B2        |10             |
|40             |A2        |4000    |B1        |40             |
|80             |A3        |9000    |B3        |80             |
----------------------------------------------------------------

Note that the Snowpark library generates aliases for columns that have the same name. These aliases are in the form l_xxxx_column_name (for columns from the table on the left side of the join) and r_xxxx_column_name (for columns from the table on the right side of the join).

In the output above:

  • l_zxlC_ID_A is an alias for the column id_a in sample_a.

  • r_RI6I_ID_A is an alias for the column id_a in sample_b.
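The second way of specifying the join columns, by naming the common columns, could look like the following sketch. It assumes an existing Session object named session and the sample tables created above, and it uses the join method that accepts a Seq of column names. The variable name dfJoinedUsing is illustrative.

```scala
// Assumption: "session" is an existing Session object.
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")

// Join on the common column "id_a" by passing a Seq of column names
// instead of a Column expression.
val dfJoinedUsing = dfLhs.join(dfRhs, Seq("id_a"))
dfJoinedUsing.show()
```

Because this form names the common column once, no join condition built with DataFrame.col is needed.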

Performing a Natural Join

To perform a natural join (where DataFrames are joined on columns that have the same name), call the DataFrame.naturalJoin method.

The following example joins the DataFrames for the tables sample_a and sample_b on their common columns (the column id_a):

val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()

This prints the following output:

-----------------------------------------
|"ID_A"  |"NAME_A"  |"ID_B"  |"NAME_B"  |
-----------------------------------------
|10      |A1        |4001    |B2        |
|40      |A2        |4000    |B1        |
|80      |A3        |9000    |B3        |
-----------------------------------------

Specifying the Type of Join

By default, the DataFrame.join method creates an inner join. To specify a different type of join, set the joinType argument to one of the following values:

Type of Join        joinType
------------        --------
Inner join          inner (default)
Left outer join     left
Right outer join    right
Full outer join     full
Cross join          cross

For example:

// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"), "left")
dfLeftOuterJoin.show()

This prints the following output:

----------------------------------------------------------------
|"l_p3Mb_ID_A"  |"NAME_A"  |"ID_B"  |"NAME_B"  |"r_ZTyi_ID_A"  |
----------------------------------------------------------------
|40             |A2        |4000    |B1        |40             |
|10             |A1        |4001    |B2        |10             |
|80             |A3        |9000    |B3        |80             |
|90             |A4        |NULL    |NULL      |NULL           |
----------------------------------------------------------------

As explained earlier, the Snowpark library generates aliases for columns in the output that have the same name. In the output above:

  • l_p3Mb_ID_A is an alias for the column id_a in sample_a.

  • r_ZTyi_ID_A is an alias for the column id_a in sample_b.

Joining Multiple Tables

To join multiple tables:

  1. Create a DataFrame for each table.

  2. Call the DataFrame.join method on the first DataFrame, passing in the second DataFrame.

  3. Using the DataFrame returned by the join method, call the join method, passing in the third DataFrame.

You can chain the join calls as shown below:

val dfFirst = session.table("sample_a")
val dfSecond  = session.table("sample_b")
val dfThird = session.table("sample_c")
val dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a") === dfSecond.col("id_a")).join(dfThird, dfFirst.col("id_a") === dfThird.col("id_a"))
dfJoinThreeTables.show()

This prints the following output:

--------------------------------------------------------------------------------------------------------------------
|"l_PZ1M_ID_A"  |"NAME_A"  |"l_8qBx_ID_B"  |"NAME_B"  |"r_clyW_ID_A"  |"ID_C"  |"NAME_C"  |"ID_A"  |"r_TtMO_ID_B"  |
--------------------------------------------------------------------------------------------------------------------
|10             |A1        |4001           |B2        |10             |1012    |C1        |10      |NULL           |
|40             |A2        |4000           |B1        |40             |1040    |C2        |40      |4000           |
|40             |A2        |4000           |B1        |40             |1041    |C3        |40      |4001           |
--------------------------------------------------------------------------------------------------------------------

As explained earlier, the Snowpark library generates aliases for columns in the output that have the same name. In the output above:

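  • l_PZ1M_ID_A is an alias for the column id_a in dfFirst.

  • l_8qBx_ID_B is an alias for the column id_b in dfSecond. (The alias has the l_ prefix because the result of the first join is the left side of the second join.)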

  • r_clyW_ID_A is an alias for the column id_a in dfSecond.

  • r_TtMO_ID_B is an alias for the column id_b in dfThird.

Performing a Self-Join

If you need to join a table with itself on different columns, you cannot perform the self-join with a single DataFrame. The following examples that use a single DataFrame to perform a self-join fail because the column expressions for "id" are present in the left and right sides of the join:

// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val dfJoinedByCol = df.join(df, col("id") === col("parent_id"))

// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val dfJoinedByApply = df.join(df, df("id") === df("parent_id"))

Both of these examples fail with the following exception:

Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
  Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
  Instead, join this DataFrame to a clone() of itself.

Instead, use the DataFrame.clone() method to create a clone of the DataFrame object, and use the two DataFrame objects to perform the join:

// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
val dfLhs = session.table("sample_product_data")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()

// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))
dfJoined.show()

If you want to perform a self-join on the same column, call the join method that takes a Seq of column names for the USING clause:

// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
val dfJoined = df.join(df, Seq("key"))

Performing an Action to Evaluate a DataFrame

As mentioned earlier, the DataFrame is lazily evaluated, which means the SQL statement isn’t sent to the server for execution until you perform an action. An action causes the DataFrame to be evaluated and sends the corresponding SQL statement to the server for execution.

The following sections explain how to perform an action synchronously and asynchronously on a DataFrame.

Performing an Action Synchronously

To perform an action synchronously, call one of the following action methods:

  • DataFrame.collect

    Evaluates the DataFrame and returns the resulting dataset as an Array of Row objects. See Returning All Rows.

  • DataFrame.toLocalIterator

    Evaluates the DataFrame and returns an Iterator of Row objects. If the result set is large, use this method to avoid loading all the results into memory at once. See Returning an Iterator for the Rows.

  • DataFrame.count

    Evaluates the DataFrame and returns the number of rows.

  • DataFrame.show

    Evaluates the DataFrame and prints the rows to the console. Note that this method limits the number of rows to 10 (by default). See Printing the Rows in a DataFrame.

  • DataFrame.cacheResult

    Executes the query, creates a temporary table, and puts the results into the table. The method returns a HasCachedResult object that you can use to access the data in this temporary table. See Caching a DataFrame.

  • DataFrame.write.saveAsTable

    Saves the data in the DataFrame to the specified table. See Saving Data to a Table.

  • DataFrame.read.fileformat.copyInto('tableName')

    Copies the data in the DataFrame to the specified table. See Copying Data from a File into a Table.

  • Session.table('tableName').delete

    Deletes rows in the specified table. See Updating, Deleting, and Merging Rows in a Table.

  • Session.table('tableName').update

    Updates rows in the specified table. See Updating, Deleting, and Merging Rows in a Table.

  • Session.table('tableName').merge.methods.collect

    Merges rows into the specified table. See Updating, Deleting, and Merging Rows in a Table.

To execute the query and return the number of results, call the count method:

// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")

// Send the query to the server for execution and
// print the count of rows in the table.
println("Rows returned: " + dfProducts.count())
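The collect and toLocalIterator methods can be used in a similar way. The following is a minimal sketch that assumes an existing Session object named session and the sample_product_data table. The variable names are illustrative.

```scala
import com.snowflake.snowpark.functions._

// Assumption: "session" is an existing Session object.
val dfNames = session.table("sample_product_data").select(col("name"))

// collect() executes the query and loads all rows into memory
// as an Array of Row objects.
val allRows = dfNames.collect()
allRows.foreach(row => println(row.getString(0)))

// toLocalIterator executes the query and returns an Iterator, which
// avoids holding the whole result set in memory at once.
val rowIterator = dfNames.toLocalIterator
while (rowIterator.hasNext) {
  println(rowIterator.next().getString(0))
}
```

Each of the two action calls sends the query to the server separately, so this sketch executes the same query twice.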

You can also call the other action methods listed above, for example to retrieve the rows, print them, or save them to a table.

Note: If you are calling the schema method to get the definitions of the columns in the DataFrame, you do not need to call an action method.

Performing an Action Asynchronously

Note

This feature was introduced in Snowpark 0.11.0.

To perform an action asynchronously, call the async method to return an “async actor” object (e.g. DataFrameAsyncActor), and call an asynchronous action method in that object.

These action methods of an async actor object return a TypedAsyncJob object, which you can use to check the status of the asynchronous action and retrieve the results of the action.

The next sections explain how to perform actions asynchronously and check the results.

Understanding the Basic Flow of Asynchronous Actions

You can use the following methods to perform an action asynchronously:

Method to Perform an Action Asynchronously

Description

DataFrame.async.collect

Asynchronously evaluates the DataFrame to retrieve the resulting dataset as an Array of Row objects. See Returning All Rows.

DataFrame.async.toLocalIterator

Asynchronously evaluates the DataFrame to retrieve an Iterator of Row objects. If the result set is large, use this method to avoid loading all the results into memory at once. See Returning an Iterator for the Rows.

DataFrame.async.count

Asynchronously evaluates the DataFrame to retrieve the number of rows.

DataFrame.write.async.saveAsTable

Asynchronously saves the data in the DataFrame to the specified table. See Saving Data to a Table.

DataFrame.read.fileformat.async.copyInto('tableName')

Asynchronously copies the data in the DataFrame to the specified table. See Copying Data from a File into a Table.

Session.table('tableName').async.delete

Asynchronously deletes rows in the specified table. See Updating, Deleting, and Merging Rows in a Table.

Session.table('tableName').async.update

Asynchronously updates rows in the specified table. See Updating, Deleting, and Merging Rows in a Table.

From the returned TypedAsyncJob object, you can do the following:

  • To determine if the action has completed, call the isDone method.

  • To get the query ID that corresponds to the action, call the getQueryId method.

  • To return the results of the action (e.g. the Array of Row objects for the collect method or the count of rows for the count method), call the getResult method.

    Note that getResult is a blocking call.

  • To cancel the action, call the cancel method.

For example, to execute a query asynchronously and retrieve the results as an Array of Row objects, call DataFrame.async.collect:

// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
val df = session.table("sample_product_data").select(col("id"), col("name"))

// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.collect()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
val results = asyncJob.getResult()
results.foreach(println)

To execute the query asynchronously and retrieve the number of results, call DataFrame.async.count:

// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")

// Execute the query asynchronously.
// This call does not block.
val asyncJob = dfProducts.async.count()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Print the count of rows in the table.
// Note that getResult is a blocking call.
println("Rows returned: " + asyncJob.getResult())
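The isDone and cancel methods described above can be combined into a polling loop that gives up after a deadline. The following is only a minimal sketch: the AsyncPolling object and the waitOrCancel helper are hypothetical names (not part of Snowpark), and the helper takes plain functions so that it does not depend on any particular job type.

```scala
object AsyncPolling {
  // Hypothetical helper (not part of Snowpark).
  // Polls `isDone` until it returns true or until `timeoutMillis` has elapsed.
  // If the deadline passes first, calls `cancel` once.
  // Returns true if the job finished in time, false if it was cancelled.
  def waitOrCancel(
      isDone: () => Boolean,
      cancel: () => Unit,
      timeoutMillis: Long,
      pollIntervalMillis: Long = 500L): Boolean = {
    val start = System.nanoTime()
    val timeoutNanos = timeoutMillis * 1000000L
    var finished = isDone()
    while (!finished && System.nanoTime() - start < timeoutNanos) {
      Thread.sleep(pollIntervalMillis)
      finished = isDone()
    }
    if (!finished) cancel()
    finished
  }
}

// Assumed usage with a TypedAsyncJob named asyncJob:
//   val finished = AsyncPolling.waitOrCancel(
//     () => asyncJob.isDone(), () => asyncJob.cancel(), timeoutMillis = 30000L)
//   if (finished) asyncJob.getResult().foreach(println)
```

Polling in this way keeps the calling thread free to do other work between checks, whereas getResult blocks until the query completes.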

Specifying the Maximum Number of Seconds to Wait

When calling the getResult method, you can use the maxWaitTimeInSeconds argument to specify the maximum number of seconds to wait for the query to complete before attempting to retrieve the results. For example:

// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
val results = asyncJob.getResult(10)

If you omit this argument, the method waits for the maximum number of seconds specified by the snowpark_request_timeout_in_seconds configuration property. (This is a property that you can set when creating the Session object.)
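As a rough sketch of how that property might be supplied, the following builds a Map of session settings that includes snowpark_request_timeout_in_seconds. The connection values are placeholders, the SessionTimeoutConfig name is hypothetical, and the value 600 is an arbitrary choice for illustration.

```scala
object SessionTimeoutConfig {
  // Placeholder connection values; replace them with your own account details.
  val connectionConfig: Map[String, String] = Map(
    "URL" -> "https://<account_identifier>.snowflakecomputing.com",
    "USER" -> "<user>",
    "PASSWORD" -> "<password>",
    // Maximum number of seconds that getResult waits when called without an argument.
    "snowpark_request_timeout_in_seconds" -> "600"
  )
}

// With the Snowpark library on the classpath, the Map could be passed to the session builder:
//   import com.snowflake.snowpark.Session
//   val session = Session.builder.configs(SessionTimeoutConfig.connectionConfig).create
```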

Accessing an Asynchronous Query by ID

If you have the query ID of an asynchronous query that you submitted earlier, you can call the Session.createAsyncJob method to create an AsyncJob object that you can use to check the status of the query, retrieve the query results, or cancel the query.

Note that unlike TypedAsyncJob, AsyncJob does not provide a getResult method for retrieving the results. If you need to retrieve the results, call the getRows or getIterator method instead.

For example:

val asyncJob = session.createAsyncJob(myQueryId)
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
val rows = asyncJob.getRows()
rows.foreach(println)
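If the result set is large, the getIterator method mentioned above may be a better fit than getRows. The following sketch assumes that getIterator can be called without arguments and blocks in the same way as getRows; the AsyncJobIteration object and the queryId parameter are hypothetical names used for illustration.

```scala
import com.snowflake.snowpark.Session

object AsyncJobIteration {
  // `queryId` is assumed to be the ID of a query that was submitted earlier.
  def printRows(session: Session, queryId: String): Unit = {
    val asyncJob = session.createAsyncJob(queryId)
    // Assumed to block until the query completes, then return an Iterator of Rows.
    val rowIterator = asyncJob.getIterator()
    while (rowIterator.hasNext) {
      println(rowIterator.next())
    }
  }
}
```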

Retrieving Rows into a DataFrame

After you specify how the DataFrame should be transformed, you can call an action method to execute a query and return the results. You can return all of the rows in an Array, or you can return an Iterator that allows you to iterate over the results, row by row. In the latter case, if the result set is large, the rows are loaded in chunks so that the entire result set is not held in memory at once.

Returning All Rows

To return all rows at once, call the DataFrame.collect method. This method returns an Array of Row objects. To retrieve the values from the row, call the getType method (e.g. getString, getInt, etc.).

For example:

import com.snowflake.snowpark.functions._

val rows = session.table("sample_product_data").select(col("name"), col("category_id")).sort(col("name")).collect()
for (row <- rows) {
  println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}

Returning an Iterator for the Rows

If you want to use an Iterator to iterate over the Row objects in the results, call DataFrame.toLocalIterator. If the amount of data in the results is large, the method loads the rows by chunk to avoid loading all rows into memory at once.

For example:

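import com.snowflake.snowpark.functions._

// Create the Iterator. The rows are fetched as you iterate over the results.
val rowIterator = session.table("sample_product_data").select(col("name"), col("category_id")).sort(col("name")).toLocalIterator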

while (rowIterator.hasNext) {
  val row = rowIterator.next()
  println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}

Returning the First n Rows

To return the first n rows, call the DataFrame.first method, passing in the number of rows to return.

As explained in Limiting the Number of Rows in a DataFrame, the results are non-deterministic. If you want the results to be deterministic, call this method on a sorted DataFrame (df.sort().first()).

For example:

import com.snowflake.snowpark.functions._

val df = session.table("sample_product_data")
val rows = df.sort(col("name")).first(5)
rows.foreach(println)

Printing the Rows in a DataFrame

To print the first 10 rows in the DataFrame to the console, call the DataFrame.show method. To print out a different number of rows, pass in the number of rows to print.

As explained in Limiting the Number of Rows in a DataFrame, the results are non-deterministic. If you want the results to be deterministic, call this method on a sorted DataFrame (df.sort().show()).

For example:

import com.snowflake.snowpark.functions._

val df = session.table("sample_product_data")
df.sort(col("name")).show()
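To print a different number of rows, the row count can be passed to show. The following is a small sketch of that variation; the ShowSketch object name and the choice of 20 rows are arbitrary.

```scala
import com.snowflake.snowpark.Session
import com.snowflake.snowpark.functions._

object ShowSketch {
  def showFirstTwenty(session: Session): Unit = {
    val df = session.table("sample_product_data")
    // Sort first so that the printed rows are deterministic, then print up to 20 rows.
    df.sort(col("name")).show(20)
  }
}
```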

Updating, Deleting, and Merging Rows in a Table

Note

This feature was introduced in Snowpark 0.7.0.

When you call Session.table to create a DataFrame object for a table, the method returns an Updatable object, which extends DataFrame with additional methods for updating and deleting data in the table. (See Updatable.)

If you need to update or delete rows in a table, you can use the update and delete methods of the Updatable class, as described in the sections that follow.

Updating Rows in a Table

For the update method, pass in a Map that associates the columns to update and the corresponding values to assign to those columns. update returns an UpdateResult object, which contains the number of rows that were updated. (See UpdateResult.)

Note

update is an action method, which means that calling the method sends SQL statements to the server for execution.

For example, to replace the values in the column named count with the value 1:

val updatableDf = session.table("sample_product_data")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")

The example above uses the name of the column to identify the column. You can also use a column expression:

val updateResult = updatableDf.update(Map(col("count") -> lit(1)))

If the update should be made only when a condition is met, you can specify that condition as an argument. For example, to replace the values in the column named count for rows in which the category_id column has the value 20:

val updateResult = updatableDf.update(Map(col("count") -> lit(1)), col("category_id") === 20)

If you need to base the condition on a join with a different DataFrame object, you can pass that DataFrame in as an argument and use that DataFrame in the condition. For example, to replace the values in the column named count for rows in which the category_id column matches the category_id in the DataFrame dfParts:

val updatableDf = session.table("sample_product_data")
val dfParts = session.table("parts")
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), updatableDf("category_id") === dfParts("category_id"), dfParts)

Deleting Rows in a Table

For the delete method, you can specify a condition that identifies the rows to delete, and you can base that condition on a join with another DataFrame. delete returns a DeleteResult object, which contains the number of rows that were deleted. (See DeleteResult.)

Note

delete is an action method, which means that calling the method sends SQL statements to the server for execution.

For example, to delete the rows that have the value 1 in the category_id column:

val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === 1)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")

If the condition refers to columns in a different DataFrame, pass that DataFrame in as the second argument. For example, to delete the rows in which the category_id column matches the category_id in the DataFrame dfParts, pass in dfParts as the second argument:

val updatableDf = session.table("sample_product_data")
val dfParts = session.table("parts")
val deleteResult = updatableDf.delete(updatableDf("category_id") === dfParts("category_id"), dfParts)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")

Merging Rows into a Table

To insert, update, and delete rows in one table based on values in a second table or a subquery (the equivalent of the MERGE command in SQL), do the following:

  1. In the Updatable object for the table where you want the data merged in, call the merge method, passing in the DataFrame object for the other table and the column expression for the join condition.

    This returns a MergeBuilder object that you can use to specify the actions to take (e.g. insert, update, or delete) on the rows that match and the rows that don’t match. (See MergeBuilder.)

  2. Using the MergeBuilder object:

    • To specify the update or deletion that should be performed on matching rows, call the whenMatched method.

      If you need to specify an additional condition when rows should be updated or deleted, you can pass in a column expression for that condition.

      This method returns a MatchedClauseBuilder object that you can use to specify the action to perform. (See MatchedClauseBuilder.)

      Call the update or delete method in the MatchedClauseBuilder object to specify the update or delete action that should be performed on matching rows. These methods return a MergeBuilder object that you can use to specify additional clauses.

    • To specify the insert that should be performed when rows do not match, call the whenNotMatched method.

      If you need to specify an additional condition when rows should be inserted, you can pass in a column expression for that condition.

      This method returns a NotMatchedClauseBuilder object that you can use to specify the action to perform. (See NotMatchedClauseBuilder.)

      Call the insert method in the NotMatchedClauseBuilder object to specify the insert action that should be performed when rows do not match. This method returns a MergeBuilder object that you can use to specify additional clauses.

  3. When you are done specifying the inserts, updates, and deletions that should be performed, call the collect method of the MergeBuilder object to perform the specified inserts, updates, and deletions on the table.

    collect returns a MergeResult object, which contains the number of rows that were inserted, updated, and deleted. (See MergeResult.)

The following example inserts a row with the id and value columns from the source table into the target table if the target table does not contain a row with a matching ID:

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenNotMatched.insert(Seq(source("id"), source("value")))
                      .collect()

The following example updates a row in the target table with the value of the value column from the row in the source table that has the same ID:

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenMatched.update(Map("value" -> source("value")))
                      .collect()
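The clauses described in the steps above can also be chained in a single merge. The following sketch assumes hypothetical tables named target and source, each with id and value columns, and assumes that the MergeResult object exposes its counts as rowsInserted, rowsUpdated, and rowsDeleted. It deletes matching rows when the source value is NULL, updates the other matching rows, and inserts the rows that do not match.

```scala
import com.snowflake.snowpark.Session

object MergeSketch {
  // "target" and "source" are hypothetical tables used for illustration.
  def upsertAndPrune(session: Session): Unit = {
    val target = session.table("target")
    val source = session.table("source")

    val mergeResult = target.merge(source, target("id") === source("id"))
      // Matching rows whose source value is NULL are deleted from the target.
      .whenMatched(source("value").is_null).delete()
      // All other matching rows take the value from the source row.
      .whenMatched.update(Map("value" -> source("value")))
      // Source rows without a match are inserted into the target.
      .whenNotMatched.insert(Seq(source("id"), source("value")))
      .collect()

    println(s"Inserted: ${mergeResult.rowsInserted}; " +
      s"updated: ${mergeResult.rowsUpdated}; deleted: ${mergeResult.rowsDeleted}")
  }
}
```

Placing the conditional clause before the unconditional one mirrors how the clauses of a SQL MERGE statement are evaluated in order.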

Saving Data to a Table

You can save the contents of a DataFrame to a new or existing table. In order to do this, you must have the following privileges:

  • CREATE TABLE privileges on the schema, if the table does not exist.

  • INSERT privileges on the table.

To save the contents of a DataFrame to a table:

  1. Call the DataFrame.write method to get a DataFrameWriter object.

  2. Call the DataFrameWriter.mode method, passing in a SaveMode object that specifies your preferences for writing to the table:

    • To insert rows, pass in SaveMode.Append.

    • To overwrite the existing table, pass in SaveMode.Overwrite.

    This method returns the same DataFrameWriter object configured with the specified mode.

  3. Call the DataFrameWriter.saveAsTable to save the contents of the DataFrame to a specified table.

    You do not need to call a separate method (e.g. collect) to execute the SQL statement that saves the data to the table. saveAsTable is an action method that executes the SQL statement.

For example:

df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
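To add rows to an existing table instead of replacing it, the same steps can be followed with SaveMode.Append. This is a minimal sketch; product_archive is a hypothetical table name, and the AppendSketch object exists only to make the example self-contained.

```scala
import com.snowflake.snowpark.{SaveMode, Session}
import com.snowflake.snowpark.functions._

object AppendSketch {
  // "product_archive" is a hypothetical target table used for illustration.
  def appendCategory(session: Session): Unit = {
    val df = session.table("sample_product_data").filter(col("category_id") === 20)
    // saveAsTable is an action method: this line sends the statement to the server.
    df.write.mode(SaveMode.Append).saveAsTable("product_archive")
  }
}
```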

Creating a View From a DataFrame

To create a view from a DataFrame, call the DataFrame.createOrReplaceView method:

df.createOrReplaceView("db.schema.viewName")

Note that calling createOrReplaceView immediately creates the new view. More importantly, it does not cause the DataFrame to be evaluated. (The DataFrame itself is not evaluated until you perform an action.)

Views that you create by calling createOrReplaceView are persistent. If you no longer need that view, you can drop the view manually.

If you need to create a temporary view just for the session, call the DataFrame.createOrReplaceTempView method instead:

df.createOrReplaceTempView("db.schema.viewName")

Caching a DataFrame

In some cases, you may need to materialize a copy of query results for use in subsequent operations during the session (e.g. cases in which the query is complex and you might create a temporary table containing the results of a subquery).

In these cases, you can call the DataFrame.cacheResult method to create a cached copy of the results. This method returns a HasCachedResult object, which provides access to the cached copy. Because HasCachedResult extends DataFrame, you can perform some of the same operations on this cached data as you can perform on a DataFrame.

Note

  • You do not need to call an action method to retrieve the results before calling cacheResult.

    cacheResult is an action method that executes the query, creates a temporary table, and puts the results into that table.

  • Because cacheResult creates a temporary table, you must have the CREATE TABLE privilege on the schema that is in use.

  • The original DataFrame is not affected when you call this method.

    For example, suppose that dfTable is a DataFrame for the table sample_product_data:

    val dfTempTable = dfTable.cacheResult()
    

    After you call cacheResult, dfTable still points to the sample_product_data table, and you can continue to use dfTable to query and update that table.

    To use the cached data in the temporary table, you use dfTempTable (the HasCachedResult object returned by cacheResult).

For example:

import com.snowflake.snowpark.functions._

// Set up a DataFrame to query a table.
val df = session.table("sample_product_data").filter(col("category_id") > 10)
// Retrieve the results and cache the data.
val cachedDf = df.cacheResult()
// Create a DataFrame containing a subset of the cached data.
val dfSubset = cachedDf.filter(col("category_id") === lit(20)).select(col("name"), col("category_id"))
dfSubset.show()

Working With Files in a Stage

The Snowpark library provides classes and methods that you can use to load data into Snowflake and unload data from Snowflake by using files in stages.

Note

In order to use these classes and methods on a stage, you must have the required privileges for working with the stage.

The next sections explain how to use these classes and methods:

Uploading and Downloading Files in a Stage

To upload and download files in a stage, use the FileOperation object:

Uploading Files to a Stage

To upload files to a stage:

  1. Verify that you have the privileges to upload files to the stage.

  2. Use Session.file to access the FileOperation object for the session.

  3. Call the FileOperation.put method to upload the files to a stage.

    This method executes a SQL PUT command.

    • To specify any optional parameters for the PUT command, create a Map of the parameters and values, and pass in the Map as the options argument. For example:

      // Upload a file to a stage without compressing the file.
      val putOptions = Map("AUTO_COMPRESS" -> "FALSE")
      val putResults = session.file.put("file:///tmp/myfile.csv", "@myStage", putOptions)
      
    • In the localFilePath argument, you can use wildcards (* and ?) to identify a set of files to upload. For example:

      
      

      // Upload the CSV files in /tmp with names that start with "file".
      // You can use the wildcard characters "*" and "?" to match multiple files.
      val putResults = session.file.put("file:///tmp/file*.csv", "@myStage/prefix2")

  4. Check the Array of PutResult objects returned by the put method to determine if the files were successfully uploaded. For example, to print the filename and the status of the PUT operation for that file:

    // Print the filename and the status of the PUT operation.
    putResults.foreach(r => println(s"  ${r.sourceFileName}: ${r.status}"))
    

Downloading Files from a Stage

To download files from a stage:

  1. Verify that you have the privileges to download files from the stage.

  2. Use Session.file to access the FileOperation object for the session.

  3. Call the FileOperation.get method to download the files from a stage.

    This method executes a SQL GET command.

    To specify any optional parameters for the GET command, create a Map of the parameters and values, and pass in the Map as the options argument. For example:

    // Download files with names that match a regular expression pattern.
    val getOptions = Map("PATTERN" -> s"'.*file_.*.csv.gz'")
    val getResults = session.file.get("@myStage", "file:///tmp", getOptions)
    
  4. Check the Array of GetResult objects returned by the get method to determine if the files were successfully downloaded. For example, to print the filename and the status of the GET operation for that file:

    // Print the filename and the status of the GET operation.
    getResults.foreach(r => println(s"  ${r.fileName}: ${r.status}"))
    

Setting Up a DataFrame for a File in a Stage

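This section explains how to set up a DataFrame for a file in a Snowflake stage. Once you create this DataFrame, you can use the DataFrame to retrieve the data from the file or to copy the data into a table, as described in the sections that follow.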

To set up a DataFrame for a file in a Snowflake stage, use the DataFrameReader class:

  1. Verify that you have the following privileges:

  2. Call the read method in the Session class to access a DataFrameReader object.

  3. If the files are in CSV format, describe the fields in the file. To do this:

    1. Create a StructType object that consists of a sequence of StructField objects that describe the fields in the file.

    2. For each StructField object, specify the following:

      • The name of the field.

      • The data type of the field (specified as an object in the com.snowflake.snowpark.types package).

      • Whether or not the field is nullable.

      For example:

      import com.snowflake.snowpark.types._
      
      val schemaForDataFile = StructType(
          Seq(
              StructField("id", StringType, true),
              StructField("name", StringType, true)))
      
    3. Call the schema method in the DataFrameReader object, passing in the StructType object.

      For example:

      var dfReader = session.read.schema(schemaForDataFile)
      

      The schema method returns a DataFrameReader object that is configured to read files containing the specified fields.

      Note that you do not need to do this for files in other formats (such as JSON). For those files, the DataFrameReader treats the data as a single field of the VARIANT type with the field name $1.

  4. If you need to specify additional information about how the data should be read (for example, that the data is compressed or that a CSV file uses a semicolon instead of a comma to delimit fields), call the option method of the DataFrameReader object.

    Pass in the name and value of the option that you want to set. You can set the following types of options:

    The following example sets up the DataFrameReader object to query data in a CSV file that is not compressed and that uses a semicolon for the field delimiter.

    dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    

    The option method returns a DataFrameReader object that is configured with the specified options.

  5. Call the method corresponding to the format of the file (e.g. the csv method), passing in the location of the file.

    val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
    

    The methods corresponding to the format of a file return a CopyableDataFrame object for that file. CopyableDataFrame extends DataFrame and provides additional methods for working with the data in staged files.

  6. Call an action method to retrieve the data from the file or to copy the data into a table.

    As is the case with DataFrames for tables, the data is not retrieved into the DataFrame until you call an action method.

Retrieve Data from the File to the DataFrame

After you set up a DataFrame for a file in a stage, you can retrieve the data into the DataFrame:

  1. Use the DataFrame object methods to perform any transformations needed on the dataset (for example, selecting specific fields, filtering rows, etc.).

    For example, to extract the color element from a JSON file named data.json in the stage named mystage:

    val df = session.read.json("@mystage/data.json").select(col("$1")("color"))
    

    As explained earlier, for files in formats other than CSV (e.g. JSON), the DataFrameReader treats the data in the file as a single VARIANT column with the name $1.

  2. Call the DataFrame.collect method to retrieve the data. For example:

    val results = df.collect()
    

Copying Data from a File into a Table

After you set up a DataFrame for a file in a stage, you can call the CopyableDataFrame.copyInto method to copy the data into a table. This method executes the COPY INTO <table> command.

Note

You do not need to call the collect method before calling copyInto. The data from the file does not need to be in the DataFrame before you call copyInto.

For example, the following code loads data from the CSV file specified by myFileStage into the table mytable. Because the data is in a CSV file, the code must also describe the fields in the file. The example does this by calling the DataFrameReader.schema method and passing in a StructType object (csvFileSchema) containing a sequence of StructField objects that describe the fields.

val df = session.read.schema(csvFileSchema).csv(myFileStage)
df.copyInto("mytable")
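The example above refers to csvFileSchema and myFileStage without defining them. The following sketch shows one way those values might be defined; the stage path and the two string fields are assumptions chosen for illustration, and the CopyIntoSketch object name is hypothetical.

```scala
import com.snowflake.snowpark.Session
import com.snowflake.snowpark.types._

object CopyIntoSketch {
  // Hypothetical stage path; the file is assumed to contain two fields per record.
  val myFileStage = "@mystage/data.csv"

  // Describe the fields in the CSV file (both are assumed to be nullable strings).
  val csvFileSchema = StructType(
    Seq(
      StructField("id", StringType, true),
      StructField("name", StringType, true)))

  def load(session: Session): Unit = {
    val df = session.read.schema(csvFileSchema).csv(myFileStage)
    // copyInto is an action method: it runs COPY INTO <table> without a prior collect.
    df.copyInto("mytable")
  }
}
```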

Working with Semi-Structured Data

Using a DataFrame, you can query and access semi-structured data (e.g. JSON data). The next sections explain how to work with semi-structured data in a DataFrame.

Note

The examples in these sections use the sample data in Sample Data Used in Examples.

Traversing Semi-Structured Data

To refer to a specific field or element in semi-structured data, use the apply methods of the Column object, passing in either the name of a field in an object or the index of an element in an array.

Note

If the field name or elements in the path are irregular and make it difficult to use the Column.apply methods, you can use the get, get_ignore_case, or get_path functions as an alternative.

As mentioned in Using the apply Method to Refer to a Column, you can omit the method name apply:

col("column_name")("field_name")
col("column_name")(index)

For example, the following code selects the dealership field in objects in the src column of the sample data:

val df = session.table("car_sales")
df.select(col("src")("dealership")).show()

The code prints the following output:

----------------------------
|"""SRC""['DEALERSHIP']"   |
----------------------------
|"Valley View Auto Sales"  |
|"Tindel Toyota"           |
----------------------------

Note

The values in the DataFrame are surrounded by double quotes because these values are returned as string literals. To cast these values to a specific type, see Explicitly Casting Values in Semi-Structured Data.

You can also chain method calls to traverse a path to a specific field or element.

For example, the following code selects the name field in the salesperson object:

val df = session.table("car_sales")
df.select(col("src")("salesperson")("name")).show()

The code prints the following output:

------------------------------------
|"""SRC""['SALESPERSON']['NAME']"  |
------------------------------------
|"Frank Beasley"                   |
|"Greg Northrup"                   |
------------------------------------

As another example, the following code selects the first element of the vehicle field, which holds an array of vehicles. The example also selects the price field from the first element.

val df = session.table("car_sales")
df.select(col("src")("vehicle")(0)).show()
df.select(col("src")("vehicle")(0)("price")).show()

The code prints the following output:

---------------------------
|"""SRC""['VEHICLE'][0]"  |
---------------------------
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "paint protection"   |
|  ],                     |
|  "make": "Honda",       |
|  "model": "Civic",      |
|  "price": "20275",      |
|  "year": "2017"         |
|}                        |
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "rust proofing",     |
|    "fabric protection"  |
|  ],                     |
|  "make": "Toyota",      |
|  "model": "Camry",      |
|  "price": "23500",      |
|  "year": "2017"         |
|}                        |
---------------------------

------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']"  |
------------------------------------
|"20275"                           |
|"23500"                           |
------------------------------------

As an alternative to the apply method, you can use the get, get_ignore_case, or get_path functions if the field name or elements in the path are irregular and make it difficult to use the Column.apply methods.

For example, the following lines of code both print the value of a specified field in an object:

df.select(get(col("src"), lit("dealership"))).show()
df.select(col("src")("dealership")).show()

Similarly, the following lines of code both print the value of a field at a specified path in an object:

df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
df.select(col("src")("vehicle")(0)("make")).show()
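The get_ignore_case function mentioned above can help when the case of a field name is not known in advance. The following sketch assumes that get_ignore_case takes the same two Column arguments as get; the IgnoreCaseSketch object name is hypothetical.

```scala
import com.snowflake.snowpark.Session
import com.snowflake.snowpark.functions._

object IgnoreCaseSketch {
  def showDealership(session: Session): Unit = {
    val df = session.table("car_sales")
    // Looks up the "dealership" field even though the name is given in upper case.
    df.select(get_ignore_case(col("src"), lit("DEALERSHIP"))).show()
  }
}
```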

Explicitly Casting Values in Semi-Structured Data

By default, the values of fields and elements are returned as string literals (including the double quotes), as shown in the examples above.

To avoid unexpected results, call the cast method to cast the value to a specific type. For example, the following code prints out the values without and with casting:

// Import the objects for the data types, including StringType.
import com.snowflake.snowpark.types._
...
val df = session.table("car_sales")
df.select(col("src")("salesperson")("id")).show()
df.select(col("src")("salesperson")("id").cast(StringType)).show()

The code prints the following output:

----------------------------------
|"""SRC""['SALESPERSON']['ID']"  |
----------------------------------
|"55"                            |
|"274"                           |
----------------------------------

---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)"  |
---------------------------------------------------
|55                                               |
|274                                              |
---------------------------------------------------

Flattening an Array of Objects into Rows

If you need to “flatten” semi-structured data into a DataFrame (e.g. producing a row for every object in an array), call the DataFrame.flatten method. This method is equivalent to the FLATTEN SQL function. If you pass in a path to an object or array, the method returns a DataFrame that contains a row for each field or element in the object or array.

For example, in the sample data, src:customer is an array of objects that contain information about a customer. Each object contains a name and address field.

If you pass this path to the flatten method:

val df = session.table("car_sales")
df.flatten(col("src")("customer")).show()

the method returns a DataFrame:

----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC"                                      |"SEQ"  |"KEY"  |"PATH"  |"INDEX"  |"VALUE"                            |"THIS"                               |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{                                          |1      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "San Francisco, CA",  |  {                                  |
|    {                                      |       |       |        |         |  "name": "Joyce Ridgely",         |    "address": "San Francisco, CA",  |
|      "address": "San Francisco, CA",      |       |       |        |         |  "phone": "16504378889"           |    "name": "Joyce Ridgely",         |
|      "name": "Joyce Ridgely",             |       |       |        |         |}                                  |    "phone": "16504378889"           |
|      "phone": "16504378889"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Valley View Auto Sales",  |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "55",                            |       |       |        |         |                                   |                                     |
|    "name": "Frank Beasley"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "paint protection"                 |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Honda",                     |       |       |        |         |                                   |                                     |
|      "model": "Civic",                    |       |       |        |         |                                   |                                     |
|      "price": "20275",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
|{                                          |2      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "New York, NY",       |  {                                  |
|    {                                      |       |       |        |         |  "name": "Bradley Greenbloom",    |    "address": "New York, NY",       |
|      "address": "New York, NY",           |       |       |        |         |  "phone": "12127593751"           |    "name": "Bradley Greenbloom",    |
|      "name": "Bradley Greenbloom",        |       |       |        |         |}                                  |    "phone": "12127593751"           |
|      "phone": "12127593751"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Tindel Toyota",           |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "274",                           |       |       |        |         |                                   |                                     |
|    "name": "Greg Northrup"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "rust proofing",                   |       |       |        |         |                                   |                                     |
|        "fabric protection"                |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Toyota",                    |       |       |        |         |                                   |                                     |
|      "model": "Camry",                    |       |       |        |         |                                   |                                     |
|      "price": "23500",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
----------------------------------------------------------------------------------------------------------------------------------------------------------

From this DataFrame, you can select the name and address fields from each object in the VALUE column:

df.flatten(col("src")("customer")).select(col("value")("name"), col("value")("address")).show()
-------------------------------------------------
|"""VALUE""['NAME']"   |"""VALUE""['ADDRESS']"  |
-------------------------------------------------
|"Joyce Ridgely"       |"San Francisco, CA"     |
|"Bradley Greenbloom"  |"New York, NY"          |
-------------------------------------------------

The following code adds to the previous example by casting the values to a specific type and changing the names of the columns:

df.flatten(col("src")("customer"))
  .select(
    col("value")("name").cast(StringType).as("Customer Name"),
    col("value")("address").cast(StringType).as("Customer Address"))
  .show()
-------------------------------------------
|"Customer Name"     |"Customer Address"  |
-------------------------------------------
|Joyce Ridgely       |San Francisco, CA   |
|Bradley Greenbloom  |New York, NY        |
-------------------------------------------
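
If you need the flattened values in Scala code rather than printed output, one option is to call collect() instead of show() and read each Row by position. The following is a minimal sketch, not a definitive implementation: it assumes the car_sales table from the examples above, and the helper names toNameAddressPairs and customerNamesAndAddresses are hypothetical.

```scala
import com.snowflake.snowpark.{Row, Session}
import com.snowflake.snowpark.functions.col
import com.snowflake.snowpark.types.StringType

// Hypothetical helper: converts rows whose first two columns are strings
// into (name, address) pairs.
def toNameAddressPairs(rows: Seq[Row]): Seq[(String, String)] =
  rows.map(row => (row.getString(0), row.getString(1)))

// Hypothetical helper: flattens src:customer and returns the name and address
// of each customer. Assumes the car_sales table used in this section.
def customerNamesAndAddresses(session: Session): Seq[(String, String)] = {
  val rows = session.table("car_sales")
    .flatten(col("src")("customer"))
    .select(
      col("value")("name").cast(StringType).as("Customer Name"),
      col("value")("address").cast(StringType).as("Customer Address"))
    .collect() // Action method: the query is executed here.
  toNameAddressPairs(rows.toSeq)
}
```

Casting to StringType before collecting means that each column can be read with getString, which avoids handling the values as quoted VARIANT data on the client.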

Executing SQL Statements

To execute a SQL statement that you specify, call the sql method in the Session class, and pass in the statement to be executed. The method returns a DataFrame.

Note that the SQL statement won’t be executed until you call an action method.

// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
// Note that collect() returns an Array[Row], not a DataFrame.
val stageFiles = session.sql("ls @myStage").collect()

// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()

val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()

// Copy data from a stage to a table.
// The collect() method causes this SQL statement to be executed.
val copyResults = session.sql("copy into myTable from @myStage file_format=(type = csv)").collect()

If you want to call methods that transform the DataFrame (for example, filter or select), note that these methods work only if the underlying SQL statement is a SELECT statement. The transformation methods are not supported for other kinds of SQL statements.

val df = session.sql("select a, c from table where b < 1")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("c") < 10).select(col("a")).collect()

// In this example, the underlying SQL statement is not a SELECT statement.
val dfStageFiles = session.sql("ls @myStage")
// Calling the filter method results in an error.
dfStageFiles.filter(...)
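
If you need to narrow down the output of a statement that is not a SELECT statement, one possible approach is to execute the statement with collect() and then filter the returned rows in Scala. The following is a minimal sketch under stated assumptions: the stage name myStage comes from the examples above, the helper names filterByNameSuffix and listCsvFiles are hypothetical, and the code assumes that the first column of the ls output is the file name.

```scala
import com.snowflake.snowpark.{Row, Session}

// Hypothetical helper: keeps only the rows whose first column (assumed to be
// the file name in the ls output) ends with the given suffix.
def filterByNameSuffix(rows: Seq[Row], suffix: String): Seq[Row] =
  rows.filter(row => row.getString(0).endsWith(suffix))

// Hypothetical helper: lists the files in the stage and filters them on the
// client, because DataFrame.filter is not supported for this statement.
def listCsvFiles(session: Session): Seq[Row] = {
  // collect() executes the statement and returns the rows to the client.
  val stageFiles = session.sql("ls @myStage").collect()
  filterByNameSuffix(stageFiles.toSeq, ".csv")
}
```

This filtering happens on the client after all rows have been retrieved, so it is best suited to statements that return a small result.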