Writing stored procedures in Python

This topic explains how to write a stored procedure in Python. You can use the Snowpark library within your stored procedure to perform queries, updates, and other work on tables in Snowflake.

Introduction

With Snowpark Stored Procedures, you can build and run your data pipeline within Snowflake, using a Snowflake warehouse as the compute framework. Build your data pipeline by using the Snowpark API for Python to write stored procedures. To schedule the execution of these stored procedures, you use tasks.

For information on machine learning models and Snowpark Python, see Training Machine Learning Models with Snowpark Python.

You can write Snowpark Stored Procedures for Python using a Python worksheet, or using a local development environment.

You can capture log and trace data as your handler code executes. For more information, refer to Logging and Tracing Overview.

Note

To both create and call an anonymous procedure, use CALL (with anonymous procedure). Creating and calling an anonymous procedure does not require a role with CREATE PROCEDURE schema privileges.

Prerequisites for writing stored procedures locally

To write Python stored procedures in your local development environment, meet the following prerequisites:

  • You must use version 0.4.0 or a more recent version of the Snowpark library.

  • Enable Anaconda Packages so that Snowpark Python can load the required third party dependencies. Refer to Using third-party packages from Anaconda.

  • The supported versions of Python are:

    • 3.8

    • 3.9

    • 3.10

    • 3.11

Be sure to set up your development environment to use the Snowpark library. Refer to Setting Up Your Development Environment for Snowpark.

Writing the Python code for the stored procedure

For your procedure’s logic, you write handler code that executes when the procedure is called. This section describes the design of a handler.

You can create a stored procedure from the handler code in several ways:

Limitations

Snowpark Stored Procedures have the following limitations:

  • Creating processes is not supported in stored procedures.

  • Running concurrent queries is not supported in stored procedures.

  • You cannot use APIs that execute PUT and GET commands, including Session.sql("PUT ...") and Session.sql("GET ...").

  • When you download files from a stage using session.file.get, pattern matching is not supported.

  • If you execute your stored procedure from a task, you must specify a warehouse when creating the task. You cannot use serverless compute resources to run the task.

  • Creating named temp objects is not supported in an owner’s rights stored procedure. An owner’s rights stored procedure is a stored procedure that runs with the privileges of the stored procedure owner. For more information, refer to caller’s rights or owner’s rights.

Planning to write your stored procedure

Stored procedures run inside Snowflake, and so you must plan the code that you write with that in mind.

  • Limit the amount of memory consumed. Snowflake places limits on a method in terms of the amount of memory needed. For guidance, refer to Designing Handlers that Stay Within Snowflake-Imposed Constraints.

  • Make sure that your handler method or function is thread safe.

  • Follow the rules and security restrictions. Refer to Security Practices for UDFs and Procedures.

  • Decide whether you want the stored procedure to run with caller’s rights or owner’s rights.

  • Consider the snowflake-snowpark-python version used to run stored procedures. Due to limitations in the stored procedures release process, the snowflake-snowpark-python library available in the Python Stored Procedure environment is usually one version behind the publicly released version. Use the following SQL to find out the latest available version:

    select * from information_schema.packages where package_name = 'snowflake-snowpark-python' order by version desc;
    
    Copy

Writing the method or function

When writing the method or function for the stored procedure, note the following:

  • Specify the Snowpark Session object as the first argument of your method or function. When you call your stored procedure, Snowflake automatically creates a Session object and passes it to your stored procedure. (You cannot create the Session object yourself.)

  • For the rest of the arguments and for the return value, use the Python types that correspond to Snowflake data types. Snowflake supports the Python data types listed in SQL-Python Data Type Mappings for Parameters and Return Types.

  • When you run an asynchronous child job from within a procedure’s handler – such as by using DataFrame.collect_nowait – “fire and forget” is not supported.

    In other words, if the handler issues a child query that is still running when the parent procedure job completes, the child job is canceled automatically.

Handling errors

You can use the normal Python exception-handling techniques to catch errors within the procedure.

If an uncaught exception occurs inside the method, Snowflake raises an error that includes the stack trace for the exception. When logging of unhandled exceptions is enabled, Snowflake logs data about unhandled exceptions in an event table.

Making dependencies available to your code

If your handler code depends on code defined outside the handler itself (such as code defined in a module) or on resource files, you can make those dependencies available to your code by uploading them to a stage. Refer to Making Dependencies Available to Your Code, or for Python worksheets, refer to Add a Python File from a Stage to a Worksheet.

If you create your stored procedure using SQL, use the IMPORTS clause when writing the CREATE PROCEDURE statement, to point to the dependency files.

Accessing data in Snowflake from your stored procedure

To access data in Snowflake, use the Snowpark library APIs.

When handling a call to your Python stored procedure, Snowflake creates a Snowpark Session object and passes the object to the method or function for your stored procedure.

As is the case with stored procedures in other languages, the context for the session (e.g. the privileges, current database and schema, etc.) is determined by whether the stored procedure runs with caller’s rights or owner’s rights. For details, see Accessing and Setting the Session State.

You can use this Session object to call APIs in the Snowpark library. For example, you can create a DataFrame for a table or execute an SQL statement.

See the Snowpark Developer Guide for more information.

Data access example

The following is an example of a Python method that copies a specified number of rows from one table to another table. The method takes the following arguments:

  • A Snowpark Session object

  • The name of the table to copy the rows from

  • The name of the table to save the rows to

  • The number of rows to copy

The method in this example returns a string. If you run this example in a Python worksheet, change the return type for the worksheet to a String

def run(session, from_table, to_table, count):

  session.table(from_table).limit(count).write.save_as_table(to_table)

  return "SUCCESS"
Copy

Reading files

Using the SnowflakeFile class in the Snowpark snowflake.snowpark.files module, your Python handler can dynamically read a file from one of the following Snowflake stages:

  • A named internal stage.

  • A specified table’s internal stage.

  • The current user’s internal stage.

Snowflake supports reading files with SnowflakeFile for both stored procedures and user-defined functions. For more information about reading files in your handler code, as well as more examples, refer to Reading a File with a Python UDF Handler.

This example demonstrates how to create and call an owner’s rights stored procedure that reads a file using the SnowflakeFile class.

Create the stored procedure with an in-line handler, specifying the input mode as binary by passing rb for the mode argument:

CREATE OR REPLACE PROCEDURE calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile

def run(ignored_session, file_path):
    with SnowflakeFile.open(file_path, 'rb') as f:
        return imagehash.average_hash(Image.open(f))
$$;
Copy

Call the stored procedure:

CALL calc_phash(build_scoped_file_url(@my_files, 'my_image.jpg'));
Copy

Using third-party packages from Anaconda

You can specify Anaconda packages to install when you create Python stored procedures. To view the list of third-party packages from Anaconda, see the Anaconda Snowflake channel. These third-party packages are built and provided by Anaconda. You may use the Snowflake conda channel for local testing and development at no cost under the Supplemental Embedded Software Terms to Anaconda’s Terms of Service.

For limitations, see Limitations.

Getting started

Before you start using the packages provided by Anaconda inside Snowflake, you must acknowledge the External Offerings Terms.

Note

You must be the organization administrator (use the ORGADMIN role) to accept the terms. You only need to accept the terms once for your Snowflake account. See Enabling the ORGADMIN role in an account.

  1. Sign in to Snowsight.

  2. Select Admin » Billing & Terms.

  3. In the Anaconda section, select Enable.

  4. In the Anaconda Packages dialog, click the link to review the External Offerings Terms page.

  5. If you agree to the terms, select Acknowledge & Continue.

If you see an error when attempting to accept the terms of service, your user profile might be missing a first name, last name, or email address. If you have an administrator role, refer to Add user details to your user profile to update your profile using Snowsight. Otherwise, contact an administrator to update your account.

Note

If you don’t acknowledge the Snowflake External Offerings Terms as described above, you can still use stored procedures, but with these limitations:

  • You can’t use any third-party packages from Anaconda.

  • You can still specify Snowpark Python as a package in a stored procedure, but you can’t specify a specific version.

  • You can’t use the to_pandas method when interacting with a DataFrame object.

Displaying and using packages

You can display all available packages and their version information by querying the PACKAGES view in the Information Schema:

select * from information_schema.packages where language = 'python';
Copy

For more information, see Using Third-Party Packages in the Snowflake Python UDF documentation.

Creating the stored procedure

You can create a stored procedure from a Python worksheet, or using SQL.

Creating a Python stored procedure to automate your Python worksheet code

Create a Python stored procedure from your Python worksheet to automate your code. For details on writing Python worksheets, see Writing Snowpark Code in Python Worksheets.

Prerequisites

Your role must have OWNERSHIP or CREATE PROCEDURE privileges on the database schema in which you run your Python worksheet to deploy it as a stored procedure.

Deploy a Python worksheet as a stored procedure

To create a Python stored procedure to automate the code in your Python worksheet, do the following:

  1. Sign in to Snowsight.

  2. Open Projects » Worksheets.

  3. Open the Python worksheet that you want to deploy as a stored procedure.

  4. Select Deploy.

  5. Enter a name for the stored procedure.

  6. (Optional) Enter a comment with details about the stored procedure.

  7. (Optional) Select Replace if exists to replace an existing stored procedure with the same name.

  8. For Handler, select the handler function for your stored procedure. For example, main.

  9. Review the arguments used by your handler function and if needed, override the SQL data type mapping for a typed argument. For details about how Python types are mapped to SQL types, see SQL-Python Data Type Mappings.

  10. (Optional) Select Open in Worksheets to open the stored procedure definition in a SQL worksheet.

  11. Select Deploy to create the stored procedure.

  12. After the stored procedure is created, you can go to the procedure details or select Done.

You can create multiple stored procedures from one Python worksheet.

After you create a stored procedure, you can automate it as part of a task. Refer to Introduction to tasks.

Returning tabular data

You can write a procedure that returns data in tabular form. To write a procedure that returns tabular data, do the following:

  • Specify TABLE(...) as the procedure’s return type in your CREATE PROCEDURE statement.

    As TABLE parameters, you can specify the returned data’s column names and types if you know them. If you don’t know the returned columns when defining the procedure – such as when they’re specified at run time – you can leave out the TABLE parameters. When you do, the procedure’s return value columns will be converted from the columns in the DataFrame returned by its handler. Column data types will be converted to SQL according to the mapping specified in SQL-Python Data Type Mappings.

  • Write the handler so that it returns the tabular result in a Snowpark DataFrame.

    For more information about dataframes, see Working with DataFrames in Snowpark Python.

Example

The examples in this section illustrate returning tabular values from a procedure that filters for rows where a column matches a string.

Defining the data

Code in the following example creates a table of employees.

CREATE OR REPLACE TABLE employees(id NUMBER, name VARCHAR, role VARCHAR);
INSERT INTO employees (id, name, role) VALUES (1, 'Alice', 'op'), (2, 'Bob', 'dev'), (3, 'Cindy', 'dev');
Copy

Specifying return column names and types

This example specifies column names and types in the RETURNS TABLE() statement.

CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col

def filter_by_role(session, table_name, role):
   df = session.table(table_name)
   return df.filter(col("role") == role)
$$;
Copy

Omitting return column names and types

Code in the following example declares a procedure that allows return value column names and types to be extrapolated from columns in the handler’s return value. It omits the column names and types from the RETURNS TABLE() statement.

CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col

def filter_by_role(session, table_name, role):
  df = session.table(table_name)
  return df.filter(col("role") == role)
$$;
Copy

Calling the procedure

The following example calls the stored procedure:

CALL filterByRole('employees', 'dev');
Copy

The procedure call produces the following output:

+----+-------+------+
| ID | NAME  | ROLE |
+----+-------+------+
| 2  | Bob   | dev  |
| 3  | Cindy | dev  |
+----+-------+------+

Calling your stored procedure

After creating a stored procedure, you can call it from SQL or as part of a scheduled task.

Examples

Running concurrent tasks with worker processes

You can run concurrent tasks using Python worker processes. You might find this useful when you need to run parallel tasks that take advantage of multiple CPU cores on warehouse nodes.

Note

Snowflake recommends that you not use the built-in Python multiprocessing module.

To work around cases where the Python Global Interpreter Lock prevents a multi-tasking approach from scaling across all CPU cores, you can execute concurrent tasks using separate worker processes, rather than threads.

You can do this on Snowflake warehouses by using the joblib library’s Parallel class, as in the following example.

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.8
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;
Copy

Note

The default backend used for joblib.Parallel differs between Snowflake standard and Snowpark-optimized warehouses.

  • Standard warehouse default: threading

  • Snowpark-optimized warehouse default: loky (multiprocessing)

You can override the default backend setting by calling the joblib.parallel_backend function, as in the following example.

import joblib
joblib.parallel_backend('loky')
Copy

Using Snowpark APIs for asynchrononous processing

The following examples illustrate how you can use Snowpark APIs to begin asynchronous child jobs, as well as how those jobs behave under different conditions.

Example 1: Checking the status of an asynchronous child job

In the following example, the checkStatus procedure executes an asynchronous child job that waits 60 seconds. The procedure then checks on the status of the job before it can have finished, so the check returns False.

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    return async_job.is_done()
$$;
Copy

The following code calls the procedure.

CALL checkStatus();
Copy
+------------+
| checkStatus |
|------------|
| False      |
+------------+

Example 2: Cancelling an asynchronous child job

In the following example, the cancelJob procedure uses SQL to insert data into the test_tb table with an asynchronous child job that would take 10 seconds to finish. It then cancels the child job before it finishes and the data has been inserted.

CREATE OR REPLACE TABLE test_tb(c1 STRING);
Copy
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
    return async_job.cancel()
$$;

CALL cancelJob();
Copy

The following code queries the test_tb table, but returns no results because no data has been inserted.

SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+

Example 3: Waiting and blocking while an asynchronous child job runs

In the following example, the blockUntilDone procedure executes an asynchronous child job that takes 5 seconds to finish. Using the snowflake.snowpark.AsyncJob.result method, the procedure waits and returns when the job has finished.

CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(5)").collect_nowait()
    return async_job.result()
$$;
Copy

The following code calls the blockUntilDone procedure, which returns after waiting 5 seconds.

CALL blockUntilDone();
Copy
+------------------------------------------+
| blockUntilDone                               |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+

Example 4: Returning an error after requesting results from an unfinished asynchronous child job

In the following example, the earlyReturn procedure executes an asynchronous child job that takes 60 seconds to finish. The procedure then attempts to return a DataFrame from the job’s result before it can have finished. The result is an error.

CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    df = async_job.to_df()
    try:
        df.collect()
    except Exception as ex:
        return 'Error: (02000): Result for query <UUID> has expired'
$$;
Copy

The following code calls the earlyReturn procedure, returning the error.

CALL earlyReturn();
Copy
+------------------------------------------------------------+
| earlyReturn                                                 |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired        |
+------------------------------------------------------------+

Example 5: Finishing a parent job before a child job finishes, canceling the child job

In the following example, the earlyCancelJob procedure executes an asynchronous child job to insert data into a table and takes 10 seconds to finish. However, the parent job — async_handler — returns before the child job finishes, which cancels the child job.

CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;
Copy

The following code calls the earlyCancelJob procedure. It then queries the test_tb table, which returns no result because no data was inserted by the canceled child job.

CALL earlyCancelJob();
SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+