Snowpark Connect for Spark compatibility guide

This guide documents the compatibility between the Snowpark Connect for Spark implementation of the Spark DataFrame APIs and native Apache Spark. It is intended to help users understand the key differences, unsupported features, and migration considerations when moving Spark workloads to Snowpark Connect for Spark.

Snowpark Connect for Spark aims to provide a familiar Spark DataFrame API experience on top of the Snowflake execution engine. However, there are compatibility gaps, which are described in this topic. This guide highlights those differences to help you plan and adapt your migration. These gaps might be addressed in a future release.

DataTypes

Unsupported data types

Implicit data type conversion

When using Snowpark Connect for Spark, keep in mind how data types are handled. Snowpark Connect for Spark implicitly represents ByteType, ShortType, and IntegerType as LongType. This means that while you might define columns or data with ByteType, ShortType, or IntegerType, the data will be represented and returned by Snowpark Connect for Spark as LongType. Similarly, implicit conversion might also occur for FloatType and DoubleType depending on the specific operations and context. The Snowflake execution engine will internally handle data type compression and may in fact store the data as Byte or Short, but these are considered implementation details and not exposed to the end user.

Semantically, this representation will not impact the correctness of your Spark queries.

Data type from native PySpark    Data type from Snowpark Connect for Spark
ByteType                         LongType
ShortType                        LongType
IntegerType                      LongType
LongType                         LongType
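
The following minimal sketch illustrates this behavior; it assumes an active Snowpark Connect for Spark session named spark, and the column names are only illustrative.

from pyspark.sql.types import StructType, StructField, ByteType, IntegerType

# A schema declared with narrow integer types.
schema = StructType([
    StructField("b", ByteType()),
    StructField("i", IntegerType()),
])

df = spark.createDataFrame([(1, 2)], schema)
df.printSchema()

# Spark:                       b: byte,  i: integer
# Snowpark Connect for Spark:  b: long,  i: long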

The following example shows a difference in how Spark and Snowpark Connect for Spark handle data types in query results.

Query

query = """
    SELECT * FROM VALUES
    (float(1.0), double(1.0), 1.0, "1", true, NULL),
    (float(2.0), double(2.0), 2.0, "2", false, NULL),
    (float(3.0), double(3.0), NULL, "3", false, NULL)
    AS tab(a, b, c, d, e, f)
    """

Spark

spark.sql(query).printSchema()
root
 |-- a: float (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal(2,1) (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = false)
 |-- f: void (nullable = true)

Snowpark Connect for Spark

snowpark_connect_spark.sql(query).printSchema()
root
 |-- a: double (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = true)
 |-- f: string (nullable = true)

NullType nuance

Snowpark Connect for Spark doesn’t support the NullType data type, which Spark supports. This causes behavior changes when you use NULL or None values in DataFrames.

In Spark, a literal NULL (for example, with lit(None)) is automatically inferred as a NullType. In Snowpark Connect for Spark, it is inferred as a StringType during schema inference.

from pyspark.sql.functions import lit

df = spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]

# Spark:                      StructField('null_col', NullType(), True)
# Snowpark Connect for Spark: StructField('null_col', StringType(), True)

Structured data types in ArrayType, MapType, and ObjectType

Because structured type support is not enabled by default in Snowpark Connect for Spark, ARRAY, MAP, and OBJECT data types are treated as generic, untyped collections. This means there is no enforcement of element types, field names, schema, or nullability, unlike what structured type support would provide.

If you have a dependency on this support, please work with your account team to enable this feature for your account.

Unsupported Spark APIs

The following APIs are supported by classic Spark and Spark Connect but are not supported in Snowpark Connect for Spark.

  • DataFrame.hint: Snowpark Connect for Spark ignores any hint set on a DataFrame. The Snowflake query optimizer automatically determines the most efficient execution strategy.

  • DataFrame.repartition: This is a no-op in Snowpark Connect for Spark. Snowflake automatically manages data distribution and partitioning across its distributed computing infrastructure (see the sketch after this list).

  • pyspark.RDD: RDD APIs are not supported in Spark Connect (including Snowpark Connect for Spark).

  • pyspark.ml

  • pyspark streaming
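
The following minimal sketch, assuming an active Snowpark Connect for Spark session named spark, shows that hint and repartition calls are accepted but have no effect:

df = spark.range(100)

# The broadcast hint is accepted but ignored; the Snowflake optimizer chooses the join strategy.
df.hint("broadcast").join(spark.range(10), "id").count()

# repartition() is a no-op; Snowflake manages data distribution automatically.
df.repartition(10).count()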

UDF differences

StructType differences

When Spark converts a StructType for use in a user-defined function (UDF), it converts it to a Python tuple. Snowpark Connect for Spark converts a StructType into a Python dict. This leads to fundamental differences in element access and output.

  • Spark accesses elements by numeric index: 0, 1, 2, and so on.

  • Snowpark Connect for Spark accesses elements by key: ‘_1’, ‘_2’, and so on.

from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def f(e):
    return e[0]

df = spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
result = df.select("*", udf(f, DoubleType())("c1"))

# In Snowpark Connect for Spark this results in an index access issue.
# Workaround: use '_1', '_2', and so on as keys.

def f(e):
    return e['_1']

row = (
    spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)

assert row[0] == Row(col1=1, col2=2)

# Spark: Row(col1=1, col2=2)
# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}

Iterator Type in UDFs

Iterator isn’t supported as a return type or as an input type.

# This will not work: mapInArrow passes an iterator of pyarrow.RecordBatch objects
# to the function, and iterator input and output types are not supported.
def func(iterator):
    for batch in iterator:
        yield batch

df = spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()

Lambda function limitations

While Snowpark Connect for Spark supports lambda expressions and higher-order functions (such as the transform function), it does not support referencing outer columns or expressions from within the lambda body.

This limitation is caused by restrictions on lambda expressions in Snowflake.

from pyspark.sql.functions import array_size, negative, transform

data = [
    (1, [1, 2, 3]),
    (2, [4, 5]),
    (3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])

# This fails in Snowpark Connect for Spark because the lambda references df.numbers,
# which is outside of the lambda context.
df.select(transform(df.numbers, lambda el: el + array_size(df.numbers))).show()

# This works.
df.select(transform(df.numbers, lambda el: negative(el) + 1)).show()

Another limitation is that user-defined functions (UDFs) are not supported within lambda expressions. This includes both custom UDFs and certain built-in functions whose underlying implementation relies on Snowflake UDFs. Attempting to use a UDF inside a lambda expression results in an error.

from pyspark.sql.functions import array, lit, split, transform

spark.range(1).select(split(lit('a,b,c'), ',')).show()  # works

# Does not work, because `split` is implemented with a Snowflake UDF.
spark.range(1).select(transform(array(lit('a,b,c')), lambda x: split(x, ','))).show()

Data sources

The following data sources have compatibility issues in Snowpark Connect for Spark compared with PySpark.

Avro

File type is not supported.

CSV

Save mode is not supported for the following: Append, Ignore.

The following options are not supported: encoding, quote, quoteAll, escape, escapeQuotes, comment, preferDate, enforceSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nanValue, positiveInf, negativeInf, timestampNTZFormat, enableDateTimeParsingFallback, maxColumns, maxCharsPerColumn, mode, columnNameOfCorruptRecord, charToEscapeQuoteEscaping, samplingRatio, emptyValue, locale, lineSep, unescapedQuoteHandling, compression.

JSON

Save mode is not supported for the following: Append, Ignore.

The following options are not supported: timeZone, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZeros, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, timestampNTZFormat, enableDateTimeParsingFallback, allowUnquotedControlChars, encoding, lineSep, samplingRatio, dropFieldIfAllNull, locale, allowNonNumericNumbers, compression, ignoreNullFields.

Difference in show() output: if a field value is a string, it is quoted, and an extra “n” character appears in the result.

Orc

File type is not supported.

Parquet

Save mode is not supported for the following: Append, Ignore.

The following options are not supported: datetimeRebaseMode, int96RebaseMode, mergeSchema, compression.

Configuration is not supported (all configuration settings).

Text

Write mode is not supported for the following: Append, Ignore.

The following option is not supported: compression.

The lineSep option is not supported for writes.

XML

File type is not supported.

Snowflake table

Writing to a table doesn’t require a provider format.

Bucketing and partitioning are not supported.

Storage format and versioning are not supported.
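
For the file formats above, the Append and Ignore save modes are not supported, which leaves overwrite and the default errorifexists. The following minimal sketch writes and reads Parquet within those limits; the stage path "@my_stage/output/" is a hypothetical placeholder.

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])

# Append and Ignore save modes are not supported; overwrite (or the default errorifexists) works.
# The path below is a hypothetical placeholder.
df.write.mode("overwrite").parquet("@my_stage/output/")

spark.read.parquet("@my_stage/output/").show()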

Catalog

Snowflake Horizon Catalog provider support

  • Only Snowflake is supported as a catalog provider.

Unsupported catalog APIs

  • registerFunction

  • listFunctions

  • getFunction

  • functionExists

  • createExternalTable

Partially supported catalog APIs

  • createTable (no external table support; see the sketch below)
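
As an illustrative sketch only: createTable can be used to create a regular (non-external) table. The table name and schema below are hypothetical.

from pyspark.sql.types import StructType, StructField, LongType, StringType

# Hypothetical table name and schema; createTable without a path creates a regular
# (non-external) table, which is the supported portion of this API.
schema = StructType([
    StructField("id", LongType()),
    StructField("name", StringType()),
])
spark.catalog.createTable("people", schema=schema)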

Iceberg

Snowpark Connect for Spark works with Apache Iceberg™ tables, including externally managed Iceberg tables and catalog-linked databases.

Snowflake-managed Iceberg table

Read

Time travel is not supported, including historical snapshot, branch, and incremental read.

Write

  • Using Spark SQL to create tables is not supported.

  • Schema merge is not supported.

  • To create a table, you must:

    • Create an external volume.

    • Link the external volume to the table creation in either of the following ways (see the sketch after this list):

      • Set EXTERNAL_VOLUME on the database.

      • Set snowpark.connect.iceberg.external_volume in the Spark configuration.
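
The following is a minimal sketch of the second option, setting the external volume through the Spark configuration. The volume and table names are hypothetical, and the DataFrameWriterV2 call is only one possible way to create the table from a DataFrame (creating the table through Spark SQL is not supported).

# Point the session at a (hypothetical) external volume before creating
# a Snowflake-managed Iceberg table.
spark.conf.set("snowpark.connect.iceberg.external_volume", "my_external_volume")

df = spark.createDataFrame([(1, "a")], ["id", "val"])

# One possible creation path via the DataFrame API; the table name is a placeholder.
df.writeTo("my_db.my_schema.iceberg_table").create()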

Externally managed Iceberg table

Read

  • You must create a Snowflake unmanaged table entity.

  • Time travel is not supported, including historical snapshot, branch, and incremental read.

Write

  • Table creation is not supported.

  • Writing to an existing Iceberg table is supported (see the sketch below).
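
The following is an illustrative sketch only; the table name is hypothetical, and the DataFrameWriterV2 append call is an assumed way to express the write.

# Append to an existing, externally managed Iceberg table that is already registered
# in Snowflake as an unmanaged table entity. The table name is a placeholder.
df = spark.createDataFrame([(1, "a")], ["id", "val"])
df.writeTo("my_db.my_schema.external_iceberg_table").append()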

Duplicate column names

Snowflake does not support duplicate column names.

The following code fails at the view creation step with the SQL compilation error: duplicate column name 'foo'.

df = spark.createDataFrame([
    (1, 1),
    (2, 2)
], ["foo", "foo"])

df.show()  # works

df.createTempView("df_view")  # Fails with SQL compilation error: duplicate column name 'foo'

To work around this, set the snowpark.connect.views.duplicate_column_names_handling_mode configuration option to one of the following values (see the sketch after this list):

  • rename: A suffix such as _dedup_1, _dedup_2, and so on will be appended to all of the duplicate column names after the first one.

  • drop: All of the duplicate columns except one will be dropped. This might lead to incorrect results if the columns have different values.
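
The following minimal sketch shows the rename mode. Setting the option through spark.conf.set is an assumption about how your session exposes the configuration.

# With rename mode, duplicate column names after the first get a suffix such as _dedup_1.
spark.conf.set("snowpark.connect.views.duplicate_column_names_handling_mode", "rename")

df = spark.createDataFrame([(1, 1), (2, 2)], ["foo", "foo"])
df.createTempView("df_view")
spark.table("df_view").printSchema()  # expected columns: foo, foo_dedup_1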