Local Testing Framework

This topic explains how to test your code locally when working with the Snowpark library.

The Snowpark Python local testing framework allows you to create and operate on Snowpark Python DataFrames locally without connecting to a Snowflake account. You can use the local testing framework to test your DataFrame operations locally, on your development machine or in a CI (continuous integration) pipeline, before deploying code changes to your account. The API is the same, so you can run your tests either locally or against a Snowflake account without making code changes.

Prerequisites

To use the local testing framework:

  • You must use version 1.11.1 or higher of the Snowpark Python library, with the optional dependency pandas. Install it by running pip install "snowflake-snowpark-python[pandas]".

  • The supported versions of Python are:

    • 3.8

    • 3.9

    • 3.10

    • 3.11

Creating a Session and Enabling Local Testing

To get started, create a Snowpark Session and set the local testing configuration to True.

from snowflake.snowpark import Session

session = Session.builder.config('local_testing', True).create()

After the session is created, you can use it to create and operate on DataFrames.

df = session.create_dataframe([[1,2],[3,4]],['a','b'])
df.with_column('c', df['a']+df['b']).show()

Loading Data

You can create Snowpark DataFrames from Python primitives, files, and Pandas DataFrames. This is useful for specifying the input and expected output of test cases. This approach also keeps the test data in source control, which makes it easier to keep it in sync with the test cases.

Loading CSV Data

You can load CSV files into a Snowpark DataFrame by first calling Session.file.put() to load the file to the in-memory stage, then using Session.read to read the contents. Assume there is a file named data.csv with the following contents:

col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56

You can load data.csv into a Snowpark DataFrame using the following code. You must put the file onto a stage first; otherwise, a file-not-found error is raised.

from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType


# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
    [
        StructField("col1", IntegerType()),
        StructField("col2", StringType()),
        StructField("col3", BooleanType()),
        StructField("col4", DoubleType()),
    ]
)

# With the SKIP_HEADER option set to 1, the header row is skipped when the CSV file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()

The output of dataframe.show() will be:

-------------------------------------
|"COL1"  |"COL2"  |"COL3"  |"COL4"  |
-------------------------------------
|1       |a       |True    |1.23    |
|2       |b       |False   |4.56    |
-------------------------------------

Loading Pandas Data

You can create a Snowpark Python DataFrame from a Pandas DataFrame by calling the create_dataframe method and passing the data as a Pandas DataFrame.

import pandas as pd

pandas_df = pd.DataFrame(
    data={
        "col1": pd.Series(["value1", "value2"]),
        "col2": pd.Series([1.23, 4.56]),
        "col3": pd.Series([123, 456]),
        "col4": pd.Series([True, False]),
    }
)

dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()

dataframe.show() outputs the following:

-------------------------------------
|"col1"  |"col2"  |"col3"  |"col4"  |
-------------------------------------
|value1  |1.23    |123     |True    |
|value2  |4.56    |456     |False   |
-------------------------------------

You can also convert a Snowpark Python DataFrame to a Pandas DataFrame by calling the to_pandas method on the DataFrame.

from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType

dataframe = session.create_dataframe(
    data=[
        ["value1", 1.23, 123, True],
        ["value2", 4.56, 456, False],
    ],
    schema=StructType([
        StructField("col1", StringType()),
        StructField("col2", DoubleType()),
        StructField("col3", LongType()),
        StructField("col4", BooleanType()),
    ])
)

pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())

The call to print(pandas_dataframe.to_string()) outputs the following:

    COL1  COL2  COL3   COL4
0  value1  1.23   123   True
1  value2  4.56   456  False

Creating a PyTest Fixture for Session

PyTest fixtures are functions that are executed before a test (or module of tests), typically to provide data or connections to tests. In this case, create a fixture that returns a Snowpark Session object. First, create a test directory if you do not already have one. Then, in the test directory, create a file conftest.py with the following contents, where CONNECTION_PARAMETERS is a dictionary with your Snowflake account credentials. For more information about the dictionary format, see Creating a Session.

# test/conftest.py
import pytest
from snowflake.snowpark.session import Session

def pytest_addoption(parser):
    parser.addoption("--snowflake-session", action="store", default="live")

@pytest.fixture(scope='module')
def session(request) -> Session:
    if request.config.getoption('--snowflake-session') == 'local':
        return Session.builder.config('local_testing', True).create()
    else:
        return Session.builder.configs(CONNECTION_PARAMETERS).create()

The call to pytest_addoption adds a command line option named --snowflake-session to the pytest command. The session fixture checks this command line option and creates a local or live Session depending on its value. This allows you to easily switch between local and live modes for testing.

# Using local mode:
pytest --snowflake-session local

# Using live mode:
pytest
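
A test that accepts the session fixture as an argument runs unchanged in both modes. The following is a minimal sketch; the file name, DataFrame contents, and assertion are illustrative, not part of the framework.

# test/test_transforms.py (hypothetical test module)
from snowflake.snowpark import Row
from snowflake.snowpark.functions import col

def test_add_total_column(session):
    # Build the input data inline so it lives in source control with the test
    df = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"])
    result = df.with_column("total", col("a") + col("b")).collect()
    assert result == [Row(1, 2, 3), Row(3, 4, 7)]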

SQL Operations

Session.sql(...) is not supported in the local testing framework. Use Snowpark’s DataFrame APIs whenever possible. In cases where you must use Session.sql(...), you can mock the tabular return value using Python’s unittest.mock.patch to patch the expected response from a given Session.sql() call.

In the example below, mock_sql() maps the SQL query text to the desired DataFrame response. The following conditional statement checks if the current session is using local testing, and if so, applies the patch to the Session.sql() method.

from unittest import mock
from functools import partial
from snowflake.snowpark import Row

def test_something(pytestconfig, session):

    def mock_sql(session, sql_string):  # patch for SQL operations
        if sql_string == "select 1,2,3":
            return session.create_dataframe([[1,2,3]])
        else:
            raise RuntimeError(f"Unexpected query execution: {sql_string}")

    if pytestconfig.getoption('--snowflake-session') == 'local':
        with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
            assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
    else:
        assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]

When local testing is enabled, all tables created by DataFrame.write.save_as_table() are saved as temporary tables in memory and can be retrieved using Session.table(). You can use the supported DataFrame operations on the table as usual.
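
For example, the following is a minimal sketch of this round trip; the table name, schema, and data are illustrative.

from snowflake.snowpark.functions import col

df = session.create_dataframe([[1, "a"], [2, "b"]], schema=["id", "name"])

# With local testing enabled, this creates a temporary, in-memory table
df.write.save_as_table("my_table", mode="overwrite")

# Retrieve the table and keep using DataFrame operations on it
session.table("my_table").filter(col("id") == 1).show()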

Patching Built-In Functions

Not all of the built-in functions under snowflake.snowpark.functions are supported in the local testing framework. If you use a function that is not supported, you need to use the @patch decorator from snowflake.snowpark.mock to create a patch.

When you define and implement the patched function, its signature (parameter list) must align with the built-in function’s parameters. The local testing framework passes parameters to the patched function using the following rules:

  • For parameters of type ColumnOrName in the built-in function’s signature, a ColumnEmulator is passed to the patched function. ColumnEmulator is similar to a pandas.Series object and contains the column data.

  • For parameters of type LiteralType in the built-in function’s signature, the literal value is passed to the patched function.

  • Otherwise, the raw value is passed to the patched function.

The patched function is expected to return an instance of ColumnEmulator, corresponding to the Column return type of the built-in function.

For example, the built-in function to_timestamp() could be patched like this:

import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType

@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
    ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
    ret_column.sf_type = ColumnType(TimestampType(), True)
    return ret_column
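
With the patch applied, DataFrame code that calls to_timestamp() runs locally through the mock implementation above. The following usage sketch is illustrative; the column name and timestamp strings are not part of the framework.

df = session.create_dataframe(
    [["2023-01-01T00:00:00+0000"], ["2023-06-15T12:30:00+0000"]],
    schema=["ts_str"],
)

# Each string is parsed by the patched function using datetime.strptime
df.select(to_timestamp("ts_str")).show()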

Skipping Test Cases

If your PyTest test suite contains test cases that are not well supported by local testing, you can skip them using PyTest’s mark.skipif decorator. The example below assumes that you configured your session fixture and command line option as described earlier. The condition checks whether the --snowflake-session option is set to local, and if so, the test case is skipped with a message explaining why.

import pytest

@pytest.mark.skipif(
    condition="config.getoption('--snowflake-session') == 'local'",
    reason="Test case disabled for local testing"
)
def test_case(session):
    ...

Limitations

The following functionality is not supported:

  • Raw SQL strings and operations that require parsing SQL strings. For example, session.sql and DataFrame.filter("col1 > 12") are not supported.

  • UDFs, UDTFs, and stored procedures.

  • Table functions.

  • AsyncJobs.

  • Session operations, like altering warehouses, schemas, and other session properties.

  • Geometry and Geography data types.

  • Aggregating window functions.

    # Selecting window function expressions is supported
    df.select("key", "value", sum_("value").over(), avg("value").over())
    
    # Aggregating window function expressions is NOT supported
    df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
    

Other limitations are:

  • Variant, Array, and Object data types are only supported with standard JSON encoding and decoding. Expressions like [1,2,,3,] are considered valid JSON in Snowflake but not in local testing, where Python’s built-in JSON functionality is used. You can specify the module-level variables snowflake.snowpark.mock.CUSTOM_JSON_ENCODER and snowflake.snowpark.mock.CUSTOM_JSON_DECODER to override the default settings, as sketched in the example after this list.

  • Only a subset of Snowflake’s functions (including window functions) is implemented. See Patching Built-In Functions to learn how to inject your own function definition.

  • Patching rank-related functions is currently not supported.

  • Selecting columns with the same name will only return one column. As a workaround, rename the columns to have distinct names using Column.alias.

    df.select(lit(1), lit(1)).show()  # The same applies to selecting a column twice, such as df.select(col("a"), col("a"))
    #---------
    #|"'1'"  |
    #---------
    #|1      |
    #|...    |
    #---------
    
    # Workaround: Column.alias
    df.select(lit(1).alias("col1_1"), lit(1).alias("col1_2"))
    # "col1_1", "col1_2"
    
  • Explicit type casting using Column.cast does not support format strings, either for inputs (to_decimal, to_number, to_numeric, to_double, to_date, to_time, to_timestamp) or for outputs (to_char, to_varchar, to_binary).

  • JSON strings stored in VariantType cannot be converted to Datetime types.

  • For Table.merge and Table.update, the implementation only supports the behavior when the session parameters ERROR_ON_NONDETERMINISTIC_UPDATE and ERROR_ON_NONDETERMINISTIC_MERGE are set to False. This means that when multiple rows match in a join, one of the matched rows is updated.
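
As referenced in the list above, a custom JSON encoder can be supplied through snowflake.snowpark.mock.CUSTOM_JSON_ENCODER. The following is a minimal sketch under stated assumptions: the encoder class is hypothetical, and it assumes the variable accepts a json.JSONEncoder subclass (mirroring the cls argument of json.dumps) rather than an instance.

import json
import datetime

import snowflake.snowpark.mock as snowpark_mock

# Hypothetical encoder that serializes date and datetime values as ISO-8601 strings
class DatetimeJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        return super().default(obj)

# Assumption: CUSTOM_JSON_ENCODER takes the encoder class itself
snowpark_mock.CUSTOM_JSON_ENCODER = DatetimeJSONEncoder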

List of Supported APIs

Snowpark Session

Session.createDataFrame

Session.create_dataframe

Session.flatten

Session.range

Session.table

Input/Output

DataFrameReader.csv

DataFrameReader.table

DataFrameWriter.saveAsTable

DataFrameWriter.save_as_table

DataFrame

DataFrame.agg

DataFrame.cache_result

DataFrame.col

DataFrame.collect

DataFrame.collect_nowait

DataFrame.copy_into_table

DataFrame.count

DataFrame.createOrReplaceTempView

DataFrame.createOrReplaceView

DataFrame.create_or_replace_temp_view

DataFrame.create_or_replace_view

DataFrame.crossJoin

DataFrame.cross_join

DataFrame.distinct

DataFrame.drop

DataFrame.dropDuplicates

DataFrame.drop_duplicates

DataFrame.dropna

DataFrame.except_

DataFrame.explain

DataFrame.fillna

DataFrame.filter

DataFrame.first

DataFrame.groupBy

DataFrame.group_by

DataFrame.intersect

DataFrame.join

DataFrame.limit

DataFrame.minus

DataFrame.natural_join

DataFrame.orderBy

DataFrame.order_by

DataFrame.rename

DataFrame.replace

DataFrame.rollup

DataFrame.sample

DataFrame.select

DataFrame.show

DataFrame.sort

DataFrame.subtract

DataFrame.take

DataFrame.toDF

DataFrame.toLocalIterator

DataFrame.toPandas

DataFrame.to_df

DataFrame.to_local_iterator

DataFrame.to_pandas

DataFrame.to_pandas_batches

DataFrame.union

DataFrame.unionAll

DataFrame.unionAllByName

DataFrame.unionByName

DataFrame.union_all

DataFrame.union_all_by_name

DataFrame.union_by_name

DataFrame.unpivot

DataFrame.where

DataFrame.withColumn

DataFrame.withColumnRenamed

DataFrame.with_column

DataFrame.with_column_renamed

DataFrame.with_columns

DataFrameNaFunctions.drop

DataFrameNaFunctions.fill

DataFrameNaFunctions.replace

Column

Column.alias

Column.as_

Column.asc

Column.asc_nulls_first

Column.asc_nulls_last

Column.astype

Column.between

Column.bitand

Column.bitor

Column.bitwiseAnd

Column.bitwiseOR

Column.bitwiseXOR

Column.bitxor

Column.cast

Column.collate

Column.desc

Column.desc_nulls_first

Column.desc_nulls_last

Column.endswith

Column.eqNullSafe

Column.equal_nan

Column.equal_null

Column.getItem

Column.getName

Column.get_name

Column.in_

Column.isNotNull

Column.isNull

Column.is_not_null

Column.is_null

Column.isin

Column.like

Column.name

Column.over

Column.regexp

Column.rlike

Column.startswith

Column.substr

Column.substring

Column.try_cast

Column.within_group

CaseExpr.when

CaseExpr.otherwise

Data Types

ArrayType

BinaryType

BooleanType

ByteType

ColumnIdentifier

DataType

DateType

DecimalType

DoubleType

FloatType

IntegerType

LongType

MapType

NullType

ShortType

StringType

StructField

StructType

Timestamp

TimestampType

TimeType

Variant

VariantType

Row

Row.asDict

Row.as_dict

Row.count

Row.index

Function

abs

avg

coalesce

contains

count

count_distinct

covar_pop

endswith

first_value

iff

lag

last_value

lead

list_agg

max

median

min

parse_json

row_number

startswith

substring

sum

to_array

to_binary

to_boolean

to_char

to_date

to_decimal

to_double

to_object

to_time

to_timestamp

to_variant

Window

Window.orderBy

Window.order_by

Window.partitionBy

Window.partition_by

Window.rangeBetween

Window.range_between

Window.rowsBetween

Window.rows_between

WindowSpec.orderBy

WindowSpec.order_by

WindowSpec.partitionBy

WindowSpec.partition_by

WindowSpec.rangeBetween

WindowSpec.range_between

WindowSpec.rowsBetween

WindowSpec.rows_between

Grouping

RelationalGroupedDataFrame.agg

RelationalGroupedDataFrame.apply_in_pandas

RelationalGroupedDataFrame.applyInPandas

RelationalGroupedDataFrame.avg

RelationalGroupedDataFrame.builtin

RelationalGroupedDataFrame.count

RelationalGroupedDataFrame.function

RelationalGroupedDataFrame.max

RelationalGroupedDataFrame.mean

RelationalGroupedDataFrame.median

RelationalGroupedDataFrame.min

RelationalGroupedDataFrame.sum

Table

Table.delete

Table.drop_table

Table.merge

Table.sample

Table.update

WhenMatchedClause.delete

WhenMatchedClause.update

WhenNotMatchedClause.insert