Guide de compatibilité Snowpark Connect for Spark¶
Ce guide documente la compatibilité entre la mise en œuvre Snowpark Connect for Spark des APIs DataFrame Spark et Apache Spark natif. Il est destiné à aider les utilisateurs à comprendre les différences clés, les fonctionnalités non prises en charge et les considérations de migration lors du déplacement de charges de travail Spark vers Snowpark Connect for Spark.
Snowpark Connect for Spark vise à fournir une expérience d’API DataFrame Spark habituelle en plus du moteur d’exécution Snowflake. Toutefois, il existe des écarts de compatibilité décrits dans cette rubrique. Ce guide met en évidence ces différences pour vous aider à planifier et à adapter votre migration. Celles-ci pourraient être résolues dans une prochaine version.
DataTypes¶
Types de données non pris en charge¶
Conversion implicite de type de données¶
Lorsque vous utilisez Snowpark Connect for Spark, gardez à l’esprit la façon dont les types de données sont traités. Snowpark Connect for Spark représente implicitement ByteType
, ShortType
et IntegerType
comme LongType
. Cela signifie que si vous pouvez définir des colonnes ou des données avec ByteType
, ShortType
ou IntegerType
, les données seront représentées et renvoyées par Snowpark Connect for Spark comme LongType
. De même, une conversion implicite peut également se produire pour FloatType
et DoubleType
en fonction des opérations et du contexte spécifiques. Le moteur d’exécution Snowflake gérera en interne la compression des types de données et peut stocker les données sous la forme Byte
ou Short
, mais ceux-ci sont considérés comme des détails de mise en œuvre et ne sont pas exposés à l’utilisateur final.
Sémantiquement, cette représentation n’aura pas d’impact sur l’exactitude de vos requêtes Spark.
Type de données de PySpark natif |
Type de données de Snowpark Connect for Spark |
---|---|
|
|
|
|
|
|
|
|
L’exemple suivant montre une différence dans la manière dont Spark et Snowpark Connect for Spark gèrent les types de données dans les résultats des requêtes.
Requête¶
query = """
SELECT * FROM VALUES
(float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
(float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
(float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
AS tab(a, b, c, d, e, f)
"""
Spark¶
spark.sql(query).printSchema()
root
|-- a: float (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal(2,1) (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = false)
|-- f: void (nullable = true)
Snowpark Connect for Spark¶
snowpark_connect_spark.sql(query).printSchema()
root
|-- a: double (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = true)
|-- f: string (nullable = true)
Nuance NullType
¶
Snowpark Connect for Spark ne prend pas en charge le type de données NullType, qui est un type de données pris en charge dans Spark. Cela entraîne des changements de comportement lors de l’utilisation de Null
ou None
dans les dataframes.
Dans Spark, un NULL
littéral (par exemple, avec lit(None)
) est automatiquement déduit comme un NullType
. Dans Snowpark Connect for Spark, il est déduit comme un StringType
lors de l’inférence de schéma.
df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]
# Spark: StructField('null_col', :code:`NullType`(), True)
# |spconnect|: StructField('null_col', :code:`StringType`(), True)
Types de données structurés dans ArrayType
, MapType
et ObjectType
¶
Bien que la prise en charge des types structurés ne soit pas disponible par défaut dans Snowpark Connect for Spark, les types de données ARRAY
, MAP
et Object
sont traités comme des collections génériques et non typées. Cela signifie qu’il n’y a pas d’application des types d’éléments, des noms de champs, du schéma ou de la nullabilité, contrairement à ce qui serait fourni par la prise en charge des types structurés.
Si vous dépendez de cette prise en charge, veuillez contacter votre équipe de compte afin d’activer cette fonctionnalité pour votre compte.
APIs Spark non prises en charge¶
Les APIs suivantes sont prises en charge par Spark classique et Spark Connect mais non prises en charge par Snowpark Connect for Spark.
Dataframe.hint : Snowpark Connect for Spark ignore toute indication définie sur un dataframe. L’optimiseur de requêtes Snowflake détermine automatiquement la stratégie d’exécution la plus efficace.
DataFrame.repartition : Il s’agit d’une opération sans effet dans Snowpark Connect for Spark. Snowflake gère automatiquement la distribution et le partitionnement des données sur son infrastructure de calcul distribuée.
pyspark.RDD : les APIs RDD ne sont pas prises en charge dans Spark Connect (y compris Snowpark Connect for Spark).
Différences UDF¶
Différences StructType
¶
Lorsque Spark convertit un StructType
à utiliser dans une fonction définie par l’utilisateur (UDF), il le convertit en un type tuple
en Python. Snowpark Connect for Spark convertira un StructType
en un type dict
en Python. Cela présente des différences fondamentales en matière de sortie et d’accès aux éléments.
Spark accédera aux index avec 0, 1, 2, 3, et ainsi de suite.
Snowpark Connect for Spark accédera aux index en utilisant “_1”, “_2”, etc.
def f(e):
return e[0]
df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
result = df.select("*", udf(f, DoubleType())("c1"))
# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:
def f(e):
return e['_1']
row = (
self.spark.range(1)
.selectExpr("struct(1, 2) as struct")
.select(
udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
)
.first()
)
self.assertEquals(row[0], Row(col1=1, col2=2))
# Spark: Row(col1=1, col2=2)
# |spconnect|: {'col1': 1, 'col2': 2}
Type d’itérateur dans les UDFs¶
L’itérateur n’est pas pris en charge en tant que type de retour ou en tant que type d’entrée.
# This will not work
def func(iterator):
for _ in iterator:
...
df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Limitations des fonctions lambda¶
Alors que Snowpark Connect for Spark prend en charge les expressions lambda et les fonctions d’ordre supérieur (telles que la fonction transform
), il ne prend pas en charge la référence à des colonnes ou expressions externes à partir du corps lambda.
Cette limitation est causée par des restrictions sur les expressions lambda dans Snowflake.
data = [
(1, [1, 2, 3]),
(2, [4, 5]),
(3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])
df.select(transform(df.numbers, lambda el: el + array_size(df.numbers))).show() # this fails in Spark Connect for Snowpark, because of a
reference to df.numbers which is outside of the lambda context
df.select(transform(df.numbers, lambda el: negative(el) + 1)).show() # this works
Une autre limitation réside dans le fait que les fonctions définies par l’utilisateur (UDFs) ne sont pas prises en charge dans les expressions lambda. Cela inclut à la fois les UDFs personnalisées et certaines fonctions intégrées dont l’implémentation sous-jacente s’appuie sur les UDFs Snowflake. Tenter d’utiliser une UDF dans une expression lambda entraînera une erreur.
spark.range(1).select(split(lit('a,b,c'), ',')).show() # works
spark.range(1).select(transform(array(lit('a,b,c')), lambda x: split(x, ','))).show() # does not work, since `split` is implemented with UDF
Sources de données¶
Source de données |
Problèmes de compatibilité par rapport à PySpark |
---|---|
Avro |
Le type de fichier n’est pas pris en charge. |
CSV |
Le mode de sauvegarde n’est pas pris en charge dans les cas suivants : Les options suivantes ne sont pas prises en charge : |
JSON |
Le mode de sauvegarde n’est pas pris en charge dans les cas suivants : Les options suivantes sont prises en charge : Différence dans |
Orc |
Le type de fichier n’est pas pris en charge. |
Parquet |
Le mode de sauvegarde n’est pas pris en charge dans les cas suivants : Les options suivantes sont prises en charge : Configuration non prise en charge : (ALL) |
Texte |
Le mode écriture n’est pas pris en charge dans les cas suivants : Les options suivantes ne sont pas prises en charge : Le paramètre |
XML |
Le type de fichier n’est pas pris en charge. |
Table Snowflake |
L’écriture dans la table ne nécessite pas de format fournisseur. La compartimentation et le partitionnement ne sont pas pris en charge. Le format de stockage et la gestion des versions ne sont pas pris en charge. |
Catalogue¶
Prise en charge du fournisseur de catalogue Snowflake Horizon¶
Seul Snowflake est pris en charge en tant que fournisseur de catalogue.
APIs de catalogue non prises en charge¶
registerFunction
listFunctions
getFunction
functionExists
createExternalTable
APIs de catalogue partiellement prises en charge¶
createTable
(pas de prise en charge des tables externes)
Iceberg¶
Table Iceberg gérée par Snowflake¶
Snowpark Connect pour Spark fonctionne avec les tables Apache Iceberg™, y compris les tables Iceberg gérées en externe et les bases de données liées à des catalogues.
Lire¶
Time Travel n’est pas pris en charge, y compris l’instantané historique, la branche et la lecture incrémentielle.
Écriture¶
L’utilisation du SQL Spark pour créer des tables n’est pas pris en charge.
La fusion de schémas n’est pas prise en charge.
Pour créer la table, vous devez :
Créer un volume externe.
Associez les besoins du volume externe à la création de la table de l’une des manières suivantes :
Définir le EXTERNAL_VOLUME dans la base de données.
Définir
snowpark.connect.iceberg.external_volume
dans la configuration Spark.
Table Iceberg gérée en externe¶
Lire¶
Vous devez créer une entité de table non gérée Snowflake.
Time Travel n’est pas pris en charge, y compris l’instantané historique, la branche et la lecture incrémentielle.
Écriture¶
La création de la table n’est pas prise en charge.
L’écriture dans la table Iceberg existante est prise en charge.
Noms de colonne de duplication¶
Snowflake ne prend pas en charge les noms de colonne dupliqués.
Le code suivant échoue à l’étape de création de la vue avec l’erreur de compilation SQL suivante : duplicate column name 'foo'
.
df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])
df.show() # works
df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Pour contourner ce problème, définissez l’option de configuration snowpark.connect.views.duplicate_column_names_handling_mode
sur l’une des valeurs suivantes :
rename
: Un suffixe tel que_dedup_1
,_dedup_2
, et ainsi de suite, sera ajouté à tous les noms de colonnes dupliqués après la première.drop
: Toutes les colonnes en double, sauf une, seront supprimées. Cela peut entraîner des résultats incorrects si les colonnes ont des valeurs différentes.