Snowpark Connect for Spark compatibility guide¶
This guide documents the compatibility between the Snowpark Connect for Spark implementation of the Spark DataFrame APIs and native Apache Spark. It is intended to help users understand the key differences, unsupported features, and migration considerations when moving Spark workloads to Snowpark Connect for Spark.
Snowpark Connect for Spark aims to provide a familiar Spark DataFrame API experience on top of the Snowflake execution engine. However, there are compatibility gaps, which are described in this topic. This guide highlights those differences to help you plan and adapt your migration. These gaps might be addressed in a future release.
DataTypes¶
Unsupported data types¶
Implicit data type conversion¶
When using Snowpark Connect for Spark, keep in mind how data types are handled. Snowpark Connect for Spark implicitly represents ByteType,
ShortType, and IntegerType as LongType. This means that while you might define columns or data with
ByteType, ShortType, or IntegerType, the data will be represented and returned by Snowpark Connect for Spark as
LongType. Similarly, implicit conversion might also occur for FloatType and
DoubleType depending on the specific operations and context. The Snowflake execution engine will internally handle data
type compression and may in fact store the data as Byte or Short, but these are considered implementation details and not exposed to the
end user.
Semantically, this representation will not impact the correctness of your Spark queries.
| Data type from native PySpark | Data type from Snowpark Connect for Spark |
|---|---|
| ByteType | LongType |
| ShortType | LongType |
| IntegerType | LongType |
| FloatType | DoubleType |
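For example, a column declared as IntegerType is reported back as LongType. The following short sketch (the column name and values are illustrative) shows the difference in the reported schema:
from pyspark.sql.types import IntegerType, StructField, StructType

schema = StructType([StructField("i", IntegerType())])
df = spark.createDataFrame([(1,), (2,)], schema)
df.printSchema()
# Native PySpark:             |-- i: integer (nullable = true)
# Snowpark Connect for Spark: |-- i: long (nullable = true)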
The following example shows a difference in how Spark and Snowpark Connect for Spark handle data types in query results.
Query¶
query = """
SELECT * FROM VALUES
(float(1.0), double(1.0), 1.0, "1", true, NULL),
(float(2.0), double(2.0), 2.0, "2", false, NULL),
(float(3.0), double(3.0), NULL, "3", false, NULL)
AS tab(a, b, c, d, e, f)
"""
Spark¶
spark.sql(query).printSchema()
root
|-- a: float (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal(2,1) (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = false)
|-- f: void (nullable = true)
Snowpark Connect for Spark¶
snowpark_connect_spark.sql(query).printSchema()
root
|-- a: double (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = true)
|-- f: string (nullable = true)
NullType nuance¶
Snowpark Connect for Spark doesn’t support the NullType data type, which is supported in Spark. This causes behavior changes when you use NULL or None in DataFrames.
In Spark, a literal NULL (for example, lit(None)) is automatically inferred as NullType. In Snowpark Connect for Spark, it is inferred as StringType during schema inference.
df = spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]
# Spark: StructField('null_col', NullType(), True)
# Snowpark Connect for Spark: StructField('null_col', StringType(), True)
Structured data types in ArrayType, MapType, and ObjectType¶
Because structured type support is not enabled by default in Snowpark Connect for Spark, ARRAY, MAP, and OBJECT data types are treated as generic, untyped collections. This means there is no enforcement of element types, field names, schema, or nullability, unlike what structured type support would provide.
If you have a dependency on this support, work with your account team to enable the feature for your account.
Unsupported Spark APIs¶
The following APIs are supported by classic Spark and Spark Connect but are not supported in Snowpark Connect for Spark; an illustration follows the list.
DataFrame.hint: Snowpark Connect for Spark ignores any hint set on a DataFrame. The Snowflake query optimizer automatically determines the most efficient execution strategy.
DataFrame.repartition: This is a no-op in Snowpark Connect for Spark. Snowflake automatically manages data distribution and partitioning across its distributed computing infrastructure.
pyspark.RDD: RDD APIs are not supported in Spark Connect (including Snowpark Connect for Spark).
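The following illustration shows that calling these APIs does not raise an error, but they have no effect on how Snowflake executes the query:
df = spark.range(100)

# The broadcast hint is ignored; the Snowflake optimizer chooses the join strategy.
joined = df.hint("broadcast").join(spark.range(10), "id")

# repartition() is a no-op; Snowflake manages data distribution itself.
joined.repartition(8).show()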
UDF differences¶
StructType differences¶
When Spark converts a StructType for use in a user-defined function (UDF), it converts it to a tuple type in Python. Snowpark Connect for Spark converts a StructType into a dict type in Python. This leads to fundamental differences in element access and output.
Spark accesses elements by numeric index: 0, 1, 2, and so on.
Snowpark Connect for Spark accesses elements by key: '_1', '_2', and so on.
def f(e):
    return e[0]

df = spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
result = df.select("*", udf(f, DoubleType())("c1"))
# In Snowpark Connect for Spark, this results in an index access issue.
# Workaround: use '_1', '_2', and so on as indices.
def f(e):
    return e['_1']
row = (
    spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)
print(row[0])
# Spark: Row(col1=1, col2=2)
# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}
Iterator Type in UDFs¶
Iterator isn’t supported as a return type or as an input type.
# This will not work
def func(iterator):
    for _ in iterator:
        ...

df = spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Importing files to a Python UDF¶
With Snowpark Connect for Spark, you can specify external libraries and files in Python UDFs. Snowflake includes Python files and archives in your code’s execution context. You can import functions from these included files in a UDF without additional steps. This dependency-handling behavior works as described in Creating a Python UDF with code uploaded from a stage.
To include external libraries and files, you provide stage paths to the files as the value of the configuration setting
snowpark.connect.udf.imports. The configuration value should be an array of stage paths to the files, where the paths are
separated by commas.
Code in the following example includes two files in the UDF’s execution context. The UDF imports functions from these files and uses them in its logic.
# Files need to be previously staged
from pyspark.sql.functions import lit, udf
from pyspark.sql.types import StringType

spark.conf.set("snowpark.connect.udf.imports", "[@stage/library.py, @other_lib.zip]")

@udf(returnType=StringType())
def import_example(input: str) -> str:
    from library import first_function
    from other_lib.custom import second_function

    return first_function(input) + second_function(input)

spark.range(1).select(import_example(lit("example_string"))).show()
You can use the snowpark.connect.udf.imports setting to include other kinds of files as well, such as those with data your code
needs to read. Note that when you do this, your code should only read from the included files; any writes to such files will be lost after
the function’s execution ends.
# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/data.csv]")

@udf(returnType=StringType())
def import_read_example(file_name: str) -> str:
    with open(file_name) as f:
        return f.read()

spark.range(1).select(import_read_example(lit("data.csv"))).show()
Lambda function limitations¶
User-defined functions (UDFs) are not supported within lambda expressions. This includes both custom UDFs and certain built-in functions whose underlying implementation relies on Snowflake UDFs. Attempting to use a UDF inside a lambda expression will result in an error.
from pyspark.sql.functions import bit_count, map_filter

df = spark.createDataFrame([({"a": 123},)], ("data",))
df.select(map_filter("data", lambda _, v: bit_count(v) > 3)).show()  # Does not work, because bit_count is implemented with a Snowflake UDF
Temporary views¶
By default, Snowpark Connect for Spark does not create a temporary view in Snowflake. You can have Snowpark Connect for Spark create a temporary view by setting the configuration parameter snowpark.connect.temporary.views.create_in_snowflake to true.
When the parameter is set to false, Snowpark Connect for Spark stores views as DataFrames without creating a Snowflake view. This helps prevent the issue that can occur when the view definition SQL created from a Spark Connect request exceeds the Snowflake view size limit (95 KB).
Temporary views are normally visible through the Spark Connect Catalog API. However, they are not accessible from SQL statements when the configuration snowpark.connect.sql.passthrough is set to true. To create Snowflake temporary views, set the configuration snowpark.connect.temporary.views.create_in_snowflake to true, as shown in the example below.
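For example, the following sketch (the view name is illustrative) creates a temporary view as an actual Snowflake temporary view so that it is also available to pass-through SQL:
spark.conf.set("snowpark.connect.temporary.views.create_in_snowflake", "true")

df = spark.range(3)
df.createOrReplaceTempView("my_temp_view")  # created as a Snowflake temporary view
spark.sql("SELECT * FROM my_temp_view").show()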
Data sources¶
| Data source | Compatibility issues compared with PySpark |
|---|---|
| Avro | File type is not supported. |
| CSV | Save mode is not supported for the following: The following options are not supported: |
| JSON | Save mode not supported for the following: The following options not supported: Difference in |
| Orc | File type is not supported. |
| Parquet | Save mode is not supported for the following: The following options are not supported: Configuration not supported: (ALL) |
| Text | Write mode not supported for the following: The following options are not supported: The |
| XML | File type is not supported. |
| Snowflake table | Write to table doesn’t need a provider format. Bucketing and partitioning are not supported. Storage format and versioning are not supported. |
Catalog¶
Snowflake Horizon Catalog provider support¶
Only Snowflake is supported as a catalog provider.
Unsupported catalog APIs¶
registerFunction
listFunctions
getFunction
functionExists
createExternalTable
Partially supported catalog APIs¶
createTable (no external table support)
Iceberg¶
Snowflake-managed Iceberg table¶
Snowpark Connect for Spark works with Apache Iceberg™ tables, including externally managed Iceberg tables and catalog-linked databases.
Read¶
Time travel is not supported, including historical snapshot, branch, and incremental read.
Write¶
Using Spark SQL to create tables is not supported.
Schema merge is not supported.
To create the table, you must:
Create an external volume.
Link the external volume to the table creation in either of the following ways:
Set EXTERNAL_VOLUME on the database.
Set snowpark.connect.iceberg.external_volume in the Spark configuration, as shown in the example below.
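For example, a minimal sketch of the configuration approach (the external volume name is a placeholder; the volume must already exist in Snowflake):
# "my_external_volume" is an assumed, previously created external volume.
spark.conf.set("snowpark.connect.iceberg.external_volume", "my_external_volume")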
Externally managed Iceberg table¶
Read¶
You must create a Snowflake unmanaged table entity.
Time travel is not supported, including historical snapshot, branch, and incremental read.
Write¶
Table creation is not supported.
Writing to an existing Iceberg table is supported, as shown in the sketch below.
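For example, a hedged sketch of appending to an existing externally managed Iceberg table (the table name is a placeholder, and the corresponding Snowflake unmanaged table entity is assumed to already exist):
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])

# Append to the existing Iceberg table; creating the table this way is not supported.
df.write.mode("append").saveAsTable("my_db.my_schema.my_iceberg_table")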
Duplicate column names¶
Snowflake does not support duplicate column names.
The following code fails at the view creation step with the following SQL compilation error: duplicate column name 'foo'.
df = spark.createDataFrame([
    (1, 1),
    (2, 2)
], ["foo", "foo"])

df.show()  # works
df.createTempView("df_view")  # Fails with SQL compilation error: duplicate column name 'foo'
To work around this, set the snowpark.connect.views.duplicate_column_names_handling_mode configuration option to one of the following values (an example follows the list):
rename: A suffix such as _dedup_1, _dedup_2, and so on is appended to all of the duplicate column names after the first one.
drop: All of the duplicate columns except one are dropped. This might lead to incorrect results if the columns have different values.
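For example, a minimal sketch using the rename mode (the deduplicated column name shown in the comment is an assumption based on the suffix rule above):
spark.conf.set("snowpark.connect.views.duplicate_column_names_handling_mode", "rename")

df = spark.createDataFrame([(1, 1), (2, 2)], ["foo", "foo"])
df.createTempView("df_view")          # no longer fails
spark.table("df_view").printSchema()  # expected columns: foo, foo_dedup_1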