package snowpark
Type Members
- class AsyncJob extends AnyRef
Provides a way to track an asynchronous query in Snowflake.
You can use this object to check the status of an asynchronous query and retrieve the results.
To check the status of an asynchronous query that you submitted earlier, call Session.createAsyncJob, and pass in the query ID. This returns an AsyncJob object that you can use to check the status of the query and retrieve the query results.
Example 1: Create an AsyncJob by specifying a valid <query_id>, check whether the query is running or not, and get the result rows.
val asyncJob = session.createAsyncJob(<query_id>)
println(s"Is query ${asyncJob.getQueryId()} running? ${asyncJob.isRunning()}")
val rows = asyncJob.getRows()
Example 2: Create an AsyncJob by specifying a valid <query_id> and cancel the query if it is still running.
session.createAsyncJob(<query_id>).cancel()
- Since
-
0.11.0
- class CaseExpr extends Column
Represents a CASE expression.
To construct this object for a CASE expression, call the functions.when function, specifying a condition and the corresponding result for that condition. Then, call the when and otherwise methods to specify additional conditions and results.
For example:
import com.snowflake.snowpark.functions._
df.select(
  when(col("col").is_null, lit(1))
    .when(col("col") === 1, lit(2))
    .otherwise(lit(3))
)
- Since
-
0.2.0
- case class Column extends Logging with Product with Serializable
Represents a column or an expression in a DataFrame.
To create a Column object to refer to a column in a DataFrame, you can:
- Use the functions.col function.
- Use the DataFrame.col method.
- Use the shorthand for the DataFrame.apply method (<dataframe>("<col_name>")).
For example:
import com.snowflake.snowpark.functions.col
df.select(col("name"))
df.select(df.col("name"))
dfLeft.join(dfRight, dfLeft("name") === dfRight("name"))
This class also defines utility functions for constructing expressions with Columns.
The following examples demonstrate how to use Column objects in expressions:
df
  .filter(col("id") === 20)
  .filter((col("a") + col("b")) < 10)
  .select((col("b") * 10) as "c")
- Since
-
0.1.0
- class CopyableDataFrame extends DataFrame
DataFrame for loading data from files in a stage to a table. Objects of this type are returned by the DataFrameReader methods that load data from files (e.g. csv).
To save the data from the staged files to a table, call one of the copyInto() methods. These methods use the COPY INTO <table_name> command to copy the data to a specified table.
- Since
-
0.9.0
- class CopyableDataFrameAsyncActor extends DataFrameAsyncActor
Provides APIs to execute CopyableDataFrame actions asynchronously.
- Since
-
0.11.0
- class DataFrame extends Logging
Represents a lazily-evaluated relational dataset that contains a collection of Row objects with columns defined by a schema (column name and type).
A DataFrame is considered lazy because it encapsulates the computation or query required to produce a relational dataset. The computation is not performed until you call a method that performs an action (e.g. collect ).
Creating a DataFrame
You can create a DataFrame in a number of different ways, as shown in the examples below.
Example 1: Creating a DataFrame by reading a table.
val dfPrices = session.table("itemsdb.publicschema.prices")
Example 2: Creating a DataFrame by reading files from a stage.
val dfCatalog = session.read.csv("@stage/some_dir")
Example 3: Creating a DataFrame by specifying a sequence or a range.
val df = session.createDataFrame(Seq((1, "one"), (2, "two")))
val df = session.range(1, 10, 2)
Example 4: Create a new DataFrame by applying transformations to other existing DataFrames.
val dfMergedData = dfCatalog.join(dfPrices, dfCatalog("itemId") === dfPrices("ID"))
Performing operations on a DataFrame
Broadly, the operations on DataFrame can be divided into two types:
- Transformations produce a new DataFrame from one or more existing DataFrames. Note that transformations are lazy and don't cause the DataFrame to be evaluated. If the API does not provide a method to express the SQL that you want to use, you can use functions.sqlExpr as a workaround.
- Actions cause the DataFrame to be evaluated. When you call a method that performs an action, Snowpark sends the SQL query for the DataFrame to the server for evaluation.
Transforming a DataFrame
The following examples demonstrate how you can transform a DataFrame.
Example 5. Using the select method to select the columns that should be in the DataFrame (similar to adding a SELECT clause).
// Return a new DataFrame containing the ID and amount columns of the prices table. This is
// equivalent to:
// SELECT ID, AMOUNT FROM PRICES;
val dfPriceIdsAndAmounts = dfPrices.select(col("ID"), col("amount"))
Example 6. Using the Column.as method to rename a column in a DataFrame (similar to using SELECT col AS alias).
// Return a new DataFrame containing the ID column of the prices table as a column named
// itemId. This is equivalent to:
// SELECT ID AS itemId FROM PRICES;
val dfPriceItemIds = dfPrices.select(col("ID").as("itemId"))
Example 7. Using the filter method to filter data (similar to adding a WHERE clause).
// Return a new DataFrame containing the row from the prices table with the ID 1. This is
// equivalent to:
// SELECT * FROM PRICES WHERE ID = 1;
val dfPrice1 = dfPrices.filter((col("ID") === 1))
Example 8. Using the sort method to specify the sort order of the data (similar to adding an ORDER BY clause).
// Return a new DataFrame for the prices table with the rows sorted by ID. This is equivalent
// to:
// SELECT * FROM PRICES ORDER BY ID;
val dfSortedPrices = dfPrices.sort(col("ID"))
Example 9. Using the groupBy method to return a RelationalGroupedDataFrame that you can use to group and aggregate results (similar to adding a GROUP BY clause).
RelationalGroupedDataFrame provides methods for aggregating results, including:
- avg (equivalent to AVG(column))
- count (equivalent to COUNT())
- max (equivalent to MAX(column))
- median (equivalent to MEDIAN(column))
- min (equivalent to MIN(column))
- sum (equivalent to SUM(column))
// Return a new DataFrame for the prices table that computes the sum of the prices by
// category. This is equivalent to:
// SELECT CATEGORY, SUM(AMOUNT) FROM PRICES GROUP BY CATEGORY;
val dfTotalPricePerCategory = dfPrices.groupBy(col("category")).sum(col("amount"))
Example 10. Using a Window to build a WindowSpec object that you can use for windowing functions (similar to using '<function> OVER ... PARTITION BY ... ORDER BY').
// Define a window that partitions prices by category and sorts the prices by date within the
// partition.
val window = Window.partitionBy(col("category")).orderBy(col("price_date"))
// Calculate the running sum of prices over this window. This is equivalent to:
// SELECT CATEGORY, PRICE_DATE, SUM(AMOUNT) OVER
//   (PARTITION BY CATEGORY ORDER BY PRICE_DATE)
//   FROM PRICES ORDER BY PRICE_DATE;
val dfCumulativePrices = dfPrices.select(
  col("category"),
  col("price_date"),
  sum(col("amount")).over(window)).sort(col("price_date"))
Performing an action on a DataFrame
The following examples demonstrate how you can perform an action on a DataFrame.
Example 11: Performing a query and returning an array of Rows.
val results = dfPrices.collect()
Example 12: Performing a query and printing the results.
dfPrices.show()
- Since
-
0.1.0
- class DataFrameAsyncActor extends AnyRef
Provides APIs to execute DataFrame actions asynchronously.
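For example, a minimal sketch (assuming an existing DataFrame df):
// Start the action without blocking, then wait for and fetch the result.
val asyncJob = df.async.collect()
val rows = asyncJob.getResult()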
- Since
-
0.11.0
- final class DataFrameNaFunctions extends Logging
Provides functions for handling missing values in a DataFrame.
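For example, a minimal sketch (assuming a DataFrame df with nullable columns "a" and "b"; access these functions through DataFrame.na):
// Keep only rows with at least 1 non-null value in columns "a" and "b".
val dfDropped = df.na.drop(1, Seq("a", "b"))
// Replace nulls in column "a" with 0 and nulls in column "b" with "none".
val dfFilled = df.na.fill(Map("a" -> 0, "b" -> "none"))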
- Since
-
0.2.0
- class DataFrameReader extends AnyRef
Provides methods to load data in various supported formats from a Snowflake stage to a DataFrame. The paths provided to the DataFrameReader must refer to Snowflake stages.
To use this object:
- Access an instance of a DataFrameReader by calling the Session.read method.
- Specify any format-specific options and copy options by calling the option or options method. These methods return a DataFrameReader that is configured with these options. (Note that although specifying copy options can make error handling more robust during the reading process, it may have an effect on performance.)
- Specify the schema of the data that you plan to load by constructing a types.StructType object and passing it to the schema method. This method returns a DataFrameReader that is configured to read data that uses the specified schema.
- Specify the format of the data by calling the method named after the format (e.g. csv , json , etc.). These methods return a DataFrame that is configured to load data in the specified format.
- Call a DataFrame method that performs an action.
- For example, to load the data from the file, call DataFrame.collect.
- As another example, to save the data from the file to a table, call CopyableDataFrame.copyInto(tableName:String)*. This uses the COPY INTO <table_name> command.
The following examples demonstrate how to use a DataFrameReader.
Example 1: Loading the first two columns of a CSV file and skipping the first header line.
// Import the package for StructType.
import com.snowflake.snowpark.types._
val filePath = "@mystage1"
// Define the schema for the data in the CSV file.
val userSchema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))
// Create a DataFrame that is configured to load data from the CSV file.
val csvDF = session.read.option("skip_header", 1).schema(userSchema).csv(filePath)
// Load the data into the DataFrame and return an Array of Rows containing the results.
val results = csvDF.collect()
Example 2: Loading a gzip-compressed JSON file.
val filePath = "@mystage2/data.json.gz"
// Create a DataFrame that is configured to load data from the gzipped JSON file.
val jsonDF = session.read.option("compression", "gzip").json(filePath)
// Load the data into the DataFrame and return an Array of Rows containing the results.
val results = jsonDF.collect()
If you want to load only a subset of files from the stage, you can use the pattern option to specify a regular expression that matches the files that you want to load.
Example 3: Loading only the CSV files from a stage location.
import com.snowflake.snowpark.types._
// Define the schema for the data in the CSV files.
val userSchema: StructType = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))
// Create a DataFrame that is configured to load data from the CSV files in the stage.
val csvDF = session.read.option("pattern", ".*[.]csv").schema(userSchema).csv("@stage_location")
// Load the data into the DataFrame and return an Array of Rows containing the results.
val results = csvDF.collect()
In addition, if you want to load the files from the stage into a specified table with the COPY INTO <table_name> command, you can use a copyInto() method, e.g. CopyableDataFrame.copyInto(tableName:String)*.
Example 4: Loading data from a JSON file in a stage to a table by using COPY INTO <table_name>.
val filePath = "@mystage1"
// Create a DataFrame that is configured to load data from the JSON file.
val jsonDF = session.read.json(filePath)
// Load the data into the specified table `T1`.
// The table "T1" should exist before calling copyInto().
jsonDF.copyInto("T1")
- Since
-
0.1.0
- final class DataFrameStatFunctions extends Logging
Provides eagerly computed statistical functions for DataFrames.
To access an object of this class, use DataFrame.stat.
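For example, a minimal sketch (assuming a DataFrame df with numeric columns "a" and "b"):
// Compute the correlation and the sample covariance of the two columns.
val correlation: Option[Double] = df.stat.corr("a", "b")
val covariance: Option[Double] = df.stat.cov("a", "b")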
- Since
-
0.2.0
- class DataFrameWriter extends AnyRef
Provides methods for writing data from a DataFrame to supported output destinations.
You can write data to the following locations:
- A Snowflake table
- A file on a stage
Saving Data to a Table
To use this object to write into a table:
- Access an instance of a DataFrameWriter by calling the DataFrame.write method.
- Specify the save mode to use (overwrite or append) by calling the mode method. This method returns a DataFrameWriter that is configured to save data using the specified mode. The default SaveMode is SaveMode.Append .
- (Optional) If you need to set some options for the save operation (e.g. columnOrder), call the options or option method.
- Call a saveAs* method to save the data to the specified destination.
For example:
df.write.mode("overwrite").saveAsTable("T")
Saving Data to a File on a Stage
To save data to a file on a stage:
- Access an instance of a DataFrameWriter by calling the DataFrame.write method.
- Specify the save mode to use (Overwrite or ErrorIfExists) by calling the mode method. This method returns a DataFrameWriter that is configured to save data using the specified mode. The default SaveMode is SaveMode.ErrorIfExists for this case.
- (Optional) If you need to set some options for the save operation (e.g. file format options), call the options or option method.
- Call the method named after a file format to save the data in the specified format:
For example:
Example 1: Write a DataFrame to a CSV file.
val result = df.write.csv("@myStage/prefix")
Example 2: Write a DataFrame to a CSV file without compression.
val result = df.write.option("compression", "none").csv("@myStage/prefix")
- Since
-
0.1.0
- class DataFrameWriterAsyncActor extends AnyRef
Provides APIs to execute DataFrameWriter actions asynchronously.
- Since
-
0.11.0
- case class DeleteResult(rowsDeleted: Long) extends Product with Serializable
Result of deleting rows in an Updatable.
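For example, a minimal sketch (assuming a table named T with a numeric column id):
// Delete the matching rows and check how many rows were deleted.
val t = session.table("T")
val deleteResult = t.delete(t("id") === 1)
println(deleteResult.rowsDeleted)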
- Since
-
0.7.0
- final class FileOperation extends Logging
Provides methods for working on files in a stage.
To access an object of this class, use Session.file.
For example:
// Upload a file to a stage.
session.file.put("file:///tmp/file1.csv", "@myStage/prefix1")
// Download a file from a stage.
session.file.get("@myStage/prefix1/file1.csv", "file:///tmp")
- Since
-
0.4.0
- case class GetResult(fileName: String, sizeBytes: Long, status: String, encryption: String, message: String) extends Product with Serializable
Represents the results of downloading a file from a stage location to the local file system.
NOTE: fileName is the relative path to the file on the stage. For example, if you download @myStage/prefix1/file1.csv.gz, fileName is prefix1/file1.csv.gz.
- Since
-
0.4.0
- case class GroupingSets(sets: Seq[Set[Column]]) extends Product with Serializable
A container of grouping sets that you pass to DataFrame.groupByGroupingSets.
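For example, a minimal sketch (assuming a DataFrame df with columns "a" and "b"):
import com.snowflake.snowpark.functions.col
// Group by the grouping sets (a) and (b), then count the rows in each group.
val grouped = df.groupByGroupingSets(GroupingSets(Seq(Set(col("a")), Set(col("b")))))
val result = grouped.count()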
- sets
-
a list of grouping sets
- Since
-
0.4.0
- class HasCachedResult extends DataFrame
A DataFrame that returns cached data. Repeated invocations of actions on this type of DataFrame are guaranteed to produce the same results. It is returned from cacheResult functions (e.g. DataFrame.cacheResult).
- Since
-
0.4.0
- class MatchedClauseBuilder extends AnyRef
Builder for a matched clause. It provides APIs to build update and delete actions.
- Since
-
0.7.0
- class MergeBuilder extends AnyRef
Builder for a merge action. It provides APIs to build matched and not matched clauses.
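For example, a minimal sketch (assuming a target table named "target" and a source DataFrame source, both with columns id and value):
// Update matching rows and insert the rows that do not match.
val target = session.table("target")
val mergeResult = target
  .merge(source, target("id") === source("id"))
  .whenMatched.update(Map(target("value") -> source("value")))
  .whenNotMatched.insert(Map(target("id") -> source("id"), target("value") -> source("value")))
  .collect()
println(mergeResult.rowsInserted)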
- Since
-
0.7.0
- class MergeBuilderAsyncActor extends AnyRef
Provides APIs to execute MergeBuilder actions asynchronously.
- Since
-
1.3.0
- case class MergeResult(rowsInserted: Long, rowsUpdated: Long, rowsDeleted: Long) extends Product with Serializable
Result of merging a DataFrame into an Updatable DataFrame.
- Since
-
0.7.0
- class MergeTypedAsyncJob extends TypedAsyncJob[MergeResult]
Provides a way to track an asynchronously executed action in a MergeBuilder.
- Since
-
1.3.0
- class NotMatchedClauseBuilder extends AnyRef
Builder for a not matched clause. It provides APIs to build insert actions.
- Since
-
0.7.0
- class PublicPreview extends Annotation with Annotation with ClassfileAnnotation
- Annotations
- @Documented ()
- case class PutResult(sourceFileName: String, targetFileName: String, sourceSizeBytes: Long, targetSizeBytes: Long, sourceCompression: String, targetCompression: String, status: String, encryption: String, message: String) extends Product with Serializable
Represents the results of uploading a local file to a stage location.
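For example, a minimal sketch:
// Upload a local file and inspect the result of each PUT operation.
val putResults = session.file.put("file:///tmp/file1.csv", "@myStage/prefix1")
putResults.foreach(r => println(s"${r.sourceFileName} -> ${r.targetFileName}: ${r.status}"))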
- Since
-
0.4.0
- class RelationalGroupedDataFrame extends AnyRef
Represents an underlying DataFrame with rows that are grouped by common values. Can be used to define aggregations on these grouped DataFrames.
Example:
val groupedDf: RelationalGroupedDataFrame = df.groupBy("dept")
val aggDf: DataFrame = groupedDf.agg(groupedDf("salary") -> "mean")
The methods DataFrame.groupBy, DataFrame.cube and DataFrame.rollup return an instance of type RelationalGroupedDataFrame.
- Since
-
0.1.0
- class Row extends Serializable
Represents a row returned by the evaluation of a DataFrame.
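For example, a minimal sketch (assuming the first column is a NUMBER and the second is a VARCHAR):
// Access the fields of the first returned Row by position.
val row = df.collect().head
val id = row.getInt(0)
val name = row.getString(1)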
- Since
-
0.1.0
- class SProcRegistration extends AnyRef
Provides methods to register a SProc (Stored Procedure) in the Snowflake database.
Session.sproc returns an object of this class.
To register anonymous temporary SProcs which work in the current session:
val sp = session.sproc.registerTemporary((session: Session, num: Int) => s"num: $num")
session.storedProcedure(sp, 123)
To register named temporary SProcs which work in the current session:
val name = "sproc"
val sp = session.sproc.registerTemporary(name, (session: Session, num: Int) => s"num: $num")
session.storedProcedure(sp, 123)
session.storedProcedure(name, 123)
Registering a permanent SProc requires a user stage: Snowpark uploads all JAR files for the SProc and any dependencies to that stage. You must also specify owner's rights or caller's rights mode via the parameter 'isCallerMode'.
val name = "sproc"
val stageName = "<a user stage name>"
val sp = session.sproc.registerPermanent(name, (session: Session, num: Int) => s"num: $num", stageName, isCallerMode = true)
session.storedProcedure(sp, 123)
session.storedProcedure(name, 123)
This object also provides convenient methods to execute SProc lambda functions directly with the current session on the client side. These functions are designed for debugging and development only. Because the local and Snowflake server environments are different, the output of running an SProc function with these test functions may also differ from the output of running it on the Snowflake server.
// A client-side Scala lambda.
val func = (session: Session, num: Int) => s"num: $num"
// Register a server-side stored procedure.
val sp = session.sproc.registerTemporary(func)
// Execute the lambda function of this SProc from the client side.
val localResult = session.sproc.runLocally(func, 123)
// Execute this SProc from the server side.
val resultDF = session.storedProcedure(sp, 123)
- Since
-
1.8.0
- sealed trait SaveMode extends AnyRef
Please refer to the companion SaveMode$ object.
- Since
-
0.1.0
- class Session extends Logging
Establishes a connection with a Snowflake database and provides methods for creating DataFrames and accessing objects for working with files in stages.
When you create a Session object, you provide configuration settings to establish a connection with a Snowflake database (e.g. the URL for the account, a user name, etc.). You can specify these settings in a configuration file or in a Map that associates configuration setting names with values.
To create a Session from a file:
val session = Session.builder.configFile("/path/to/file.properties").create
To create a Session from a map of configuration properties:
val configMap = Map(
  "URL" -> "demo.snowflakecomputing.com",
  "USER" -> "testUser",
  "PASSWORD" -> "******",
  "ROLE" -> "myrole",
  "WAREHOUSE" -> "warehouse1",
  "DB" -> "db1",
  "SCHEMA" -> "schema1"
)
Session.builder.configs(configMap).create
Session contains functions to construct DataFrames, such as Session.table, Session.sql, and Session.read.
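For example, a minimal sketch of using an existing session (assuming a table named T exists):
// Run a SQL query and create a DataFrame from a table.
val dfFromSql = session.sql("SELECT CURRENT_TIMESTAMP()")
val dfFromTable = session.table("T")
dfFromTable.show()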
- Since
-
0.1.0
- class SnowparkClientException extends RuntimeException
Represents a Snowpark client side exception.
- Since
-
0.1.0
- case class StoredProcedure extends Product with Serializable
The reference to a Stored Procedure, which can be created by Session.sproc.register methods and used in the Session.storedProcedure method.
For example:
val sp = session.sproc.registerTemporary(
  (session: Session, num: Int) => {
    val result = session.sql(s"select $num").collect().head.getInt(0)
    result + 100
  })
session.storedProcedure(sp, 123).show()
- Since
-
1.8.0
- case class TableFunction(funcName: String) extends Product with Serializable
Looks up table functions by funcName and returns a TableFunction object that can be used in DataFrame.join and Session.tableFunction methods.
It can reference both system-defined table functions and user-defined table functions.
Example
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark.TableFunction
session.tableFunction(
  TableFunction("flatten"),
  Map("input" -> parse_json(lit("[1,2]")))
)
df.join(TableFunction("split_to_table"), df("a"), lit(","))
- funcName
-
table function name, can be a short name like func or a fully qualified name like database.schema.func
- Since
-
0.4.0
- class TypedAsyncJob[T] extends AsyncJob
Provides a way to track an asynchronously executed action in a DataFrame.
To get the result of the action (e.g. the number of results from a count() action or an Array of Row objects from the collect() action), call the getResult method.
To perform an action on a DataFrame asynchronously, call an action method on the DataFrameAsyncActor object returned by DataFrame.async. For example:
val asyncJob1 = df.async.collect()
val asyncJob2 = df.async.toLocalIterator()
val asyncJob3 = df.async.count()
Each of these methods returns a TypedAsyncJob object that you can use to get the results of the action.
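For example, a minimal sketch of retrieving the typed results of the jobs above:
// Block until each asynchronous action finishes and return its typed result.
val rows: Array[Row] = asyncJob1.getResult()
val rowCount: Long = asyncJob3.getResult()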
- Since
-
0.11.0
- class UDFRegistration extends Logging
Provides methods to register lambdas and functions as UDFs in the Snowflake database.
Session.udf returns an object of this class.
You can use this object to register temporary UDFs that you plan to use in the current session:
session.udf.registerTemporary("mydoubleudf", (x: Int) => x * x)
session.sql(s"SELECT mydoubleudf(c) from T")
You can also register permanent UDFs that you can use in subsequent sessions. When registering a permanent UDF, you must specify a stage where the registration method will upload the JAR files for the UDF and any dependencies.
session.udf.registerPermanent("mydoubleudf", (x: Int) => x * x, "mystage")
session.sql(s"SELECT mydoubleudf(c) from T")
The methods that register a UDF return a UserDefinedFunction object, which you can use in Column expressions.
val myUdf = session.udf.registerTemporary("mydoubleudf", (x: Int) => x * x)
session.table("T").select(myUdf(col("c")))
If you do not need to refer to a UDF by name, use com.snowflake.snowpark.functions.udf to create an anonymous UDF instead.
Snowflake supports the following data types for the parameters for a UDF:
SQL Type | Scala Type | Notes
NUMBER | Short or Option[Short] | Supported
NUMBER | Int or Option[Int] | Supported
NUMBER | Long or Option[Long] | Supported
FLOAT | Float or Option[Float] | Supported
DOUBLE | Double or Option[Double] | Supported
NUMBER | java.math.BigDecimal | Supported
VARCHAR | String or java.lang.String | Supported
BOOL | Boolean or Option[Boolean] | Supported
DATE | java.sql.Date | Supported
TIMESTAMP | java.sql.Timestamp | Supported
BINARY | Array[Byte] | Supported
ARRAY | Array[String] or Array[Variant] | Supported array of type Array[String] or Array[Variant]
OBJECT | Map[String, String] or Map[String, Variant] | Supported mutable map of type scala.collection.mutable.Map[String, String] or scala.collection.mutable.Map[String, Variant]
GEOGRAPHY | com.snowflake.snowpark.types.Geography | Supported
VARIANT | com.snowflake.snowpark.types.Variant | Supported
- Since
-
0.1.0
- class UDTFRegistration extends Logging
Provides methods to register a UDTF (user-defined table function) in the Snowflake database.
Session.udtf returns an object of this class.
To register a UDTF, you must:
- Define a UDTF class.
- Create an instance of that class, and register that instance as a UDTF.
The next sections describe these steps in more detail.
Defining the UDTF Class
Define a class that inherits from one of the UDTF[N] classes (e.g. UDTF0, UDTF1, etc.), where n specifies the number of input arguments for your UDTF. For example, if your UDTF passes in 3 input arguments, extend the UDTF3 class.
In your class, override the following three methods:
- process(), which is called once for each row in the input partition.
- endPartition(), which is called once for each partition after all rows have been passed to process().
- outputSchema(), which returns a types.StructType object that describes the schema for the returned rows.
When a UDTF is called, the rows are grouped into partitions before they are passed to the UDTF:
- If the statement that calls the UDTF specifies the PARTITION clause (explicit partitions), that clause determines how the rows are partitioned.
- If the statement does not specify the PARTITION clause (implicit partitions), Snowflake determines how best to partition the rows.
For an explanation of partitions, see Table Functions and Partitions.
Defining the process() Method
This method is invoked once for each row in the input partition.
The arguments passed to the registered UDTF are passed to process(). For each argument passed to the UDTF, you must have a corresponding argument in the signature of the process() method. Make sure that the type of the argument in the process() method matches the Snowflake data type of the corresponding argument in the UDTF.
Snowflake supports the following data types for the parameters for a UDTF:
SQL Type | Scala Type | Notes
NUMBER | Short or Option[Short] | Supported
NUMBER | Int or Option[Int] | Supported
NUMBER | Long or Option[Long] | Supported
FLOAT | Float or Option[Float] | Supported
DOUBLE | Double or Option[Double] | Supported
NUMBER | java.math.BigDecimal | Supported
VARCHAR | String or java.lang.String | Supported
BOOL | Boolean or Option[Boolean] | Supported
DATE | java.sql.Date | Supported
TIMESTAMP | java.sql.Timestamp | Supported
BINARY | Array[Byte] | Supported
ARRAY | Array[String] or Array[Variant] | Supported array of type Array[String] or Array[Variant]
OBJECT | Map[String, String] or Map[String, Variant] | Supported mutable map of type scala.collection.mutable.Map[String, String] or scala.collection.mutable.Map[String, Variant]
VARIANT | com.snowflake.snowpark.types.Variant | Supported
Defining the endPartition() Method
This method is invoked once for each partition, after all rows in that partition have been passed to the process() method.
You can use this method to generate output rows, based on any state information that you aggregate in the process() method.
Defining the outputSchema() Method
In this method, define the output schema for the rows returned by the process() and endPartition() methods.
Construct and return a types.StructType object that uses an Array of types.StructField objects to specify the Snowflake data type of each field in a returned row.
Snowflake supports the following DataTypes for the output schema for a UDTF:
DataType | SQL Type | Notes
BooleanType | Boolean | Supported
ShortType | NUMBER | Supported
IntegerType | NUMBER | Supported
LongType | NUMBER | Supported
DecimalType | NUMBER | Supported
FloatType | FLOAT | Supported
DoubleType | DOUBLE | Supported
StringType | VARCHAR | Supported
BinaryType | BINARY | Supported
TimeType | TIME | Supported
DateType | DATE | Supported
TimestampType | TIMESTAMP | Supported
VariantType | VARIANT | Supported
ArrayType(StringType) | ARRAY | Supported
ArrayType(VariantType) | ARRAY | Supported
MapType(StringType, StringType) | OBJECT | Supported
MapType(StringType, VariantType) | OBJECT | Supported
Example of a UDTF Class
The following is an example of a UDTF class that generates a range of rows.
The UDTF passes in 2 arguments, so the class extends UDTF2.
The arguments start and count specify the starting number for the row and the number of rows to generate.
class MyRangeUdtf extends UDTF2[Int, Int] {
  override def process(start: Int, count: Int): Iterable[Row] =
    (start until (start + count)).map(Row(_))
  override def endPartition(): Iterable[Row] = Array.empty[Row]
  override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
}
Registering the UDTF
Next, create an instance of the new class, and register the class by calling one of the UDTFRegistration methods. You can register a temporary or permanent UDTF by name. If you don't need to call the UDTF by name, you can register an anonymous UDTF.
Registering a Temporary UDTF By Name
To register a temporary UDTF by name, call registerTemporary, passing in a name for the UDTF and an instance of the UDTF class. For example:
// Use the MyRangeUdtf defined in the previous example.
val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf())
session.tableFunction(tableFunction, lit(10), lit(5)).show
Registering a Permanent UDTF By Name
If you need to use the UDTF in subsequent sessions, register a permanent UDTF.
When registering a permanent UDTF, you must specify a stage where the registration method will upload the JAR files for the UDTF and its dependencies. For example:
val tableFunction = session.udtf.registerPermanent("myUdtf", new MyRangeUdtf(), "@myStage")
session.tableFunction(tableFunction, lit(10), lit(5)).show
Registering an Anonymous Temporary UDTF
If you do not need to refer to the UDTF by name, call registerTemporary with only the UDTF instance (without a name) to register an anonymous UDTF instead.
Calling a UDTF
The methods that register a UDTF return a TableFunction object, which you can use in Session.tableFunction .
val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf())
session.tableFunction(tableFunction, lit(10), lit(5)).show
- Since
-
1.2.0
- class Updatable extends DataFrame
Represents a lazily-evaluated Updatable. It extends DataFrame so all DataFrame operations can be applied to it.
Creating an Updatable
You can create an Updatable by calling session.table with the name of the table.
Example 1: Creating an Updatable by reading a table.
val dfPrices = session.table("itemsdb.publicschema.prices")
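For example, a minimal sketch of updating rows (assuming the prices table has columns ID and AMOUNT):
import com.snowflake.snowpark.functions.lit
// Set AMOUNT to 10 for all rows with ID 1 and check how many rows were updated.
val updateResult = dfPrices.update(Map("AMOUNT" -> lit(10)), dfPrices("ID") === 1)
println(updateResult.rowsUpdated)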
- Since
-
0.7.0
- class UpdatableAsyncActor extends DataFrameAsyncActor
Provides APIs to execute Updatable actions asynchronously.
- Since
-
0.11.0
- case class UpdateResult(rowsUpdated: Long, multiJoinedRowsUpdated: Long) extends Product with Serializable
Result of updating rows in an Updatable.
- Since
-
0.7.0
- case class UserDefinedFunction extends Product with Serializable
Encapsulates a user defined lambda or function that is returned by UDFRegistration.registerTemporary or by com.snowflake.snowpark.functions.udf.
Use UserDefinedFunction.apply to generate Column expressions from an instance.
import com.snowflake.snowpark.functions._
val myUdf = udf((x: Int, y: String) => y + x)
df.select(myUdf(col("i"), col("s")))
- Since
-
0.1.0
- class WindowSpec extends AnyRef
Represents a window frame clause.
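For example, a minimal sketch (assuming a DataFrame df with columns "category", "price_date", and "amount"):
import com.snowflake.snowpark.functions.{col, sum}
// Build a window spec with an explicit frame and use it with an aggregate function.
val windowSpec = Window
  .partitionBy(col("category"))
  .orderBy(col("price_date"))
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.select(sum(col("amount")).over(windowSpec))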
- Since
-
0.1.0
- case class WriteFileResult(rows: Array[Row], schema: StructType) extends Product with Serializable
Represents the results of writing data from a DataFrame to a file in a stage.
To write the data, the DataFrameWriter effectively executes the COPY INTO <location> command. WriteFileResult encapsulates the output returned by the command:
- rows represents the rows of output from the command.
- schema defines the schema for these rows.
For example, if the DETAILED_OUTPUT option is TRUE, each row contains a file_name, file_size, and row_count field. schema defines the names and types of these fields. If the DETAILED_OUTPUT option is not specified (meaning that the option is FALSE), each row contains a rows_unloaded, input_bytes, and output_bytes field.
- rows
-
The output rows produced by the COPY INTO <location> command.
- schema
-
The names and types of the fields in the output rows.
- Since
-
1.5.0
-
Value Members
- object GroupingSets extends Serializable
Constructors of GroupingSets object.
- Since
-
0.4.0
- object Row extends Serializable
- Since
-
0.1.0
- object SaveMode
SaveMode configures the behavior when data is written from a DataFrame to a data source using a DataFrameWriter instance.
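For example, a minimal sketch (assuming a table named T):
// Overwrite the table instead of appending to it.
df.write.mode(SaveMode.Overwrite).saveAsTable("T")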
- Since
-
0.1.0
- object Session extends Logging
Companion object to Session that you use to build and create a session.
- Since
-
0.1.0
- object Window
Contains functions to form WindowSpec.
- Since
-
0.1.0
- object functions
Provides utility functions that generate Column expressions that you can pass to DataFrame transformation methods. These functions generate references to columns, literals, and SQL expressions (e.g. "c + 1").
This object also provides functions that correspond to Snowflake system-defined functions (built-in functions), including functions for aggregation and window functions.
The following examples demonstrate the use of some of these functions:
// Use columns and literals in expressions.
df.select(col("c") + lit(1))
// Call system-defined (built-in) functions.
// This example calls the function that corresponds to the ADD_MONTHS() SQL function.
df.select(add_months(col("d"), lit(3)))
// Call system-defined functions that have no corresponding function in the functions object.
// This example calls the RADIANS() SQL function, passing in values from the column "e".
df.select(callBuiltin("radians", col("e")))
// Call a user-defined function (UDF) by name.
df.select(callUDF("some_func", col("c")))
// Register and call an anonymous UDF.
val myudf = udf((x: Int) => x + x)
df.select(myudf(col("c")))
// Evaluate an SQL expression
df.select(sqlExpr("c + 1"))
For functions that accept Scala types (e.g. callUDF, callBuiltin, lit()), the mapping from Scala types to Snowflake types is as follows:
String => String
Byte => TinyInt
Int => Int
Short => SmallInt
Long => BigInt
Float => Float
Double => Double
Decimal => Number
Boolean => Boolean
Array => Array
Timestamp => Timestamp
Date => Date
- Since
-
0.1.0
- object tableFunctions
Provides utility functions that generate table function expressions that can be passed to the DataFrame join method and the Session tableFunction method.
This object also provides functions that correspond to Snowflake system-defined table functions.
The following examples demonstrate the use of some of these functions:
import com.snowflake.snowpark.functions.parse_json
// Creates DataFrame from Session.tableFunction
session.tableFunction(tableFunctions.flatten, Map("input" -> parse_json(lit("[1,2]"))))
session.tableFunction(tableFunctions.split_to_table, "split by space", " ")
// DataFrame joins table function
df.join(tableFunctions.flatten, Map("input" -> parse_json(df("a"))))
df.join(tableFunctions.split_to_table, df("a"), ",")
// Invokes any table function including user-defined table function
df.join(tableFunctions.tableFunction("flatten"), Map("input" -> parse_json(df("a"))))
session.tableFunction(tableFunctions.tableFunction("split_to_table"), "split by space", " ")
- Since
-
0.4.0