Snowpark Connect for Spark compatibility guide

This guide documents the compatibility between the Snowpark Connect for Spark implementation of the Spark DataFrame APIs and native Apache Spark. It is intended to help users understand the key differences, unsupported features, and migration considerations when moving Spark workloads to Snowpark Connect for Spark.

Snowpark Connect for Spark aims to provide a familiar Spark DataFrame API experience on top of the Snowflake execution engine. However, there are compatibility gaps, which this topic describes to help you plan and adapt your migration. These gaps might be addressed in a future release.

DataTypes

Unsupported data types

Implicit data type conversion

When using Snowpark Connect for Spark, keep in mind how data types are handled. Snowpark Connect for Spark implicitly represents ByteType, ShortType, and IntegerType as LongType. This means that while you might define columns or data with ByteType, ShortType, or IntegerType, the data is represented and returned by Snowpark Connect for Spark as LongType. Similarly, implicit conversion might also occur for FloatType and DoubleType, depending on the specific operations and context. The Snowflake execution engine internally handles data type compression and may in fact store the data as Byte or Short, but this is an implementation detail that is not exposed to the end user.

Semantically, this representation will not impact the correctness of your Spark queries.

The following list shows each data type from native PySpark and the corresponding data type used by Snowpark Connect for Spark.

  • ByteType → LongType

  • ShortType → LongType

  • IntegerType → LongType

  • LongType → LongType
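
For example, based on the mapping above, a column defined with IntegerType is reported back as LongType. The following is a minimal sketch; the commented schema lines show the expected difference, and printSchema() displays LongType as long.

from pyspark.sql.types import IntegerType, StructField, StructType

schema = StructType([StructField("i", IntegerType())])
df = spark.createDataFrame([(1,), (2,)], schema)

df.printSchema()
# Native PySpark:              |-- i: integer (nullable = true)
# Snowpark Connect for Spark:  |-- i: long (nullable = true)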

The following example shows a difference in how Spark and Snowpark Connect for Spark handle data types in query results.

Query

query = """
    SELECT * FROM VALUES
    (float(1.0), double(1.0), 1.0, "1", true, NULL),
    (float(2.0), double(2.0), 2.0, "2", false, NULL),
    (float(3.0), double(3.0), NULL, "3", false, NULL)
    AS tab(a, b, c, d, e, f)
    """

Spark

spark.sql(query).printSchema()
root
 |-- a: float (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal(2,1) (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = false)
 |-- f: void (nullable = true)

Snowpark Connect for Spark

snowpark_connect_spark.sql(query).printSchema()
root
 |-- a: double (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = true)
 |-- f: string (nullable = true)

NullType nuance

Snowpark Connect for Spark doesn’t support the NullType data type, which is a supported data type in Spark. This causes behavior changes when you use NULL or None in DataFrames.

In Spark, a literal NULL (for example, with lit(None)) is automatically inferred as a NullType. In Snowpark Connect for Spark, it is inferred as a StringType during schema inference.

from pyspark.sql.functions import lit

df = spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]

# Spark: StructField('null_col', NullType(), True)
# Snowpark Connect for Spark: StructField('null_col', StringType(), True)

Structured data types in ArrayType, MapType, and ObjectType

Because structured type support is not available by default in Snowpark Connect for Spark, the ARRAY, MAP, and OBJECT data types are treated as generic, untyped collections. This means there is no enforcement of element types, field names, schema, or nullability, unlike what structured type support would provide.

If you have a dependency on this support, please work with your account team to enable this feature for your account.
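
The following minimal sketch illustrates the point; it assumes the default behavior, and the exact reported schema depends on whether structured type support is enabled for your account.

from pyspark.sql.functions import array, lit

# Build a column that native PySpark types as an array of integers.
df = spark.range(1).select(array(lit(1), lit(2)).alias("arr"))

# Without structured type support, the element type, nullability, and field
# names of ARRAY, MAP, and OBJECT values are not enforced.
df.printSchema()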

Unsupported Spark APIs

The following APIs are supported by classic Spark and Spark Connect but are not supported in Snowpark Connect for Spark.

  • DataFrame.hint: Snowpark Connect for Spark ignores any hint that is set on a DataFrame; the Snowflake query optimizer automatically determines the most efficient execution strategy (see the example after this list).

  • DataFrame.repartition: This is a no-op in Snowpark Connect for Spark. Snowflake automatically manages data distribution and partitioning across its distributed computing infrastructure.

  • pyspark.RDD: RDD APIs are not supported in Spark Connect (including Snowpark Connect for Spark).

  • pyspark.ml

  • pyspark streaming
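
For example, the following calls are accepted but do not change how Snowflake executes the query. This is a minimal sketch that only demonstrates the ignored-hint and no-op repartition behavior described above.

df = spark.range(100)
other = spark.range(10)

# The broadcast hint is accepted but ignored; Snowflake chooses the join strategy.
df.hint("broadcast").join(other, "id").show()

# repartition() is a no-op; Snowflake manages data distribution automatically.
df.repartition(8).count()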

UDF differences

StructType differences

When Spark converts a StructType for use in a user-defined function (UDF), it converts it to a tuple type in Python. Snowpark Connect for Spark converts a StructType to a dict type in Python. This leads to fundamental differences in element access and output.

  • Spark accesses elements by integer index: 0, 1, 2, and so on.

  • Snowpark Connect for Spark accesses elements by key: '_1', '_2', and so on.

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def f(e):
    return e[0]

df = spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
result = df.select("*", udf(f, DoubleType())("c1"))

# In Snowpark Connect for Spark, this results in an index access issue.
# Workaround: access struct fields by key ('_1', '_2', and so on) instead of by position.

def f(e):
    return e['_1']

row = (
    spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)

print(row[0])

# Spark: Row(col1=1, col2=2)

# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}

Iterator Type in UDFs

Iterator isn’t supported as a return type or as an input type.

# This will not work
def func(iterator):
    for _ in iterator:
        ...

df = spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()

Importing files to a Python UDF

With Snowpark Connect for Spark, you can specify external libraries and files in Python UDFs. Snowflake includes Python files and archives in your code’s execution context. You can import functions from these included files in a UDF without additional steps. This dependency-handling behavior works as described in Creating a Python UDF with code uploaded from a stage.

To include external libraries and files, you provide stage paths to the files as the value of the configuration setting snowpark.connect.udf.imports. The configuration value should be an array of stage paths to the files, where the paths are separated by commas.

Code in the following example includes two files in the UDF’s execution context. The UDF imports functions from these files and uses them in its logic.

from pyspark.sql.functions import lit, udf
from pyspark.sql.types import StringType

# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/library.py, @other_lib.zip]")

@udf(returnType=StringType())
def import_example(input: str) -> str:
    from library import first_function
    from other_lib.custom import second_function

    return first_function(input) + second_function(input)

spark.range(1).select(import_example(lit("example_string"))).show()

You can use the snowpark.connect.udf.imports setting to include other kinds of files as well, such as those with data your code needs to read. Note that when you do this, your code should only read from the included files; any writes to such files will be lost after the function’s execution ends.

from pyspark.sql.functions import lit, udf
from pyspark.sql.types import StringType

# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/data.csv]")

@udf(returnType=StringType())
def import_read_example(file_name: str) -> str:
    with open(file_name) as f:
        return f.read()

spark.range(1).select(import_read_example(lit("data.csv"))).show()

Lambda function limitations

User-defined functions (UDFs) are not supported within lambda expressions. This includes both custom UDFs and certain built-in functions whose underlying implementation relies on Snowflake UDFs. Attempting to use a UDF inside a lambda expression will result in an error.

from pyspark.sql.functions import bit_count, map_filter

df = spark.createDataFrame([({"a": 123},)], ("data",))
df.select(map_filter("data", lambda _, v: bit_count(v) > 3)).show()  # does not work, since `bit_count` is implemented with a UDF

Temporary views

By default, Snowpark Connect for Spark does not create a temporary view in Snowflake. You can have Snowpark Connect for Spark create temporary views by setting the configuration parameter snowpark.connect.temporary.views.create_in_snowflake to true.

If the parameter is set to false, Snowpark Connect for Spark stores views as DataFrames without creating a Snowflake view. This helps prevent the issue that can occur when the view definition SQL generated from the Spark Connect request exceeds the Snowflake view size limit (95 KB).

Temporary views are normally visible through the Spark Connect Catalog API. However, they are not accessible from SQL statements when the configuration snowpark.connect.sql.passthrough is set to true. To create Snowflake temporary views, set the configuration snowpark.connect.temporary.views.create_in_snowflake to true.
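
For example, the following minimal sketch enables the configuration before registering a view so that a Snowflake temporary view is created; the view name is arbitrary.

spark.conf.set("snowpark.connect.temporary.views.create_in_snowflake", "true")

df = spark.range(3)
df.createOrReplaceTempView("my_temp_view")

# With the setting enabled, the view is created as a Snowflake temporary view.
spark.sql("SELECT * FROM my_temp_view").show()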

Data sources

The following list describes compatibility issues for each data source compared with PySpark.

  • Avro: The file type is not supported.

  • CSV:

    • Save mode is not supported for the following: Append, Ignore.

    • The following options are not supported: encoding, quote, quoteAll, escape, escapeQuotes, comment, preferDate, enforceSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nanValue, positiveInf, negativeInf, timestampNTZFormat, enableDateTimeParsingFallback, maxColumns, maxCharsPerColumn, mode, columnNameOfCorruptRecord, charToEscapeQuoteEscaping, samplingRatio, emptyValue, locale, lineSep, unescapedQuoteHandling, compression.

  • JSON:

    • Save mode is not supported for the following: Append, Ignore.

    • The following options are not supported: timeZone, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZeros, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, timestampNTZFormat, enableDateTimeParsingFallback, allowUnquotedControlChars, encoding, lineSep, samplingRatio, dropFieldIfAllNull, locale, allowNonNumericNumbers, compression, ignoreNullFields.

    • Difference in show(): if the value of a field is a string, it is quoted, and an extra "n" character might be shown in the result.

  • Orc: The file type is not supported.

  • Parquet:

    • Save mode is not supported for the following: Append, Ignore.

    • The following options are not supported: datetimeRebaseMode, int96RebaseMode, mergeSchema, compression.

    • No configurations are supported.

  • Text:

    • Write mode is not supported for the following: Append, Ignore.

    • The compression option is not supported.

    • The lineSep parameter is not supported in write.

  • XML: The file type is not supported.

  • Snowflake table:

    • Writing to a table doesn't require a provider format.

    • Bucketing and partitioning are not supported.

    • Storage format and versioning are not supported.
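
For example, for CSV the Append and Ignore save modes listed above are expected to fail, while Overwrite works. The following is a minimal sketch; the stage output path is hypothetical.

df = spark.range(3)

# Overwrite is a supported save mode for CSV writes.
df.write.mode("overwrite").csv("@my_stage/csv_output/")

# Append (and Ignore) save modes are not supported for CSV and are expected to fail.
df.write.mode("append").csv("@my_stage/csv_output/")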

Catalog

Snowflake Horizon Catalog provider support

  • Only Snowflake is supported as a catalog provider.

Unsupported catalog APIs

  • registerFunction

  • listFunctions

  • getFunction

  • functionExists

  • createExternalTable

Partially supported catalog APIs

  • createTable (no external table support)

Iceberg

Snowflake-managed Iceberg table

Snowpark Connect for Spark works with Apache Iceberg™ tables, including externally managed Iceberg tables and catalog-linked databases.

Read

Time travel is not supported, including historical snapshot, branch, and incremental read.

Write

  • Using Spark SQL to create tables is not supported.

  • Schema merge is not supported.

  • To create a table, you must:

    • Create an external volume.

    • Link the external volume to the table creation in either of the following ways (see the example after this list):

      • Set EXTERNAL_VOLUME on the database.

      • Set snowpark.connect.iceberg.external_volume in the Spark configuration.
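
For example, the following minimal sketch uses the Spark configuration approach; the external volume name is hypothetical and must refer to an external volume that already exists.

# Use an existing external volume for Iceberg table creation.
spark.conf.set("snowpark.connect.iceberg.external_volume", "my_external_volume")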

Externally managed Iceberg table

Read

  • You must create a Snowflake unmanaged table entity.

  • Time travel is not supported, including historical snapshot, branch, and incremental read.

Write

  • Table creation is not supported.

  • Writing to an existing Iceberg table is supported.

Duplicate column names

Snowflake does not support duplicate column names.

The following code fails at the view creation step with the SQL compilation error duplicate column name 'foo'.

df = spark.createDataFrame([
    (1, 1),
    (2, 2)
], ["foo", "foo"])

df.show()  # works

df.createTempView("df_view")  # Fails with SQL compilation error: duplicate column name 'foo'

To work around this, set the snowpark.connect.views.duplicate_column_names_handling_mode configuration option to one of the following values (see the example after this list):

  • rename: A suffix such as _dedup_1, _dedup_2, and so on will be appended to all of the duplicate column names after the first one.

  • drop: All of the duplicate columns except one will be dropped. This might lead to incorrect results if the columns have different values.
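
For example, the following minimal sketch uses the rename mode so that the view creation from the earlier example succeeds.

spark.conf.set("snowpark.connect.views.duplicate_column_names_handling_mode", "rename")

df = spark.createDataFrame([(1, 1), (2, 2)], ["foo", "foo"])
df.createTempView("df_view")

# The duplicate columns are renamed, for example: foo, foo_dedup_1
spark.table("df_view").printSchema()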