Snowpark Connect for Spark-Kompatibilitätshandbuch

Diese Anleitung dokumentiert die Kompatibilität zwischen der Snowpark Connect for Spark-Implementierung der Spark-DataFrame-APIs und dem nativen Apache Spark. Sie soll Benutzern helfen, die wichtigsten Unterschiede, nicht unterstützten Features und Migrationsüberlegungen zu verstehen, wenn sie Spark-Workloads nach Snowpark Connect for Spark verschieben.

Snowpark Connect for Spark zielt darauf ab, eine vertraute Spark-DataFrame-API-Erfahrung zusätzlich zum Snowflake-Ausführungsmodul bereitzustellen. Es gibt jedoch die unter diesem Thema beschriebenen Kompatibilitätslücken. In diesem Leitfaden werden diese Unterschiede hervorgehoben, damit Sie Ihre Migration planen und anpassen können. Die Unterschiede werden eventuell in einem zukünftigen Release behoben.

DataTypes

Nicht unterstützte Datentypen

Implizite Datentypkonvertierung

Beachten Sie bei Verwendung von Snowpark Connect for Spark, wie mit Datentypen umgegangen werden soll. Snowpark Connect for Spark steht implizit für ByteType, ShortType`und :code:`IntegerType als LongType. Das bedeutet, dass Sie zwar Spalten oder Daten mit ByteType, ShortType oder IntegerType definieren können, die Daten aber durch Snowpark Connect for Spark als LongType dargestellt und zurückgegeben werden. Ebenso kann eine implizite Konvertierung auch für FloatType und DoubleType abhängig von den spezifischen Operationen und dem Kontext erfolgen. Das Snowflake-Ausführungsmodul übernimmt intern die Datentypkomprimierung und kann die Daten tatsächlich als Byte oder Short speichern. Diese gelten jedoch als Implementierungsdetails und sind für den Endbenutzer nicht zugänglich.

Semantisch hat diese Darstellung keinen Einfluss auf die Korrektheit Ihrer Spark-Abfragen.

Datentyp von nativem PySpark

Datentyp von Snowpark Connect for Spark

ByteType

LongType

ShortType

LongType

IntegerType

LongType

LongType

LongType

Das folgende Beispiel zeigt einen Unterschied bei der Behandlung von Datentypen in Abfrageergebnissen zwischen Spark und Snowpark Connect for Spark.

Abfrage

query = """
    SELECT * FROM VALUES
    (float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
    (float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
    (float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
    AS tab(a, b, c, d, e, f)
    """
Copy

Spark

spark.sql(query).printSchema()
Copy
root
 |-- a: float (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal(2,1) (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = false)
 |-- f: void (nullable = true)

Snowpark Connect for Spark

snowpark_connect_spark.sql(query).printSchema()
Copy
root
 |-- a: double (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = true)
 |-- f: string (nullable = true)

NullType-Nuance

Snowpark Connect for Spark unterstützt nicht den Datentyp NullType, der in Spark unterstützt wird. Dies führt zu Verhaltensänderungen bei der Verwendung von Null oder None in Datenframes.

In Spark wird ein Literal NULL (zum Beispiel mit lit(None)) automatisch als NullType abgeleitet. In Snowpark Connect for Spark wird es während der Schema-Inferenz als StringType abgeleitet.

df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]

# Spark: StructField('null_col', :code:`NullType`(), True)
# Snowpark Connect for Spark: StructField('null_col', :code:`StringType`(), True)
Copy

Strukturierte Datentypen in ArrayType, MapType und ObjectType

Die Unterstützung strukturierter Typen ist zwar standardmäßig in Snowpark Connect for Spark nicht verfügbar, aber ARRAY-, MAP- und Object-Datentypen werden als generische, nicht typisierte Sammlungen behandelt. Dies bedeutet, dass Elementtypen, Feldnamen, Schemas oder Null-Zulässigkeiten nicht erzwungen werden, im Gegensatz zu dem, was die Unterstützung strukturierter Typen bieten würde.

Wenn Sie von diesem Support abhängig sind, arbeiten Sie mit Ihrem Kundenbetreuer-Team zusammen, um dieses Feature für Ihr Konto zu aktivieren.

Nicht unterstützte Spark-APIs

Im Folgenden finden Sie die APIs, die von den klassischen Spark- und Spark Connect-Optionen, jedoch nicht in Snowpark Connect for Spark unterstützt werden.

  • Dataframe.hint: Snowpark Connect for Spark ignoriert jeden auf einem Datenframe festgelegten Hinweis. Die Snowflake-Abfrageoptimierung ermittelt automatisch die effizienteste Ausführungsstrategie.

  • DataFrame.repartition: Dies ist eine Nichtoperation in Snowpark Connect for Spark. Snowflake verwaltet automatisch die Verteilung und Partitionierung von Daten über seine verteilte Computing-Infrastruktur.

  • pysparkRDD: RDD-APIs werden von Spark Connect nicht unterstützt (einschließlich Snowpark Connect for Spark).

  • pyspark.ml

  • pyspark streaming

UDF-Unterschiede

StructType-Unterschiede

Wenn Spark einen StructType zur Verwendung in einer benutzerdefinierten Funktion (UDF) konvertiert, wird er in einen tuple -Typ in Python umgewandelt. Snowpark Connect for Spark konvertiert einen StructType in einen dict-Typ in Python. Es gibt grundlegende Unterschiede in Bezug auf den Elementzugriff und die Ausgabe.

  • Spark greift auf Indizes mit 0, 1, 2, 3 usw. zu.

  • Snowpark Connect for Spark greift mit „_1“, „_2“ usw. auf Indizes zu.

def f(e):
    return e[0]

    df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
    result = df.select("*", udf(f, DoubleType())("c1"))

# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:

def f(e):
    return e['_1']

row = (
    self.spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)

self.assertEquals(row[0], Row(col1=1, col2=2))

# Spark: Row(col1=1, col2=2)

# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}
Copy

Iteratortyp in UDFs

Der Iterator wird nicht als Rückgabetyp oder als Eingabetyp unterstützt.

# This will not work
def func(iterator):
  for _ in iterator:
              ...

df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Copy

Importieren von Dateien in eine Python-UDF

Mit Snowpark Connect for Spark können Sie externe Bibliotheken und Dateien in Python-UDFs angeben. Snowflake schließt Python-Dateien und -Archive in den Ausführungskontext Ihres Codes ein. Sie können Funktionen aus diesen enthaltenen Dateien ohne zusätzliche Schritte in eine UDF importieren. Dieses Verhalten bei der Behandlung von Abhängigkeiten funktioniert wie unter Erstellen eines Python-UDF mit aus einem Stagingbereich hochgeladenem Code beschrieben.

Um externe Bibliotheken und Dateien einzubeziehen, geben Sie Stagingbereichspfade zu den Dateien als Wert der Konfigurationseinstellung snowpark.connect.udf.imports an. Der Wert der Konfiguration sollte ein Array von Stagingbereichspfaden zu den Dateien sein, wobei die Pfade durch Kommas getrennt sind.

Der Code im folgenden Beispiel enthält zwei Dateien im UDF-Ausführungskontext. Die UDF importiert Funktionen aus diesen Dateien und verwendet sie in ihrer Logik.

# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/library.py, @other_lib.zip]")

@udf(returnType = StringType())
def import_example(input: str) -> str:
  from library import first_function
  from other_lib.custom import second_function

  return first_function(input) + second_function(input)

spark.range(1).select(import_read_example("example_string")).show()
Copy

Sie können die snowpark.connect.udf.imports-Einstellung verwenden, um auch andere Arten von Dateien einzuschließen, z. B. Dateien mit Daten, die Ihr Code lesen muss. Beachten Sie, dass Ihr Code in diesem Fall nur aus den eingebundenen Dateien lesen sollte. Alle Schreibvorgänge in solche Dateien gehen nach Beendigung der Ausführung der Funktion verloren.

# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/data.csv]")

@udf(returnType = StringType())
def import_read_example(file_name: str) -> str:
  with open(file_name) as f:
    return f.read()

spark.range(1).select(import_read_example("data.csv")).show()
Copy

Einschränkungen der Lambda-Funktion

User-defined functions (UDFs) are not supported within lambda expressions. This includes both custom UDFs and certain built-in functions whose underlying implementation relies on Snowflake UDFs. Attempting to use a UDF inside a lambda expression will result in an error.

df = spark.createDataFrame([({"a": 123},)], ("data",))
df.select(map_filter("data", lambda _, v: bit_count(v) > 3)).show() # does not work, since `bit_count` is implemented with UDF
Copy

Temporäre Ansichten

Standardmäßig erstellt Snowpark Connect for Spark keine temporäre Ansicht in Snowflake. Sie können angeben, dass Snowpark Connect for Spark eine temporäre Ansicht erstellt, indem der Konfigurationsparameter snowpark.connect.temporary.views.create_in_snowflake auf true gesetzt wird.

Wenn Parameter auf false gesetzt ist, speichert Snowpark Connect for Spark Ansichten als DataFrames, ohne eine Snowflake-Ansicht zu erstellen. Dies hilft, das Problem zu vermeiden, das auftreten kann, wenn die aus der Spark Connect-Anfrage von SQL erstellte Ansichtsdefinition die Größenbeschränkung der Snowflake-Ansicht (95KB) überschreitet.

Temporäre Ansichten sind normalerweise sichtbar, wenn Sie die Spark Connect-Katalog-API verwenden. Sie sind jedoch nicht zugänglich, wenn sie von SQL-Anweisungen mit der Konfigurationseinstellung für snowpark.connect.sql.passthrough auf true aufgerufen werden. Um temporäre Snowflake-Ansichten zu erstellen, stellen Sie die Konfigurationseinstellung snowpark.connect.temporary.views.create_in_snowflake auf true ein.

Datenquellen

Datenquelle

Kompatibilitätsprobleme im Vergleich zu PySpark

Avro

Dateityp wird nicht unterstützt.

CSV

Der Speichermodus wird für folgende Optionen nicht unterstützt: Append, Ignore.

Die folgenden Optionen werden nicht unterstützt: encoding, quote, quoteAll, escape, escapeQuotes, comment, preferDate, enforceSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nanValue, positiveInf, negativeInf, timestampNTZFormat, enableDateTimeParsingFallback, maxColumns, maxCharsPerColumn, mode, columnNameOfCorruptRecord, charToEscapeQuoteEscaping, samplingRatio, emptyValue, locale, lineSep, unescapedQuoteHandling, compression.

JSON

Der Speichermodus wird für folgende Optionen nicht unterstützt: Append, Ignore.

Die folgenden Optionen werden nicht unterstützt: timeZone, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZeros, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, timestampNTZFormat, enableDateTimeParsingFallback, allowUnquotedControlChars, encoding, lineSep, samplingRatio, dropFieldIfAllNull, locale, allowNonNumericNumbers, compression, ignoreNullFields.

Unterschied bei Show: Wenn der Wert des Feldes eine Zeichenfolge ist, wird er in Anführungszeichen gesetzt. Im Ergebnis wird ein zusätzliches „n“-Zeichen angezeigt.

Orc

Dateityp wird nicht unterstützt.

Parquet

Der Speichermodus wird für folgende Optionen nicht unterstützt: Append, Ignore.

Die folgenden Optionen werden nicht unterstützt: datetimeRebaseMode, int96RebaseMode, mergeSchema, compression.

Konfiguration nicht unterstützt: (ALL)

Text

Der Schreibmodus wird für folgende Optionen nicht unterstützt: Append, Ignore.

Die folgenden Optionen werden nicht unterstützt: compression.

Der lineSep-Parameter wird beim Schreiben nicht unterstützt.

XML

Dateityp wird nicht unterstützt.

Snowflake-Tabelle

Für das Schreiben in Tabelle ist kein Anbieterformat erforderlich.

Bucketing und Partitionierung werden nicht unterstützt.

Speicherformat und Versionierung werden nicht unterstützt.

Katalog

Unterstützung von Snowflake Horizon Catalog-Anbietern

  • Nur Snowflake wird als Kataloganbieter unterstützt.

Nicht unterstützte Katalog-APIs

  • registerFunction

  • listFunctions

  • getFunction

  • functionExists

  • createExternalTable

Teilweise unterstützte Katalog-APIs

  • createTable (keine Unterstützung externer Tabellen)

Iceberg

Snowflake-verwaltete Iceberg-Tabelle

Snowpark Connect für Spark funktioniert mit Apache Iceberg™-Tabellen, einschließlich extern verwalteter Iceberg-Tabellen und katalogverknüpfter Datenbanken.

Lesen

Time Travel wird nicht unterstützt. Dies umfasst auch historische Snapshots, Verzweigungen und inkrementelles Lesen.

Schreiben

  • Spark SQL zum Erstellen von Tabellen wird nicht unterstützt.

  • Schemazusammenführung wird nicht unterstützt.

  • Gehen Sie wie folgt vor, um die Tabelle zu erstellen:

    • Erstellen Sie ein externes Volume.

    • Verknüpfen Sie das externe Volume auf eine der folgenden Arten mit der Tabellenerstellung:

      • Legen Sie als EXTERNAL_VOLUME die Datenbank fest.

      • Legen Sie für snowpark.connect.iceberg.external_volume die Spark-Konfiguration fest.

Externe verwaltete Iceberg-Tabelle

Lesen

  • Sie müssen eine nicht verwaltete Tabellenentität von Snowflake erstellen.

  • Time Travel wird nicht unterstützt. Dies umfasst auch historische Snapshots, Verzweigungen und inkrementelles Lesen.

Schreiben

  • Das Erstellen von Tabellen wird nicht unterstützt.

  • Das Schreiben in die bestehende Iceberg-Tabelle wird unterstützt.

Doppelte Spaltennamen

Snowflake unterstützt keine doppelten Spaltennamen.

Der folgende Code schlägt beim Schritt „Ansicht erstellen“ mit den folgenden SQL-Kompilierungsfehler fehl: duplicate column name 'foo'.

df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])

df.show() # works

df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Copy

Um dies zu umgehen, setzen Sie die Konfigurationsoption snowpark.connect.views.duplicate_column_names_handling_mode auf einen der folgenden Werte:

  • rename: Ein Suffix wie _dedup_1, _dedup_2 usw. wird an alle doppelten Spaltennamen mit Ausnahme des ersten Vorkommens des Namens angehängt.

  • drop: Alle doppelten Spalten bis auf eine werden gelöscht. Dies kann zu falschen Ergebnissen führen, wenn die Spalten unterschiedliche Werte enthalten.