Creating User-Defined Functions (UDFs) for DataFrames

The Snowpark API provides methods that you can use to create a user-defined function from a lambda or function in Scala. This topic explains how to create these types of functions.


With Snowpark, you can create user-defined functions (UDFs) for your custom lambdas and functions, and you can call these UDFs to process the data in your DataFrame.

When you use the Snowpark API to create a UDF, the Snowpark library uploads the code for your function to an internal stage. When you call the UDF, the Snowpark library executes your function on the server, where the data is. As a result, the data doesn’t need to be transferred to the client in order for the function to process the data.

In your custom code, you can also call code that is packaged in JAR files (for example, Java classes for a third-party library).

You can create a UDF for your custom code in one of two ways:

  • You can create an anonymous UDF and assign the function to a variable. As long as this variable is in scope, you can use this variable to call the UDF.

  • You can create a named UDF and call the UDF by name. Use this approach if you need to call the UDF by name (for example, with the callUDF function) or if you need to use the UDF in a subsequent session.

The next sections explain how to create these UDFs.

Note that if you defined a UDF by running the CREATE FUNCTION command, you can call that UDF in Snowpark. For details, see Calling User-Defined Functions (UDFs).

Caveat About Creating UDFs in an Object With the App Trait

Scala provides an App trait that you can extend in order to turn your Scala object into an executable program. The App trait provides a main method that automatically executes all the code in the body of your object definition. (The code in your object definition effectively becomes the main method.)

One effect of extending the App trait is that the fields in your object won’t be initialized until the main method is called. If your object extends App and you define a UDF that uses an object field that you initialized earlier, the UDF definition uploaded to the server won’t include the initialized value of the object field.

For example, suppose that you define and initialize a field named myConst in the object and use that field in a UDF:

object Main extends App {
  ...
  // Initialize a field.
  val myConst = "Prefix "
  // Use the field in a UDF.
  // Because the App trait delays the initialization of the object fields,
  // myConst in the UDF definition resolves to null.
  val myUdf = udf((s : String) =>  myConst + s )
  ...
}

When Snowpark serializes and uploads the UDF definition to Snowflake, myConst has not been initialized and resolves to null. As a result, the UDF uses null in place of the value of myConst.

To work around this, change your object so that it does not extend the App trait, and implement a separate main method for your code:

object Main {
  ...
  def main(args: Array[String]): Unit = {
    ... // Your code ...
  }
  ...
}
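The workaround helps because a local val defined inside main is already initialized when the lambda is created, so the lambda captures the value itself rather than a reference to an object field that has not been initialized yet. The following sketch checks this locally. It uses plain Java serialization only as a stand-in for shipping a closure to another JVM; that is an assumption for illustration and does not reproduce Snowpark's actual upload mechanism. The object name CaptureDemo and the helper roundTrip are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical demo object. It does not extend App, so main is an ordinary method.
object CaptureDemo {
  // Serialize a value and read it back. This is only a local stand-in for
  // sending a closure to another JVM; it is not Snowpark's upload mechanism.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Builds the function the same way a UDF body would be built inside main.
  def prefixed(input: String): String = {
    // The local val is initialized before the lambda is created,
    // so the lambda captures the string "Prefix " itself.
    val myConst = "Prefix "
    val myFunc = (s: String) => myConst + s
    // Call the copy that went through serialization, not the original.
    roundTrip(myFunc)(input)
  }

  def main(args: Array[String]): Unit =
    println(prefixed("Raymond"))
}
```

Running CaptureDemo.main prints the prefixed string, showing that the captured value survived serialization.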

Specifying Dependencies for a UDF

In order to define a UDF through the Snowpark API, you must call Session.addDependency() for any files that contain classes and resources that your UDF depends on (for example, JAR files or resource files). (For details on reading resources from a UDF, see Reading Files from a UDF.)

The Snowpark library uploads these files to an internal stage and adds the files to the classpath when executing your UDF.

Tip

If you don’t want the library to upload the file every time you run your application, upload the file to a stage. When calling addDependency, pass the path to the file in the stage.

If you are using the Scala REPL, you must add the directory of classes generated by the REPL as a dependency. For example, if you used the run.sh script to start the REPL, call the following method, which adds the repl_classes directory created by the script:

// If you used the run.sh script to start the Scala REPL, call this to add the REPL classes directory as a dependency.
session.addDependency("<path_to_directory_where_you_ran_run.sh>/repl_classes/")

The following example demonstrates how to add a JAR file in a stage as a dependency:

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar")

The following examples demonstrate how to add dependencies for JAR files and resource files:

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar")

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/")

// Add a resource file.
session.addDependency("/<path>/my-resource.xml")

You should not need to specify the following dependencies:

  • Your Scala runtime libraries.

    These libraries are already available in the runtime environment on the server where your UDFs are executed.

  • The Snowpark JAR file.

    The Snowpark library automatically attempts to detect and upload the Snowpark JAR file to the server.

    To prevent the library from repeatedly uploading the Snowpark JAR file to the server:

    1. Upload the Snowpark JAR file to a stage.

      For example, the following command uploads the Snowpark JAR file to the stage @mystage. The PUT command compresses the JAR file and names the resulting file snowpark-0.5.0.jar.gz.

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-0.5.0.jar @mystage
      
    2. Call addDependency to add the Snowpark JAR file in the stage as a dependency.

      For example, to add the Snowpark JAR file uploaded by the previous command:

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-0.5.0.jar.gz")
      

      Note that the specified path to the JAR file includes the .gz filename extension, which was added by the PUT command.

  • The JAR file or directory with the currently running application.

    The Snowpark library automatically attempts to detect and upload these dependencies.

    If the Snowpark library is unable to detect these dependencies automatically, the library reports an error, and you must call addDependency to add these dependencies manually.
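If you do have to add the application's JAR file or classes directory manually, you need its path. One general JVM technique (not a description of how Snowpark itself detects dependencies) is to ask a class for the code source it was loaded from. The sketch below assumes that no security manager restricts getProtectionDomain; the object name DependencyLocator is hypothetical.

```scala
import java.nio.file.{Path, Paths}

// Hypothetical helper for finding the JAR file or classes directory of a class.
object DependencyLocator {
  // Returns the location that `cls` was loaded from, if the JVM records one.
  // Classes loaded by the bootstrap class loader (for example, java.lang.String)
  // have no code source, so the result is None for them.
  def locationOf(cls: Class[_]): Option[Path] =
    Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(source => Option(source.getLocation))
      .map(url => Paths.get(url.toURI))
}
```

For example, DependencyLocator.locationOf(classOf[com.mycompany.MyUdfCode]).map(_.toString) could produce a path to pass to session.addDependency, where com.mycompany.MyUdfCode is a placeholder for one of your own classes.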

Creating an Anonymous UDF

To create an anonymous UDF, you can either:

  • Call the udf function in the com.snowflake.snowpark.functions object, passing in the definition of the anonymous function.

  • Call the registerTemporary method in the UDFRegistration class, passing in the definition of the anonymous function. Because you are registering an anonymous UDF, you must use the method signatures that don’t have a name parameter.

Note

When writing multi-threaded code (e.g. when using parallel collections), use the registerTemporary method to register UDFs, rather than using the udf function. This can prevent errors in which the default Snowflake Session object cannot be found.

The following example creates an anonymous UDF that uses a custom class (LanguageDetector, which detects the language used in text). The example calls the anonymous UDF to detect the language in the text_data column in a DataFrame and creates a new DataFrame that includes an additional lang column with the language used.

// Import the udf function from the functions object.
import com.snowflake.snowpark.functions._

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
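// "$HOME" is not expanded inside a Scala string literal, so build the path explicitly.
session.addDependency(sys.env("HOME") + "/language-detector.jar")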

// Create a detector
val detector = new LanguageDetector()

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
val langUdf = udf((s: String) =>
     Option(detector.detect(s)).getOrElse("UNKNOWN"))

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
val dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(col("text_data")))

Note

If you are creating a UDF in a Jupyter notebook, you must set up the notebook to work with Snowpark (see Setting Up a Jupyter Notebook for Snowpark) and follow the guidelines for writing UDFs in a notebook (see Creating UDFs in Jupyter Notebooks).

When you run your client code, the Snowpark library serializes the lambda closure and sends it to the server for execution. If an object captured by the lambda closure is not serializable, you can either:

  • Make the object serializable, or

  • Declare the object as a lazy val or use the @transient annotation to avoid serializing the object.

For example:

// Declare the detector object as lazy.
lazy val detector = new LanguageDetector("en")
// The detector object is not serialized but is instead reconstructed on the server.
val langUdf = udf(...)
val dfEmailsWithLangCol = ...
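The following sketch contrasts two of these options locally. It uses plain Java serialization as a stand-in for the closure serialization that Snowpark performs, which is an assumption for illustration only. Detector is a hypothetical non-serializable class playing the role of LanguageDetector, and DetectorHolder and CaptureOptions are also hypothetical names. Capturing the detector directly fails with NotSerializableException. Capturing a serializable holder whose detector is a @transient lazy val succeeds, because the detector is skipped during serialization and constructed on first use after deserialization.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a class such as LanguageDetector that does not
// implement Serializable. Returns null for empty text, like a detector that
// cannot decide.
class Detector(lang: String) {
  def detect(text: String): String = if (text.nonEmpty) lang else null
}

// Serializable holder: the @transient field is skipped during serialization,
// and the lazy val builds a new Detector on first access after deserialization.
class DetectorHolder(lang: String) extends Serializable {
  @transient lazy val detector: Detector = new Detector(lang)
}

object CaptureOptions {
  // Local stand-in for shipping a closure to another JVM (not Snowpark's mechanism).
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Fails: the lambda captures the non-serializable Detector itself.
  def captureDirectly(text: String): Either[NotSerializableException, String] = {
    val detector = new Detector("en")
    val detect = (s: String) => Option(detector.detect(s)).getOrElse("UNKNOWN")
    try Right(roundTrip(detect)(text))
    catch { case e: NotSerializableException => Left(e) }
  }

  // Works: the lambda captures only the serializable holder.
  def captureHolder(text: String): String = {
    val holder = new DetectorHolder("en")
    val detect = (s: String) => Option(holder.detector.detect(s)).getOrElse("UNKNOWN")
    roundTrip(detect)(text)
  }
}
```

The holder approach keeps the expensive or non-serializable object out of the serialized closure, at the cost of constructing it again wherever the closure is deserialized.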

Creating and Registering a Named UDF

If you want to call a UDF by name (e.g. by using the callUDF function in the functions object) or if you need to use a UDF in subsequent sessions, you can create and register a named UDF. To do this, use one of the following methods in the UDFRegistration class:

  • registerTemporary, if you only plan to use the UDF in the current session

  • registerPermanent, if you plan to use the UDF in subsequent sessions

To access an object of the UDFRegistration class, call the udf method of the Session class.

registerTemporary creates a temporary UDF that you can use in the current session.

// Create and register a temporary named UDF.
session.udf.registerTemporary("doubleUdf", (x: Int) => x + x)
// Call the named UDF, passing in the num column and returning the result in a new column named doubleNum.
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))

registerPermanent creates a UDF that you can use in the current and subsequent sessions. When you call registerPermanent, you must also specify a location in an internal stage where the JAR files for the UDF and its dependencies will be uploaded.

Note

registerPermanent does not support external stages.

For example:

// Create and register a permanent named UDF.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
// Call the named UDF, passing in the num column and returning the result in a new column named doubleNum.
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))

Note

If you are creating a UDF in a Jupyter notebook, you must set up the notebook to work with Snowpark (see Setting Up a Jupyter Notebook for Snowpark) and follow the guidelines for writing UDFs in a notebook (see Creating UDFs in Jupyter Notebooks).

Creating UDFs in Jupyter Notebooks

If you are creating UDFs in a Jupyter notebook, you must follow these additional steps:

Writing the Implementation of a UDF

Define the implementation of your function in a class that extends Serializable. For example:

// Class containing a function that implements your UDF.
class MyUDFCode( ... ) extends Serializable {
  val myUserDefinedFunc = (s: String) => {
    ...
  }
}
val myUdf = udf((new MyUDFCode(resourceName)).myUserDefinedFunc)

Accessing a Variable Defined in Another Cell

If you need to use a variable defined in another cell in your UDF, you must pass the variable as an argument to the class constructor. For example, suppose that in cell 1, you’ve defined a variable:

In [1]:
val prefix = "Hello"

and you want to use that variable in a UDF that you’ve defined in cell 2. In the class constructor for the UDF, add an argument for this variable. Then, when calling the class constructor to create the UDF, pass in the variable defined in cell 1:

In [2]:
// prefix is the constructor argument for the variable defined in another cell.
class UDFCode(var prefix: String) extends Serializable {
  val prependPrefixFunc = (s: String) => {
    s"$prefix $s"
  }
}

// When constructing UDFCode, pass in the variable (prefix) that is defined in another cell.
val prependPrefixUdf = udf((new UDFCode(prefix)).prependPrefixFunc)
val myDf = session.sql("select 'Raymond' NAME")
myDf.withColumn("CONCAT", prependPrefixUdf(col("NAME"))).show()

Writing Initialization Code for a UDF

If your UDF requires initialization code or context, you can provide this through values captured as part of the UDF closure.

The following example uses a separate class to initialize the context needed by three UDFs.

  • The first UDF creates a new instance of the class within the lambda, so the initialization is performed every time the UDF is invoked.

  • The second UDF captures an instance of the class generated in your client program. The context generated on the client is serialized and is used by the UDF. Note that the context class must be serializable for this approach to work.

  • The third UDF captures a lazy val, so the context is instantiated lazily on the first UDF invocation and is reused in subsequent invocations. This approach works even when the context is not serializable. However, there is no guarantee that all UDF invocations within a DataFrame will use the same lazily generated context.

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import scala.util.Random

// Context needed for a UDF.
class Context {
  val randomInt = Random.nextInt
}

// Serializable context needed for the UDF.
class SerContext extends Serializable {
  val randomInt = Random.nextInt
}

object TestUdf {
  def main(args: Array[String]): Unit = {
    // Create the session.
    val session = Session.builder.configFile("/<path>/profile.properties").create
    import session.implicits._
    session.range(1, 10, 2).show()

    // Create a DataFrame with two columns ("c" and "d").
    val dummy = session.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c", "d")
    dummy.show()

    // Initialize the context once per invocation.
    val udfRepeatedInit = udf((i: Int) => (new Context).randomInt)
    dummy.select(udfRepeatedInit('c)).show()

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    val sC = new SerContext
    val udfOnceInit = udf((i: Int) => sC.randomInt)
    dummy.select(udfOnceInit('c)).show()

    // Initialize the non-serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    lazy val unserC = new Context
    val udfOnceInitU = udf((i: Int) => unserC.randomInt)
    dummy.select(udfOnceInitU('c)).show()
  }
}
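The three strategies can be compared locally with a counter that records how many times a context is constructed. The sketch below is a plain-Scala simulation with hypothetical names (InitStrategies, CountingContext). It runs entirely in one JVM without Snowpark or serialization, so it cannot show the case described above in which different UDF invocations end up with different lazily created contexts.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Plain-Scala simulation (no Snowpark, no serialization) of the three
// initialization strategies, counting how many contexts each one constructs.
object InitStrategies {
  val constructed = new AtomicInteger(0)

  // Hypothetical stand-in for the Context class in the example above.
  class CountingContext {
    constructed.incrementAndGet()
    val offset: Int = 100
  }

  // Strategy 1: a new context inside the lambda, so one per invocation.
  def repeatedInit(calls: Int): Int = {
    constructed.set(0)
    val f = (i: Int) => (new CountingContext).offset + i
    (1 to calls).foreach(f)
    constructed.get
  }

  // Strategy 2: one context created up front and captured by the lambda.
  def onceInit(calls: Int): Int = {
    constructed.set(0)
    val ctx = new CountingContext
    val f = (i: Int) => ctx.offset + i
    (1 to calls).foreach(f)
    constructed.get
  }

  // Strategy 3: a lazy val, so nothing is constructed until the first invocation.
  // Returns (count before the first call, count after all calls).
  def lazyInit(calls: Int): (Int, Int) = {
    constructed.set(0)
    lazy val ctx = new CountingContext
    val f = (i: Int) => ctx.offset + i
    val before = constructed.get
    (1 to calls).foreach(f)
    (before, constructed.get)
  }
}
```

With five calls, the first strategy constructs five contexts, the second constructs one before any call, and the third constructs none until the first call and one in total afterward.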

Reading Files from a UDF

As mentioned earlier, the Snowpark library uploads and executes UDFs on the server. If your UDF needs to read data from a file, you must ensure that the file is uploaded with the UDF.

In addition, if the content of the file remains the same between calls to the UDF, you can write your code to load the file once during the first call and not on subsequent calls. This can improve the performance of your UDF calls.

To set up a UDF to read a file:

  1. Add the file to a JAR file.

    For example, if your UDF needs to use a file in a data/ subdirectory (data/hello.txt), run the jar command to add this file to a JAR file:

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
  2. Specify that the JAR file is a dependency, which uploads the file to the server and adds the file to the classpath. See Specifying Dependencies for a UDF.

    For example:

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar")
    
  3. In the UDF, call Class.getResourceAsStream to find the file in the classpath and read the file.

    To avoid adding a dependency on this, you can use classOf[com.snowflake.snowpark.DataFrame] (rather than getClass) to get the Class object.

    For example, to read the data/hello.txt file:

    // Read data/hello.txt from myJar.jar.
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
    

    In this example, the resource name starts with a /, which indicates that this is the full path of the file in the JAR file. (In this case, the location of the file is not relative to the package of the class.)
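Because getResourceAsStream returns null rather than throwing when the resource is not on the classpath, it can help to wrap the lookup in a small helper. The sketch below is a hypothetical helper (ResourceText.read and the marker class ResourceAnchor are illustrative names) that reads a resource as UTF-8 text and returns None when the resource is missing. It assumes the resource is a text file; inside a UDF you would use classOf[com.snowflake.snowpark.DataFrame], as described above, instead of the marker class.

```scala
import java.io.InputStream
import scala.io.{Codec, Source}

// Hypothetical marker class whose class loader sees the application's classpath.
// In a UDF, the text above suggests classOf[com.snowflake.snowpark.DataFrame] instead.
final class ResourceAnchor

object ResourceText {
  // Reads a classpath resource as UTF-8 text. A name starting with "/" is
  // resolved from the root of the classpath rather than relative to the
  // anchor class's package. Returns None if the resource cannot be found.
  def read(resourceName: String): Option[String] = {
    val in: InputStream = classOf[ResourceAnchor].getResourceAsStream(resourceName)
    if (in == null) None
    else {
      val source = Source.fromInputStream(in)(Codec.UTF8)
      try Some(source.mkString)
      finally source.close()
    }
  }
}
```

With myJar.jar on the classpath, ResourceText.read("/data/hello.txt") would return the file content wrapped in Some; a misspelled name yields None instead of a NullPointerException later on.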

Note

If you don’t expect the content of the file to change between UDF calls, read the file into a lazy val. This causes the file loading code to execute only on the first call to the UDF and not on subsequent calls.

The following example defines an object (UDFCode) with a function that will be used as a UDF (readFileFunc). The function reads the file data/hello.txt, which is expected to contain the string "hello,", and prepends the file content, followed by " : ", to the string passed in as an argument.

// Create a function object that reads a file.
object UDFCode extends Serializable {

  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // the code is used in the definition of a lazy val. The code for a lazy val is executed only once when the variable is
  // first accessed.
  lazy val prefix = {
    import java.io._
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame]
      .getResourceAsStream(resourceName)
    if (inputStream == null) {
      throw new Exception("Can't find file " + resourceName)
    }
    scala.io.Source.fromInputStream(inputStream).mkString
  }

  val readFileFunc = (s: String) => prefix + " : " + s
}

The next part of the example registers the function as an anonymous UDF. The example calls the UDF on the NAME column in a DataFrame. The example assumes that the data/hello.txt file is packaged in the JAR file myJar.jar.

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar")

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
val myDf = session.sql("select 'Raymond' NAME")

// Register the function that you defined earlier as an anonymous UDF.
val readFileUdf = udf(UDFCode.readFileFunc)

// Call the UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf(col("NAME"))).show()