Local Testing Framework¶
This topic explains how to test your code locally when working with the Snowpark library.
The Snowpark Python local testing framework allows you to create and operate on Snowpark Python DataFrames locally without connecting to a Snowflake account. You can use the local testing framework to test your DataFrame operations locally, on your development machine or in a CI (continuous integration) pipeline, before deploying code changes to your account. The API is the same, so you can either run your tests locally or against a Snowflake account, without making code changes.
Prerequisites¶
To use the local testing framework:
You must use version 1.11.1 or higher of the Snowpark Python library with the optional dependency pandas. Install it by running:
pip install "snowflake-snowpark-python[pandas]"
The supported versions of Python are:
3.8
3.9
3.10
3.11
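To verify which version of the library is installed, one option (standard Python, not specific to Snowpark) is importlib.metadata:
from importlib.metadata import version

# Prints the installed Snowpark Python library version; it should be 1.11.1 or higher
print(version("snowflake-snowpark-python"))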
Creating a Session and Enabling Local Testing¶
To get started, create a Snowpark Session and set the local testing configuration to True.
from snowflake.snowpark import Session
session = Session.builder.config('local_testing', True).create()
After the session is created, you can use it to create and operate on DataFrames.
df = session.create_dataframe([[1,2],[3,4]],['a','b'])
df.with_column('c', df['a']+df['b']).show()
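For this example, the call to show() prints output along the following lines (Snowpark uppercases unquoted column names):
-------------------
|"A"  |"B"  |"C"  |
-------------------
|1    |2    |3    |
|3    |4    |7    |
-------------------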
Loading Data¶
You can create Snowpark DataFrames from Python primitives, files, and Pandas DataFrames. This is useful for specifying the input and expected output of test cases. By doing it this way, the data is in source control, which makes it easier to keep the test data in sync with the test cases.
Loading CSV Data¶
You can load CSV files into a Snowpark DataFrame by first calling Session.file.put() to load the file to the in-memory stage, then using Session.read() to read the contents. Assume there is a file, data.csv, with the following contents:
col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
You can load data.csv into a Snowpark DataFrame using the following code. You need to put the file onto a stage first; otherwise, you will get a file-not-found error.
from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType
# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
    [
        StructField("col1", IntegerType()),
        StructField("col2", StringType()),
        StructField("col3", BooleanType()),
        StructField("col4", DoubleType()),
    ]
)

# With the option SKIP_HEADER set to 1, the header row is skipped when the CSV file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
The output of dataframe.show() will be:
-------------------------------------
|"COL1" |"COL2" |"COL3" |"COL4" |
-------------------------------------
|1 |a |True |1.23 |
|2 |b |False |4.56 |
-------------------------------------
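In a test, you might assert on the loaded rows directly; a minimal sketch using Row (the field names follow Snowpark's uppercasing of unquoted identifiers):
from snowflake.snowpark import Row

assert dataframe.collect() == [
    Row(COL1=1, COL2="a", COL3=True, COL4=1.23),
    Row(COL1=2, COL2="b", COL3=False, COL4=4.56),
]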
Loading Pandas Data¶
You can create a Snowpark Python DataFrame from a Pandas DataFrame by calling the create_dataframe method and passing the data as a Pandas DataFrame.
import pandas as pd
pandas_df = pd.DataFrame(
    data={
        "col1": pd.Series(["value1", "value2"]),
        "col2": pd.Series([1.23, 4.56]),
        "col3": pd.Series([123, 456]),
        "col4": pd.Series([True, False]),
    }
)
dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
dataframe.show() outputs the following:
-------------------------------------
|"col1" |"col2" |"col3" |"col4" |
-------------------------------------
|value1 |1.23 |123 |True |
|value2 |4.56 |456 |False |
-------------------------------------
A Snowpark Python DataFrame can be converted to a Pandas DataFrame as well by calling the to_pandas method on the DataFrame.
from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType
dataframe = session.create_dataframe(
    data=[
        ["value1", 1.23, 123, True],
        ["value2", 4.56, 456, False],
    ],
    schema=StructType([
        StructField("col1", StringType()),
        StructField("col2", DoubleType()),
        StructField("col3", LongType()),
        StructField("col4", BooleanType()),
    ])
)
pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
The call to print(pandas_dataframe.to_string()) outputs the following:
COL1 COL2 COL3 COL4
0 value1 1.23 123 True
1 value2 4.56 456 False
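In tests, you can compare the converted result against an expected Pandas DataFrame; a minimal sketch using pandas.testing (the expected frame below mirrors the data above):
import pandas as pd

expected = pd.DataFrame({
    "COL1": ["value1", "value2"],
    "COL2": [1.23, 4.56],
    "COL3": [123, 456],
    "COL4": [True, False],
})

# check_dtype=False keeps the comparison tolerant of integer-width differences
pd.testing.assert_frame_equal(pandas_dataframe, expected, check_dtype=False)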
Creating a PyTest Fixture for Session¶
PyTest fixtures are functions that are executed before a test (or module of tests), typically to provide data or connections to tests. In this case, create a fixture that returns a Snowpark Session object.
First, create a test directory if you do not already have one. Then, in the test directory, create a file conftest.py with the following contents, where CONNECTION_PARAMETERS is a dictionary with your Snowflake account credentials. For more information about the dictionary format, see Creating a Session.
# test/conftest.py
import pytest
from snowflake.snowpark.session import Session
def pytest_addoption(parser):
    parser.addoption("--snowflake-session", action="store", default="live")

@pytest.fixture(scope='module')
def session(request) -> Session:
    if request.config.getoption('--snowflake-session') == 'local':
        return Session.builder.config('local_testing', True).create()
    else:
        return Session.builder.configs(CONNECTION_PARAMETERS).create()
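For reference, CONNECTION_PARAMETERS typically looks like the following sketch; the keys shown are common connection parameters and the values are placeholders:
# Placeholder values; see Creating a Session for the full dictionary format
CONNECTION_PARAMETERS = {
    "account": "<your_account_identifier>",
    "user": "<your_user>",
    "password": "<your_password>",
    "role": "<your_role>",
    "warehouse": "<your_warehouse>",
    "database": "<your_database>",
    "schema": "<your_schema>",
}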
The call to pytest_addoption adds a command line option named snowflake-session to the pytest command. The session fixture checks this command line option and creates a local or live Session depending on its value. This allows you to easily switch between local and live modes for testing.
# Using local mode:
pytest --snowflake-session local
# Using live mode:
pytest
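With the fixture in place, an individual test simply takes session as an argument and runs unchanged in both modes; a minimal sketch (the file name is hypothetical):
# test/test_dataframe.py
from snowflake.snowpark import Row

def test_with_column(session):
    df = session.create_dataframe([[1, 2], [3, 4]], ['a', 'b'])
    result = df.with_column('c', df['a'] + df['b']).collect()
    assert result == [Row(A=1, B=2, C=3), Row(A=3, B=4, C=7)]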
SQL Operations¶
Session.sql(...) is not supported in the local testing framework. Use Snowpark’s DataFrame APIs whenever possible, and in cases where you must use Session.sql(...), you can mock the tabular return value using Python’s unittest.mock.patch to patch the expected response from a given Session.sql() call.
In the example below, mock_sql() maps the SQL query text to the desired DataFrame response. The following conditional statement checks if the current session is using local testing, and if so, applies the patch to the Session.sql() method.
from unittest import mock
from functools import partial
from snowflake.snowpark import Row

def test_something(pytestconfig, session):
    def mock_sql(session, sql_string):  # patch for SQL operations
        if sql_string == "select 1,2,3":
            return session.create_dataframe([[1, 2, 3]])
        else:
            raise RuntimeError(f"Unexpected query execution: {sql_string}")

    if pytestconfig.getoption('--snowflake-session') == 'local':
        with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)):  # apply patch for SQL operations
            assert session.sql("select 1,2,3").collect() == [Row(1, 2, 3)]
    else:
        assert session.sql("select 1,2,3").collect() == [Row(1, 2, 3)]
When local testing is enabled, all tables created by DataFrame.save_as_table() are saved as temporary tables in memory and can be retrieved using Session.table(). You can use the supported DataFrame operations on the table as usual.
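A minimal sketch of this round trip (the table name my_table is hypothetical):
from snowflake.snowpark.functions import col

# Save a DataFrame as a table; with local testing, it is kept as a temporary table in memory
df = session.create_dataframe([[1, 2], [3, 4]], ['a', 'b'])
df.write.save_as_table("my_table", mode="overwrite")

# Retrieve it with Session.table() and use supported DataFrame operations as usual
session.table("my_table").filter(col("a") > 1).show()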
Patching Built-In Functions¶
Not all of the built-in functions under snowflake.snowpark.functions are supported in the local testing framework. If you use a function that is not supported, you need to use the @patch decorator from snowflake.snowpark.mock to create a patch.
To define and implement the patched function, the signature (parameter list) must align with the built-in function’s parameters. The local testing framework passes parameters to the patched function using the following rules:
For parameters of type ColumnOrName in the signature of the built-in function, a ColumnEmulator is passed as the parameter of the patched function. ColumnEmulator is similar to a pandas.Series object that contains the column data.
For parameters of type LiteralType in the signature of the built-in function, the literal value is passed as the parameter of the patched function.
Otherwise, the raw value is passed as the parameter of the patched function.
The patched function is expected to return an instance of ColumnEmulator, corresponding to the Column return type of the built-in function.
For example, the built-in function to_timestamp() could be patched like this:
import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType
@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
    ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
    ret_column.sf_type = ColumnType(TimestampType(), True)
    return ret_column
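Once the patch is registered, calls to to_timestamp() in local testing are routed to mock_to_timestamp; a minimal usage sketch:
# Exercise the patched function through the normal DataFrame API
df = session.create_dataframe([["2023-01-01T00:00:00+0000"]], schema=["ts_str"])
df.select(to_timestamp("ts_str")).show()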
Skipping Test Cases¶
If your PyTest test suite contains a test case that is not well supported by local testing, you can skip it using PyTest’s mark.skipif decorator. The example below assumes that you configured your session and parameters as described earlier. The condition checks whether the --snowflake-session option is set to local, and if so, the test case is skipped with a message explaining why.
import pytest

@pytest.mark.skipif(
    condition="config.getoption('--snowflake-session') == 'local'",
    reason="Test case disabled for local testing"
)
def test_case(session):
    ...
Limitations¶
The following functionality is not supported:
Raw SQL strings and operations that require parsing SQL strings. For example, session.sql and DataFrame.filter("col1 > 12") are not supported.
UDFs, UDTFs, and stored procedures.
Table functions.
AsyncJobs.
Session operations, like altering warehouses, schemas, and other session properties.
Geometry and Geography data types.
Aggregating window functions.

# Selecting window function expressions is supported
df.select("key", "value", sum_("value").over(), avg("value").over())

# Aggregating window function expressions is NOT supported
df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
Other limitations are:
Variant, Array, and Object data types are only supported with standard JSON encoding and decoding. Expressions like [1,2,,3,] are considered valid JSON in Snowflake but not in local testing, where Python’s built-in JSON functionalities are used. You can specify the module-level variables snowflake.snowpark.mock.CUSTOM_JSON_ENCODER and snowflake.snowpark.mock.CUSTOM_JSON_DECODER to override the default settings (see the sketch after this list).
Only a subset of Snowflake’s functions (including window functions) are implemented. See Patching Built-In Functions to learn how to inject your own function definition.
Patching rank-related functions is currently not supported.
Selecting columns with the same name will only return one column. As a workaround, rename the columns to have distinct names using Column.alias.

df.select(lit(1), lit(1)).show()  # col("a"), col("a")
#---------
#|"'1'"  |
#---------
#|1      |
#|...    |
#---------

# Workaround: use Column.alias
DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2"))  # "col1_1", "col1_2"
Explicit type casting using Column.cast has the limitation that format strings are not supported for the inputs to_decimal, to_number, to_numeric, to_double, to_date, to_time, to_timestamp, or for the outputs to_char, to_varchar, to_binary.
JSON strings stored in VariantType cannot be converted to Datetime types.
For Table.merge and Table.update, the implementation only supports the behavior when the session parameters ERROR_ON_NONDETERMINISTIC_UPDATE and ERROR_ON_NONDETERMINISTIC_MERGE are set to False. This means that for multi-joins, it updates one of the matched rows.
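As referenced in the list above, here is a minimal sketch of overriding the default JSON decoding; whether these variables accept a json.JSONDecoder subclass is an assumption to verify against the snowflake.snowpark.mock module:
import json
import snowflake.snowpark.mock

class LenientJSONDecoder(json.JSONDecoder):  # hypothetical custom decoder
    def decode(self, s, *args, **kwargs):
        # Example tweak: tolerate empty/trailing array elements such as "[1,2,,3,]"
        cleaned = s.replace(",,", ",null,").replace(",]", "]")
        return super().decode(cleaned, *args, **kwargs)

snowflake.snowpark.mock.CUSTOM_JSON_DECODER = LenientJSONDecoder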
List of Supported APIs¶
Snowpark Session¶
Session.createDataFrame
Session.create_dataframe
Session.flatten
Session.range
Session.table
Input/Output¶
DataFrameReader.csv
DataFrameReader.table
DataFrameWriter.saveAsTable
DataFrameWriter.save_as_table
DataFrame¶
DataFrame.agg
DataFrame.cache_result
DataFrame.col
DataFrame.collect
DataFrame.collect_nowait
DataFrame.copy_into_table
DataFrame.count
DataFrame.createOrReplaceTempView
DataFrame.createOrReplaceView
DataFrame.create_or_replace_temp_view
DataFrame.create_or_replace_view
DataFrame.crossJoin
DataFrame.cross_join
DataFrame.distinct
DataFrame.drop
DataFrame.dropDuplicates
DataFrame.drop_duplicates
DataFrame.dropna
DataFrame.except_
DataFrame.explain
DataFrame.fillna
DataFrame.filter
DataFrame.first
DataFrame.groupBy
DataFrame.group_by
DataFrame.intersect
DataFrame.join
DataFrame.limit
DataFrame.minus
DataFrame.natural_join
DataFrame.orderBy
DataFrame.order_by
DataFrame.rename
DataFrame.replace
DataFrame.rollup
DataFrame.sample
DataFrame.select
DataFrame.show
DataFrame.sort
DataFrame.subtract
DataFrame.take
DataFrame.toDF
DataFrame.toLocalIterator
DataFrame.toPandas
DataFrame.to_df
DataFrame.to_local_iterator
DataFrame.to_pandas
DataFrame.to_pandas_batches
DataFrame.union
DataFrame.unionAll
DataFrame.unionAllByName
DataFrame.unionByName
DataFrame.union_all
DataFrame.union_all_by_name
DataFrame.union_by_name
DataFrame.unpivot
DataFrame.where
DataFrame.withColumn
DataFrame.withColumnRenamed
DataFrame.with_column
DataFrame.with_column_renamed
DataFrame.with_columns
DataFrameNaFunctions.drop
DataFrameNaFunctions.fill
DataFrameNaFunctions.replace
Column¶
Column.alias
Column.as_
Column.asc
Column.asc_nulls_first
Column.asc_nulls_last
Column.astype
Column.between
Column.bitand
Column.bitor
Column.bitwiseAnd
Column.bitwiseOR
Column.bitwiseXOR
Column.bitxor
Column.cast
Column.collate
Column.desc
Column.desc_nulls_first
Column.desc_nulls_last
Column.endswith
Column.eqNullSafe
Column.equal_nan
Column.equal_null
Column.getItem
Column.getName
Column.get_name
Column.in_
Column.isNotNull
Column.isNull
Column.is_not_null
Column.is_null
Column.isin
Column.like
Column.name
Column.over
Column.regexp
Column.rlike
Column.startswith
Column.substr
Column.substring
Column.try_cast
Column.within_group
CaseExpr.when
CaseExpr.otherwise
Data Types¶
ArrayType
BinaryType
BooleanType
ByteType
ColumnIdentifier
DataType
DateType
DecimalType
DoubleType
FloatType
IntegerType
LongType
MapType
NullType
ShortType
StringType
StructField
StructType
Timestamp
TimestampType
TimeType
Variant
VariantType
Row¶
Row.asDict
Row.as_dict
Row.count
Row.index
Function¶
abs
avg
coalesce
contains
count
count_distinct
covar_pop
endswith
first_value
iff
lag
last_value
lead
list_agg
max
median
min
parse_json
row_number
startswith
substring
sum
to_array
to_binary
to_boolean
to_char
to_date
to_decimal
to_double
to_object
to_time
to_timestamp
to_variant
Window¶
Window.orderBy
Window.order_by
Window.partitionBy
Window.partition_by
Window.rangeBetween
Window.range_between
Window.rowsBetween
Window.rows_between
WindowSpec.orderBy
WindowSpec.order_by
WindowSpec.partitionBy
WindowSpec.partition_by
WindowSpec.rangeBetween
WindowSpec.range_between
WindowSpec.rowsBetween
WindowSpec.rows_between
Grouping¶
RelationalGroupedDataFrame.agg
RelationalGroupedDataFrame.apply_in_pandas
RelationalGroupedDataFrame.applyInPandas
RelationalGroupedDataFrame.avg
RelationalGroupedDataFrame.builtin
RelationalGroupedDataFrame.count
RelationalGroupedDataFrame.function
RelationalGroupedDataFrame.max
RelationalGroupedDataFrame.mean
RelationalGroupedDataFrame.median
RelationalGroupedDataFrame.min
RelationalGroupedDataFrame.sum
Table¶
Table.delete
Table.drop_table
Table.merge
Table.sample
Table.update
WhenMatchedClause.delete
WhenMatchedClause.update
WhenNotMatchedClause.insert