Guide de compatibilité Snowpark Connect for Spark

Ce guide documente la compatibilité entre la mise en œuvre Snowpark Connect for Spark des APIs DataFrame Spark et Apache Spark natif. Il est destiné à aider les utilisateurs à comprendre les différences clés, les fonctionnalités non prises en charge et les considérations de migration lors du déplacement de charges de travail Spark vers Snowpark Connect for Spark.

Snowpark Connect for Spark vise à fournir une expérience d’API DataFrame Spark habituelle en plus du moteur d’exécution Snowflake. Toutefois, il existe des écarts de compatibilité décrits dans cette rubrique. Ce guide met en évidence ces différences pour vous aider à planifier et à adapter votre migration. Celles-ci pourraient être résolues dans une prochaine version.

DataTypes

Types de données non pris en charge

Conversion implicite de type de données

Lorsque vous utilisez Snowpark Connect for Spark, gardez à l’esprit la façon dont les types de données sont traités. Snowpark Connect for Spark représente implicitement ByteType, ShortType et IntegerType comme LongType. Cela signifie que si vous pouvez définir des colonnes ou des données avec ByteType, ShortType ou IntegerType, les données seront représentées et renvoyées par Snowpark Connect for Spark comme LongType. De même, une conversion implicite peut également se produire pour FloatType et DoubleType en fonction des opérations et du contexte spécifiques. Le moteur d’exécution Snowflake gérera en interne la compression des types de données et peut stocker les données sous la forme Byte ou Short, mais ceux-ci sont considérés comme des détails de mise en œuvre et ne sont pas exposés à l’utilisateur final.

Sémantiquement, cette représentation n’aura pas d’impact sur l’exactitude de vos requêtes Spark.

Type de données de PySpark natif

Type de données de Snowpark Connect for Spark

ByteType

LongType

ShortType

LongType

IntegerType

LongType

LongType

LongType

L’exemple suivant montre une différence dans la manière dont Spark et Snowpark Connect for Spark gèrent les types de données dans les résultats des requêtes.

Requête

query = """
    SELECT * FROM VALUES
    (float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
    (float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
    (float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
    AS tab(a, b, c, d, e, f)
    """
Copy

Spark

spark.sql(query).printSchema()
Copy
root
 |-- a: float (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal(2,1) (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = false)
 |-- f: void (nullable = true)

Snowpark Connect for Spark

snowpark_connect_spark.sql(query).printSchema()
Copy
root
 |-- a: double (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = true)
 |-- f: string (nullable = true)

Nuance NullType

Snowpark Connect for Spark ne prend pas en charge le type de données NullType, qui est un type de données pris en charge dans Spark. Cela entraîne des changements de comportement lors de l’utilisation de Null ou None dans les dataframes.

Dans Spark, un NULL littéral (par exemple, avec lit(None)) est automatiquement déduit comme un NullType. Dans Snowpark Connect for Spark, il est déduit comme un StringType lors de l’inférence de schéma.

df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]

# Spark: StructField('null_col', :code:`NullType`(), True)
# Snowpark Connect for Spark: StructField('null_col', :code:`StringType`(), True)
Copy

Types de données structurés dans ArrayType, MapType et ObjectType

Bien que la prise en charge des types structurés ne soit pas disponible par défaut dans Snowpark Connect for Spark, les types de données ARRAY, MAP et Object sont traités comme des collections génériques et non typées. Cela signifie qu’il n’y a pas d’application des types d’éléments, des noms de champs, du schéma ou de la nullabilité, contrairement à ce qui serait fourni par la prise en charge des types structurés.

Si vous dépendez de cette prise en charge, veuillez contacter votre équipe de compte afin d’activer cette fonctionnalité pour votre compte.

APIs Spark non prises en charge

Les APIs suivantes sont prises en charge par Spark classique et Spark Connect mais non prises en charge par Snowpark Connect for Spark.

  • Dataframe.hint : Snowpark Connect for Spark ignore toute indication définie sur un dataframe. L’optimiseur de requêtes Snowflake détermine automatiquement la stratégie d’exécution la plus efficace.

  • DataFrame.repartition : Il s’agit d’une opération sans effet dans Snowpark Connect for Spark. Snowflake gère automatiquement la distribution et le partitionnement des données sur son infrastructure de calcul distribuée.

  • pyspark.RDD : les APIs RDD ne sont pas prises en charge dans Spark Connect (y compris Snowpark Connect for Spark).

  • pyspark.ml

  • pyspark streaming

Différences UDF

Différences StructType

Lorsque Spark convertit un StructType à utiliser dans une fonction définie par l’utilisateur (UDF), il le convertit en un type tuple en Python. Snowpark Connect for Spark convertira un StructType en un type dict en Python. Cela présente des différences fondamentales en matière de sortie et d’accès aux éléments.

  • Spark accédera aux index avec 0, 1, 2, 3, et ainsi de suite.

  • Snowpark Connect for Spark accédera aux index en utilisant “_1”, “_2”, etc.

def f(e):
    return e[0]

    df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
    result = df.select("*", udf(f, DoubleType())("c1"))

# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:

def f(e):
    return e['_1']

row = (
    self.spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)

self.assertEquals(row[0], Row(col1=1, col2=2))

# Spark: Row(col1=1, col2=2)

# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}
Copy

Type d’itérateur dans les UDFs

L’itérateur n’est pas pris en charge en tant que type de retour ou en tant que type d’entrée.

# This will not work
def func(iterator):
  for _ in iterator:
              ...

df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Copy

Importation de fichiers dans une UDF Python.

Avec Snowpark Connect for Spark, vous pouvez spécifier des bibliothèques externes et des fichiers dans les UDFs Python. Snowflake inclut des fichiers et des archives Python dans le contexte d’exécution de votre code. Vous pouvez importer des fonctions à partir de ces fichiers inclus dans une UDF sans étapes supplémentaires. Ce comportement de gestion des dépendances fonctionne comme décrit dans Création d’une UDF Python avec du code téléchargé à partir d’une zone de préparation.

Pour inclure des bibliothèques et des fichiers externes, vous fournissez des chemins d’accès aux zones de préparation aux fichiers en tant que valeur du paramètre de configuration snowpark.connect.udf.imports. La valeur de la configuration doit être un tableau de chemins d’accès aux zones de préparation, où les chemins sont séparés par des virgules.

Le code de l’exemple suivant inclut deux fichiers dans le contexte d’exécution de l’UDF. L’UDF importe des fonctions de ces fichiers et les utilise dans sa logique.

# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/library.py, @other_lib.zip]")

@udf(returnType = StringType())
def import_example(input: str) -> str:
  from library import first_function
  from other_lib.custom import second_function

  return first_function(input) + second_function(input)

spark.range(1).select(import_read_example("example_string")).show()
Copy

Vous pouvez utiliser le paramètre snowpark.connect.udf.imports pour inclure d’autres types de fichiers, tels que ceux contenant des données que votre code doit lire. Notez que lorsque vous faites cela, votre code ne doit lire qu’à partir des fichiers inclus ; toutes les écritures dans ces fichiers seront perdues après la fin de l’exécution de la fonction.

# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/data.csv]")

@udf(returnType = StringType())
def import_read_example(file_name: str) -> str:
  with open(file_name) as f:
    return f.read()

spark.range(1).select(import_read_example("data.csv")).show()
Copy

Limitations des fonctions lambda

User-defined functions (UDFs) are not supported within lambda expressions. This includes both custom UDFs and certain built-in functions whose underlying implementation relies on Snowflake UDFs. Attempting to use a UDF inside a lambda expression will result in an error.

df = spark.createDataFrame([({"a": 123},)], ("data",))
df.select(map_filter("data", lambda _, v: bit_count(v) > 3)).show() # does not work, since `bit_count` is implemented with UDF
Copy

Vues temporaires

Par défaut, Snowpark Connect for Spark ne crée pas de vue temporaire dans Snowflake. Vous pouvez spécifier que Snowpark Connect for Spark crée une vue temporaire en définissant le paramètre de configuration snowpark.connect.temporary.views.create_in_snowflake sur true.

Si le paramètre est défini sur false, Snowpark Connect for Spark stocke les vues sous forme de DataFrames sans créer de vue Snowflake. Ceci aide à éviter le problème qui peut survenir lorsque le SQL de la définition de la vue créé à partir de la requête Spark Connect dépasse la limite de taille de vue Snowflake (95KB).

Les vues temporaires sont normalement visibles lors de l’utilisation de l’API du catalogue Spark Connect. Cependant, elles ne sont pas accessibles lorsqu’elles sont appelées depuis des instructions SQL avec la configuration snowpark.connect.sql.passthrough définie sur true. Pour créer des vues temporaires Snowflake, définissez la configuration snowpark.connect.temporary.views.create_in_snowflake sur true.

Sources de données

Source de données

Problèmes de compatibilité par rapport à PySpark

Avro

Le type de fichier n’est pas pris en charge.

CSV

Le mode de sauvegarde n’est pas pris en charge dans les cas suivants : Append, Ignore.

Les options suivantes ne sont pas prises en charge : encoding, quote, quoteAll, escape, escapeQuotes, comment, preferDate, enforceSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nanValue, positiveInf, negativeInf, timestampNTZFormat, enableDateTimeParsingFallback, maxColumns, maxCharsPerColumn, mode, columnNameOfCorruptRecord, charToEscapeQuoteEscaping, samplingRatio, emptyValue, locale, lineSep, unescapedQuoteHandling, compression.

JSON

Le mode de sauvegarde n’est pas pris en charge dans les cas suivants : Append, Ignore.

Les options suivantes sont prises en charge : timeZone, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZeros, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, timestampNTZFormat, enableDateTimeParsingFallback, allowUnquotedControlChars, encoding, lineSep, samplingRatio, dropFieldIfAllNull, locale, allowNonNumericNumbers, compression, ignoreNullFields.

Différence dans Show : Si la valeur du champ est une chaîne, elle sera entre guillemets. Un caractère « n » supplémentaire serait affiché dans le résultat.

Orc

Le type de fichier n’est pas pris en charge.

Parquet

Le mode de sauvegarde n’est pas pris en charge dans les cas suivants : Append, Ignore.

Les options suivantes sont prises en charge : datetimeRebaseMode, int96RebaseMode, mergeSchema, compression.

Configuration non prise en charge : (ALL)

Texte

Le mode écriture n’est pas pris en charge dans les cas suivants : Append, Ignore.

Les options suivantes ne sont pas prises en charge : compression.

Le paramètre lineSep n’est pas pris en charge en mode écriture.

XML

Le type de fichier n’est pas pris en charge.

Table Snowflake

L’écriture dans la table ne nécessite pas de format fournisseur.

La compartimentation et le partitionnement ne sont pas pris en charge.

Le format de stockage et la gestion des versions ne sont pas pris en charge.

Catalogue

Prise en charge du fournisseur de catalogue Snowflake Horizon

  • Seul Snowflake est pris en charge en tant que fournisseur de catalogue.

APIs de catalogue non prises en charge

  • registerFunction

  • listFunctions

  • getFunction

  • functionExists

  • createExternalTable

APIs de catalogue partiellement prises en charge

  • createTable (pas de prise en charge des tables externes)

Iceberg

Table Iceberg gérée par Snowflake

Snowpark Connect pour Spark fonctionne avec les tables Apache Iceberg™, y compris les tables Iceberg gérées en externe et les bases de données liées à des catalogues.

Lire

Time Travel n’est pas pris en charge, y compris l’instantané historique, la branche et la lecture incrémentielle.

Écriture

  • L’utilisation du SQL Spark pour créer des tables n’est pas pris en charge.

  • La fusion de schémas n’est pas prise en charge.

  • Pour créer la table, vous devez :

    • Créer un volume externe.

    • Associez les besoins du volume externe à la création de la table de l’une des manières suivantes :

      • Définir le EXTERNAL_VOLUME dans la base de données.

      • Définir snowpark.connect.iceberg.external_volume dans la configuration Spark.

Table Iceberg gérée en externe

Lire

  • Vous devez créer une entité de table non gérée Snowflake.

  • Time Travel n’est pas pris en charge, y compris l’instantané historique, la branche et la lecture incrémentielle.

Écriture

  • La création de la table n’est pas prise en charge.

  • L’écriture dans la table Iceberg existante est prise en charge.

Noms de colonne de duplication

Snowflake ne prend pas en charge les noms de colonne dupliqués.

Le code suivant échoue à l’étape de création de la vue avec l’erreur de compilation SQL suivante : duplicate column name 'foo'.

df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])

df.show() # works

df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Copy

Pour contourner ce problème, définissez l’option de configuration snowpark.connect.views.duplicate_column_names_handling_mode sur l’une des valeurs suivantes :

  • rename : Un suffixe tel que _dedup_1, _dedup_2, et ainsi de suite, sera ajouté à tous les noms de colonnes dupliqués après la première.

  • drop : Toutes les colonnes en double, sauf une, seront supprimées. Cela peut entraîner des résultats incorrects si les colonnes ont des valeurs différentes.