Snowpark Connect for Spark compatibility guide¶
This guide documents the compatibility between the Snowpark Connect for Spark implementation of the Spark DataFrame APIs and native Apache Spark. It is intended to help users understand the key differences, unsupported features, and migration considerations when moving Spark workloads to Snowpark Connect for Spark.
Snowpark Connect for Spark aims to provide a familiar Spark DataFrame API experience on top of the Snowflake execution engine. However, there are compatibility gaps, described in this topic, that might be addressed in a future release. This guide highlights those differences to help you plan and adapt your migration.
DataTypes¶
Unsupported data types¶
Implicit data type conversion¶
When using Snowpark Connect for Spark, keep in mind how data types are handled. Snowpark Connect for Spark implicitly represents ByteType, ShortType, and IntegerType as LongType. This means that while you might define columns or data with ByteType, ShortType, or IntegerType, the data will be represented and returned by Snowpark Connect for Spark as LongType. Similarly, implicit conversion might also occur for FloatType and DoubleType, depending on the specific operations and context. The Snowflake execution engine internally handles data type compression and may in fact store the data as Byte or Short, but these are considered implementation details and are not exposed to the end user.
Semantically, this representation will not impact the correctness of your Spark queries.
Data type from native PySpark | Data type from Snowpark Connect for Spark |
---|---|
ByteType | LongType |
ShortType | LongType |
IntegerType | LongType |
FloatType | DoubleType |
The following example shows a difference in how Spark and Snowpark Connect for Spark handle data types in query results.
Query¶
query = """
SELECT * FROM VALUES
(float(1.0), double(1.0), 1.0, "1", true, NULL),
(float(2.0), double(2.0), 2.0, "2", false, NULL),
(float(3.0), double(3.0), NULL, "3", false, NULL)
AS tab(a, b, c, d, e, f)
"""
Spark¶
spark.sql(query).printSchema()
root
|-- a: float (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal(2,1) (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = false)
|-- f: void (nullable = true)
Snowpark Connect for Spark¶
snowpark_connect_spark.sql(query).printSchema()
root
|-- a: double (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = true)
|-- f: string (nullable = true)
NullType nuance¶
Snowpark Connect for Spark doesn’t support the NullType data type, which is a supported data type in Spark. This causes behavior changes when using Null or None in DataFrames.
In Spark, a literal NULL (for example, with lit(None)) is automatically inferred as NullType. In Snowpark Connect for Spark, it is inferred as StringType during schema inference.
df = spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]
# Spark: StructField('null_col', NullType(), True)
# Snowpark Connect for Spark: StructField('null_col', StringType(), True)
Structured data types in ArrayType, MapType, and ObjectType¶
While structured type support is not available by default in Snowpark Connect for Spark, ARRAY, MAP, and OBJECT data types are treated as generic, untyped collections. This means there is no enforcement of element types, field names, schema, or nullability, unlike what structured type support would provide.
If you have a dependency on this support, work with your account team to enable this feature for your account.
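For illustration, the following is a minimal sketch (assuming a spark session connected through Snowpark Connect for Spark) of what treating ARRAY as an untyped collection means in practice; the exact schema you see may differ from the one declared.
from pyspark.sql.types import ArrayType, IntegerType, StructField, StructType

# Declare an array column with a specific element type and no NULL elements.
schema = StructType([
    StructField("nums", ArrayType(IntegerType(), containsNull=False)),
])
df = spark.createDataFrame([([1, 2, 3],), ([4, 5],)], schema)

# Without structured type support, the column is handled as a generic ARRAY:
# the declared element type and nullability are not enforced, so the printed
# schema may not match the declaration above.
df.printSchema()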
Unsupported Spark APIs¶
The following APIs are supported by classic Spark and Spark Connect, but are not supported in Snowpark Connect for Spark.
DataFrame.hint: Snowpark Connect for Spark ignores any hint that is set on a DataFrame. The Snowflake query optimizer automatically determines the most efficient execution strategy.
DataFrame.repartition: This is a no-op in Snowpark Connect for Spark. Snowflake automatically manages data distribution and partitioning across its distributed computing infrastructure.
pyspark.RDD: RDD APIs are not supported in Spark Connect (including Snowpark Connect for Spark).
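For example, the following minimal sketch shows calls that are accepted but have no effect on execution; spark is an existing session and my_table is a hypothetical table with an id column.
left = spark.table("my_table")  # hypothetical table with an "id" column
right = spark.range(100)

# The broadcast hint is silently ignored; Snowflake's query optimizer chooses
# the execution strategy on its own.
joined = left.hint("broadcast").join(right, left["id"] == right["id"])

# repartition() is a no-op; Snowflake manages data distribution itself.
joined = joined.repartition(8)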
UDF differences¶
StructType differences¶
When Spark converts a StructType to be used in a user-defined function (UDF), it converts it to a tuple type in Python. Snowpark Connect for Spark converts a StructType into a dict type in Python. This leads to fundamental differences in element access and output.
Spark accesses elements by position: 0, 1, 2, and so on.
Snowpark Connect for Spark accesses elements by key: '_1', '_2', and so on.
def f(e):
    return e[0]

df = spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
result = df.select("*", udf(f, DoubleType())("c1"))
# This results in an index access issue. The workaround is to use '_1', '_2', and so on as indices.

# Workaround:
def f(e):
    return e['_1']
row = (
    spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)
row[0]
# Spark: Row(col1=1, col2=2)
# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}
Iterator Type in UDFs¶
Iterator isn’t supported as a return type or as an input type.
# This will not work
def func(iterator):
    for _ in iterator:
        ...

df = spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Lambda function limitations¶
While Snowpark Connect for Spark supports lambda expressions and higher-order functions (such as the transform function), it does not support referencing outer columns or expressions from within the lambda body. This limitation is caused by restrictions on lambda expressions in Snowflake.
data = [
    (1, [1, 2, 3]),
    (2, [4, 5]),
    (3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])

# This fails in Snowpark Connect for Spark because it references df.numbers,
# which is outside of the lambda context.
df.select(transform(df.numbers, lambda el: el + array_size(df.numbers))).show()

# This works.
df.select(transform(df.numbers, lambda el: negative(el) + 1)).show()
Another limitation is that user-defined functions (UDFs) are not supported within lambda expressions. This includes both custom UDFs and certain built-in functions whose underlying implementation relies on Snowflake UDFs. Attempting to use a UDF inside a lambda expression will result in an error.
spark.range(1).select(split(lit('a,b,c'), ',')).show()  # This works.
spark.range(1).select(transform(array(lit('a,b,c')), lambda x: split(x, ','))).show()  # This does not work, because `split` is implemented with a UDF.
Data sources¶
Data source | Compatibility issues compared with PySpark |
---|---|
Avro | File type is not supported. |
CSV | Save mode is not supported for the following: The following options are not supported: |
JSON | Save mode is not supported for the following: The following options are not supported: Difference in |
Orc | File type is not supported. |
Parquet | Save mode is not supported for the following: The following options are not supported: Configuration not supported: (ALL) |
Text | Write mode is not supported for the following: The following options are not supported: The |
XML | File type is not supported. |
Snowflake table | Write to table doesn’t need a provider format. Bucketing and partitioning are not supported. Storage format and versioning are not supported. |
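For example, the following is a minimal sketch of writing a DataFrame to a Snowflake table without specifying a provider format; the table name MY_TABLE is a placeholder.
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])

# No .format(...) provider is needed when writing to a Snowflake table.
# Bucketing (bucketBy) and partitioning (partitionBy) are not supported.
df.write.mode("overwrite").saveAsTable("MY_TABLE")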
Catalog¶
Snowflake Horizon Catalog provider support¶
Only Snowflake is supported as a catalog provider.
Unsupported catalog APIs¶
registerFunction
listFunctions
getFunction
functionExists
createExternalTable
Partially supported catalog APIs¶
createTable (no external table support)
Iceberg¶
Snowpark Connect for Spark works with Apache Iceberg™ tables, including externally managed Iceberg tables and catalog-linked databases.
Snowflake-managed Iceberg table¶
Read¶
Time travel is not supported, including historical snapshot, branch, and incremental read.
Write¶
Using Spark SQL to create tables is not supported.
Schema merge is not supported.
To create the table, you must:
Create an external volume.
Link the external volume to the table creation in either of the following ways:
Set the EXTERNAL_VOLUME parameter on the database.
Set snowpark.connect.iceberg.external_volume in the Spark configuration, as shown in the sketch after this list.
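The following is a minimal sketch of the second option, assuming an external volume named my_ext_vol already exists; the way you pass Spark configuration may differ in your environment.
# "my_ext_vol" is a placeholder for an external volume you have already created.
spark.conf.set("snowpark.connect.iceberg.external_volume", "my_ext_vol")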
Externally managed Iceberg table¶
Read¶
You must create a Snowflake unmanaged table entity.
Time travel is not supported, including historical snapshot, branch, and incremental read.
Write¶
Table creation is not supported.
Writing to an existing Iceberg table is supported.
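For example, the following is a minimal sketch of appending to an already existing Iceberg table; the table name my_iceberg_table is a placeholder.
df = spark.createDataFrame([(1, "a")], ["id", "val"])

# The Iceberg table must already exist; table creation is not supported for
# externally managed Iceberg tables.
df.write.mode("append").saveAsTable("my_iceberg_table")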
Duplicate column names¶
Snowflake does not support duplicate column names.
The following code fails at the view creation step with this SQL compilation error: duplicate column name 'foo'.
df = spark.createDataFrame([
    (1, 1),
    (2, 2)
], ["foo", "foo"])

df.show()  # works
df.createTempView("df_view")  # Fails with SQL compilation error: duplicate column name 'foo'
To work around this, set the snowpark.connect.views.duplicate_column_names_handling_mode configuration option to one of the following values (see the example after this list):
rename: A suffix such as _dedup_1, _dedup_2, and so on will be appended to all of the duplicate column names after the first one.
drop: All of the duplicate columns except one will be dropped. This might lead to incorrect results if the columns have different values.
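For example, the following is a minimal sketch of the rename option, assuming the configuration can be set on the Spark session in your environment.
spark.conf.set(
    "snowpark.connect.views.duplicate_column_names_handling_mode", "rename"
)

df = spark.createDataFrame([(1, 1), (2, 2)], ["foo", "foo"])
df.createTempView("df_view")  # The second column gets a suffix, for example foo_dedup_1.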