Snowpark Connect for Spark-Kompatibilitätshandbuch

Diese Anleitung dokumentiert die Kompatibilität zwischen der Snowpark Connect for Spark-Implementierung der Spark-DataFrame-APIs und dem nativen Apache Spark. Sie soll Benutzern helfen, die wichtigsten Unterschiede, nicht unterstützten Features und Migrationsüberlegungen zu verstehen, wenn sie Spark-Workloads nach Snowpark Connect for Spark verschieben.

Snowpark Connect for Spark zielt darauf ab, eine vertraute Spark-DataFrame-API-Erfahrung zusätzlich zum Snowflake-Ausführungsmodul bereitzustellen. Es gibt jedoch die unter diesem Thema beschriebenen Kompatibilitätslücken. In diesem Leitfaden werden diese Unterschiede hervorgehoben, damit Sie Ihre Migration planen und anpassen können. Die Unterschiede werden eventuell in einem zukünftigen Release behoben.

DataTypes

Nicht unterstützte Datentypen

Implizite Datentypkonvertierung

Beachten Sie bei Verwendung von Snowpark Connect for Spark, wie mit Datentypen umgegangen werden soll. Snowpark Connect for Spark steht implizit für ByteType, ShortType`und :code:`IntegerType als LongType. Das bedeutet, dass Sie zwar Spalten oder Daten mit ByteType, ShortType oder IntegerType definieren können, die Daten aber durch Snowpark Connect for Spark als LongType dargestellt und zurückgegeben werden. Ebenso kann eine implizite Konvertierung auch für FloatType und DoubleType abhängig von den spezifischen Operationen und dem Kontext erfolgen. Das Snowflake-Ausführungsmodul übernimmt intern die Datentypkomprimierung und kann die Daten tatsächlich als Byte oder Short speichern. Diese gelten jedoch als Implementierungsdetails und sind für den Endbenutzer nicht zugänglich.

Semantisch hat diese Darstellung keinen Einfluss auf die Korrektheit Ihrer Spark-Abfragen.

Datentyp von nativem PySpark

Datentyp von Snowpark Connect for Spark

ByteType

LongType

ShortType

LongType

IntegerType

LongType

LongType

LongType

Das folgende Beispiel zeigt einen Unterschied bei der Behandlung von Datentypen in Abfrageergebnissen zwischen Spark und Snowpark Connect for Spark.

Abfrage

query = """
    SELECT * FROM VALUES
    (float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
    (float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
    (float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
    AS tab(a, b, c, d, e, f)
    """
Copy

Spark

spark.sql(query).printSchema()
Copy
root
 |-- a: float (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal(2,1) (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = false)
 |-- f: void (nullable = true)

Snowpark Connect for Spark

snowpark_connect_spark.sql(query).printSchema()
Copy
root
 |-- a: double (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = true)
 |-- f: string (nullable = true)

NullType-Nuance

Snowpark Connect for Spark unterstützt nicht den Datentyp NullType, der in Spark unterstützt wird. Dies führt zu Verhaltensänderungen bei der Verwendung von Null oder None in Datenframes.

In Spark wird ein Literal NULL (zum Beispiel mit lit(None)) automatisch als NullType abgeleitet. In Snowpark Connect for Spark wird es während der Schema-Inferenz als StringType abgeleitet.

df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]

# Spark: StructField('null_col', :code:`NullType`(), True)
# |spconnect|: StructField('null_col', :code:`StringType`(), True)
Copy

Strukturierte Datentypen in ArrayType, MapType und ObjectType

Die Unterstützung strukturierter Typen ist zwar standardmäßig in Snowpark Connect for Spark nicht verfügbar, aber ARRAY-, MAP- und Object-Datentypen werden als generische, nicht typisierte Sammlungen behandelt. Dies bedeutet, dass Elementtypen, Feldnamen, Schemas oder Null-Zulässigkeiten nicht erzwungen werden, im Gegensatz zu dem, was die Unterstützung strukturierter Typen bieten würde.

Wenn Sie von diesem Support abhängig sind, arbeiten Sie mit Ihrem Kundenbetreuer-Team zusammen, um dieses Feature für Ihr Konto zu aktivieren.

Nicht unterstützte Spark-APIs

Im Folgenden finden Sie die APIs, die von den klassischen Spark- und Spark Connect-Optionen, jedoch nicht in Snowpark Connect for Spark unterstützt werden.

  • Dataframe.hint: Snowpark Connect for Spark ignoriert jeden auf einem Datenframe festgelegten Hinweis. Die Snowflake-Abfrageoptimierung ermittelt automatisch die effizienteste Ausführungsstrategie.

  • DataFrame.repartition: Dies ist eine Nichtoperation in Snowpark Connect for Spark. Snowflake verwaltet automatisch die Verteilung und Partitionierung von Daten über seine verteilte Computing-Infrastruktur.

  • pysparkRDD: RDD-APIs werden von Spark Connect nicht unterstützt (einschließlich Snowpark Connect for Spark).

  • pyspark.ml

  • pyspark streaming

UDF-Unterschiede

StructType-Unterschiede

Wenn Spark einen StructType zur Verwendung in einer benutzerdefinierten Funktion (UDF) konvertiert, wird er in einen tuple -Typ in Python umgewandelt. Snowpark Connect for Spark konvertiert einen StructType in einen dict-Typ in Python. Es gibt grundlegende Unterschiede in Bezug auf den Elementzugriff und die Ausgabe.

  • Spark greift auf Indizes mit 0, 1, 2, 3 usw. zu.

  • Snowpark Connect for Spark greift mit „_1“, „_2“ usw. auf Indizes zu.

def f(e):
    return e[0]

    df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
    result = df.select("*", udf(f, DoubleType())("c1"))

# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:

def f(e):
    return e['_1']

row = (
    self.spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)

self.assertEquals(row[0], Row(col1=1, col2=2))

# Spark: Row(col1=1, col2=2)

# |spconnect|: {'col1': 1, 'col2': 2}
Copy

Iteratortyp in UDFs

Der Iterator wird nicht als Rückgabetyp oder als Eingabetyp unterstützt.

# This will not work
def func(iterator):
  for _ in iterator:
              ...

df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Copy

Einschränkungen der Lambda-Funktion

Während Snowpark Connect for Spark Lambda-Ausdrücke und Funktionen höherer Ordnung (wie die transform-Funktion) unterstützt, wird das Referenzieren von äußeren Spalten oder Ausdrücken innerhalb des Lambda-Textkörpers nicht unterstützt.

Diese Einschränkung wird durch -Einschränkungen für Lambda-Ausdrücke in Snowflake verursacht.

data = [
    (1, [1, 2, 3]),
    (2, [4, 5]),
    (3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])

df.select(transform(df.numbers, lambda el: el + array_size(df.numbers))).show() # this fails in Spark Connect for Snowpark, because of a
reference to df.numbers which is outside of the lambda context

df.select(transform(df.numbers, lambda el: negative(el) + 1)).show() # this works
Copy

Eine weitere Einschränkung ist, dass benutzerdefinierte Funktionen (UDFs) in Lambda-Ausdrücken nicht unterstützt werden. Dies gilt sowohl für benutzerdefinierte UDFs als auch für bestimmte integrierte Funktionen, deren zugrunde liegende Implementierung auf Snowflake-UDFs basiert. Der Versuch, eine UDF innerhalb eines Lambda-Ausdrucks zu verwenden, führt zu einem Fehler.

spark.range(1).select(split(lit('a,b,c'), ',')).show() # works
spark.range(1).select(transform(array(lit('a,b,c')), lambda x: split(x, ','))).show() # does not work, since `split` is implemented with UDF
Copy

Datenquellen

Datenquelle

Kompatibilitätsprobleme im Vergleich zu PySpark

Avro

Dateityp wird nicht unterstützt.

CSV

Der Speichermodus wird für folgende Optionen nicht unterstützt: Append, Ignore.

Die folgenden Optionen werden nicht unterstützt: encoding, quote, quoteAll, escape, escapeQuotes, comment, preferDate, enforceSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nanValue, positiveInf, negativeInf, timestampNTZFormat, enableDateTimeParsingFallback, maxColumns, maxCharsPerColumn, mode, columnNameOfCorruptRecord, charToEscapeQuoteEscaping, samplingRatio, emptyValue, locale, lineSep, unescapedQuoteHandling, compression.

JSON

Der Speichermodus wird für folgende Optionen nicht unterstützt: Append, Ignore.

Die folgenden Optionen werden nicht unterstützt: timeZone, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZeros, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, timestampNTZFormat, enableDateTimeParsingFallback, allowUnquotedControlChars, encoding, lineSep, samplingRatio, dropFieldIfAllNull, locale, allowNonNumericNumbers, compression, ignoreNullFields.

Unterschied bei Show: Wenn der Wert des Feldes eine Zeichenfolge ist, wird er in Anführungszeichen gesetzt. Im Ergebnis wird ein zusätzliches „n“-Zeichen angezeigt.

Orc

Dateityp wird nicht unterstützt.

Parquet

Der Speichermodus wird für folgende Optionen nicht unterstützt: Append, Ignore.

Die folgenden Optionen werden nicht unterstützt: datetimeRebaseMode, int96RebaseMode, mergeSchema, compression.

Konfiguration nicht unterstützt: (ALL)

Text

Der Schreibmodus wird für folgende Optionen nicht unterstützt: Append, Ignore.

Die folgenden Optionen werden nicht unterstützt: compression.

Der lineSep-Parameter wird beim Schreiben nicht unterstützt.

XML

Dateityp wird nicht unterstützt.

Snowflake-Tabelle

Für das Schreiben in Tabelle ist kein Anbieterformat erforderlich.

Bucketing und Partitionierung werden nicht unterstützt.

Speicherformat und Versionierung werden nicht unterstützt.

Katalog

Unterstützung von Snowflake Horizon Catalog-Anbietern

  • Nur Snowflake wird als Kataloganbieter unterstützt.

Nicht unterstützte Katalog-APIs

  • registerFunction

  • listFunctions

  • getFunction

  • functionExists

  • createExternalTable

Teilweise unterstützte Katalog-APIs

  • createTable (keine Unterstützung externer Tabellen)

Iceberg

Snowflake-verwaltete Iceberg-Tabelle

Snowpark Connect für Spark funktioniert mit Apache Iceberg™-Tabellen, einschließlich extern verwalteter Iceberg-Tabellen und katalogverknüpfter Datenbanken.

Lesen

Time Travel wird nicht unterstützt. Dies umfasst auch historische Snapshots, Verzweigungen und inkrementelles Lesen.

Schreiben

  • Spark SQL zum Erstellen von Tabellen wird nicht unterstützt.

  • Schemazusammenführung wird nicht unterstützt.

  • Gehen Sie wie folgt vor, um die Tabelle zu erstellen:

    • Erstellen Sie ein externes Volume.

    • Verknüpfen Sie das externe Volume auf eine der folgenden Arten mit der Tabellenerstellung:

      • Legen Sie als EXTERNAL_VOLUME die Datenbank fest.

      • Legen Sie für snowpark.connect.iceberg.external_volume die Spark-Konfiguration fest.

Externe verwaltete Iceberg-Tabelle

Lesen

  • Sie müssen eine nicht verwaltete Tabellenentität von Snowflake erstellen.

  • Time Travel wird nicht unterstützt. Dies umfasst auch historische Snapshots, Verzweigungen und inkrementelles Lesen.

Schreiben

  • Das Erstellen von Tabellen wird nicht unterstützt.

  • Das Schreiben in die bestehende Iceberg-Tabelle wird unterstützt.

Doppelte Spaltennamen

Snowflake unterstützt keine doppelten Spaltennamen.

Der folgende Code schlägt beim Schritt „Ansicht erstellen“ mit den folgenden SQL-Kompilierungsfehler fehl: duplicate column name 'foo'.

df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])

df.show() # works

df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Copy

Um dies zu umgehen, setzen Sie die Konfigurationsoption snowpark.connect.views.duplicate_column_names_handling_mode auf einen der folgenden Werte:

  • rename: Ein Suffix wie _dedup_1, _dedup_2 usw. wird an alle doppelten Spaltennamen mit Ausnahme des ersten Vorkommens des Namens angehängt.

  • drop: Alle doppelten Spalten bis auf eine werden gelöscht. Dies kann zu falschen Ergebnissen führen, wenn die Spalten unterschiedliche Werte enthalten.