Propriétés Snowpark Connect for Spark¶
Snowpark Connect for Spark prend en charge la configuration personnalisée de manière similaire à la norme Spark. Vous pouvez modifier les propriétés de configuration uniquement via la méthode set de la session en utilisant une paire clé-valeur. Notez que Snowpark Connect for Spark ne reconnaît qu’un ensemble limité de propriétés qui influencent l’exécution. Toutes les propriétés non prises en charge sont ignorées silencieusement sans générer d’exception.
Propriétés Spark prises en charge¶
Snowpark Connect for Spark prend en charge un sous-ensemble de propriétés Spark.
Nom de propriété |
Par défaut |
Signification |
Depuis |
|---|---|---|---|
|
(aucun) |
Nom de l’application défini sur Snowflake |
1.0.0 |
|
|
Lorsque |
1.0.0 |
|
(aucun) |
ID de clé d’accès AWS pour l’authentification S3 lors de la lecture ou de l’écriture dans des emplacements S3. |
1.0.0 |
|
(aucun) |
ARN de rôle IAM AWS avec accès S3 lors de l’utilisation de l’authentification basée sur les rôles. |
1.0.0 |
|
(aucun) |
Clé d’accès secrète AWS pour l’authentification S3 lors de la lecture ou de l’écriture dans des emplacements S3. |
1.0.0 |
|
(aucun) |
ID de clé KMS AWS pour le chiffrement côté serveur lors de l’utilisation du type de chiffrement |
1.0.0 |
|
(aucun) |
Jeton de session AWS pour les identifiants de connexion S3 temporaires lors de l’utilisation de STS. |
1.0.0 |
|
|
Active le mode ANSI SQL pour une vérification plus stricte des types et un traitement des erreurs. Lorsque |
1.0.0 |
|
|
Contrôle la sensibilité à la casse pour les identificateurs. Lorsque |
1.0.0 |
|
|
Active ou désactive les jointures croisées implicites. Un |
1.0.0 |
|
|
Lorsque |
1.0.0 |
|
|
Nom du schéma pour les vues temporaires globales ; créé automatiquement s’il n’existe pas. |
1.0.0 |
|
|
Lorsque |
1.0.0 |
|
|
Comportement hérité pour le nommage des clés de regroupement des ensembles de données. |
1.6.0 |
|
|
Contrôle le comportement lorsque des clés en double sont trouvées lors de la création d’une carte. Valeurs : |
1.0.0 |
|
|
Lorsque |
1.0.0 |
|
|
Contrôle le type d’horodatage de sortie de Parquet. Prend en charge |
1.7.0 |
|
|
Lorsque |
1.0.0 |
|
|
Lorsque |
1.0.0 |
|
|
Lorsque |
1.0.0 |
|
|
Nombre maximum de lignes à afficher dans le mode d’évaluation immédiate dans REPL. |
1.0.0 |
|
|
Largeur maximale des valeurs des colonnes dans l’affichage de l’évaluation immédiate dans REPL avant troncature. |
1.0.0 |
|
|
Seuil d’octets pour la mise en cache des relations locales. Les relations plus grandes que cela sont mises en cache pour améliorer les performances. |
1.0.0 |
|
|
Fuseau horaire de la session utilisé pour les opérations d’horodatage. Synchronisé avec la session Snowflake via |
1.0.0 |
|
|
Format de source de données par défaut pour les opérations de lecture/écriture lorsque le format n’est pas explicitement spécifié. |
1.0.0 |
|
|
Type d’horodatage par défaut pour les opérations d’horodatage. Valeurs : |
1.0.0 |
|
|
Lorsque |
1.0.0 |
Propriétés Snowpark Connect for Spark prises en charge¶
Propriétés de configuration personnalisées spécifiques à Snowpark Connect for Spark.
Nom de propriété |
Par défaut |
Signification |
Depuis |
|---|---|---|---|
|
(aucun) |
Jeton SAS Azure pour l’authentification au stockage Blob. Utilisé lors de la lecture ou de l’écriture sur des emplacements de stockage Azure Blob. |
1.0.0 |
|
(aucun) |
Jeton SAS Azure pour l’authentification ADLS Gen2 (stockage Data Lake). Utilisé lors de la lecture ou de l’écriture sur des emplacements de stockage Gen2 Azure Data Lake. |
1.0.0 |
|
|
Lorsque |
1.0.0 |
|
|
Configuration alternative pour générer des fichiers de métadonnées de résumé Parquet. La fonctionnalité est activée soit par cette dernière, soit par |
1.4.0 |
|
|
Lorsque |
1.0.0 |
|
|
Lorsque |
1.0.0 |
|
|
Durée de vie en secondes pour les entrées du cache des requêtes. Réduit les recherches répétées dans le schéma. |
1.0.0 |
|
|
Lorsque |
1.0.0 |
|
|
Lorsque |
1.7.0 |
|
(aucun) |
Nom du volume externe Snowflake pour les opérations de tables Iceberg. |
1.0.0 |
|
|
Contrôle la conversion des types décimaux en types intégraux. Valeurs : |
1.7.0 |
|
|
Contrôle la version de Scala utilisée (prend en charge |
1.7.0 |
|
(aucun) |
Chemin d’emplacement de la table externe pour les écritures partitionnées. |
1.4.0 |
|
|
Lorsque |
1.0.1 |
|
(aucun) |
Liste des fichiers ou modules, séparés par des virgules, à importer pour l’exécution des UDF. Déclenche la recréation des UDF en cas de modification. |
1.0.0 |
|
(aucun) |
Liste des fichiers/modules, séparés par des virgules, à importer pour l’exécution des UDF Python. Déclenche la recréation des UDF en cas de modification. |
1.7.0 |
|
(aucun) |
Liste des fichiers ou des modules, séparés par des virgules, à importer pour l’exécution des UDF Java. Déclenche la recréation des UDF en cas de modification. |
1.7.0 |
|
(aucun) |
Liste des paquets Python, séparés par des virgules, à inclure lors de l’enregistrement des UDFs. |
1.0.0 |
|
|
Lorsque |
1.0.0 |
|
|
Lecture seule. Renvoie la version de Snowpark Connect for Spark actuelle. |
1.0.0 |
|
|
Indique comment gérer les noms de colonnes dupliqués dans les vues. Valeurs : |
1.0.0 |
|
|
Lorsque |
1.4.0 |
|
|
Lorsque |
1.12.3 |
|
(aucun) |
Spécifie le nom d’un dépôt d’artefacts Snowflake pour la résolution du paquet UDF/UDTF. Lorsque cette option est activée, les paquets sont résolus à partir du dépôt spécifié au lieu d’Anaconda. |
1.14.0 |
|
(aucun) |
Lorsque défini sur |
1.13.0 |
fs.azure.sas.<container>.<account>.blob.core.windows.net¶
Spécifie le jeton SAS Azure pour l’authentification au stockage Blob. Utilisé lors de la lecture ou de l’écriture sur des emplacements de stockage Azure Blob.
Par défaut : (aucun)
Depuis : 1.0.0
fs.azure.sas.fixed.token.<account>.dfs.core.windows.net¶
Spécifie le jeton SAS Azure pour l’authentification ADLS Gen2 (stockage Data Lake). Utilisé lors de la lecture ou de l’écriture sur des emplacements de stockage Gen2 Azure Data Lake.
Par défaut : (aucun)
Depuis : 1.0.0
mapreduce.fileoutputcommitter.marksuccessfuljobs¶
Spécifiez true pour générer un fichier _SUCCESS après les opérations d’écriture réussies pour la compatibilité avec les flux de travail Hadoop ou Spark.
Par défaut : false
Depuis : 1.0.0
parquet.enable.summary-metadata¶
Spécifie la configuration alternative pour générer des fichiers de métadonnées de résumé Parquet. Active cette fonctionnalité avec cette propriété ou spark.sql.parquet.enable.summary-metadata.
Par défaut : false
Depuis : 1.4.0
snowflake.repartition.for.writes¶
Spécifiez true pour forcer DataFrame.repartition(n) à diviser la sortie en fichiers n lors des écritures. Correspond au comportement Spark, mais ajoute une surcharge.
Par défaut : false
Depuis : 1.0.0
snowpark.connect.cte.optimization_enabled ¶
Spécifiez true pour activer l’optimisation des expressions de table communes (CTE) dans la session Snowpark pour de meilleures performances des requêtes.
Par défaut : false
Depuis : 1.0.0
snowpark.connect.describe_cache_ttl_seconds ¶
Spécifie la durée de vie, en secondes, des entrées du cache des requêtes. Réduit les recherches répétées dans le schéma.
Par défaut : 300
Depuis : 1.0.0
snowpark.connect.enable_snowflake_extension_behavior ¶
Spécifiez true pour activer des extensions spécifiques à Snowflake qui peuvent différer du comportement de Spark (telles qu’un hachage sur les types MAP ou le type de retour MD5).
Par défaut : false
Depuis : 1.0.0
Commentaires¶
Lorsque défini sur true, modifie le comportement de certaines opérations :
bit_get/getbit: utilisation explicite de la fonction Getbit de Snowflake
hash: utilisation explicite de la fonction de hachage de Snowflake
md5: utilisation explicite de la fonction md5 de SnowflakeRenommer les colonnes de la table : permet de modifier les colonnes de la table
snowpark.connect.handleIntegralOverflow¶
Spécifiez true pour aligner le comportement de débordement intégral sur l’approche Spark.
Par défaut : false
Depuis : 1.7.0
snowpark.connect.iceberg.external_volume ¶
Spécifie le nom du volume externe Snowflake pour les opérations de tables Iceberg.
Par défaut : (aucun)
Depuis : 1.0.0
snowpark.connect.integralTypesEmulation¶
Spécifie comment convertir les types décimaux en types intégraux. Valeurs : client_default, enabled, disabled
Par défaut : client_default
Depuis : 1.7.0
Commentaires¶
Par défaut, Snowpark Connect for Spark traite tous les types intégraux comme des types Long. Cela est dû à la manière dont les nombres sont représentés dans Snowflake. L’émulation des types intégraux permet un mappage exact entre les types Snowpark et Spark lors de la lecture à partir de sources de données.
L’option client_default par défaut active l’émulation uniquement lorsque le script est exécuté à partir du client Scala. Les types intégraux sont mappés en fonction des précisions suivantes :
Précision |
Type Spark |
|---|---|
19 |
|
10 |
|
5 |
|
3 |
|
Autre |
|
Lorsque d’autres précisions sont trouvées, le type final est mappé vers le DecimalType.
snowpark.connect.scala.version¶
Spécifie la version de Scala à utiliser (prend en charge 2.12 ou 2.13).
Par défaut : 2.12
Depuis : 1.7.0
snowpark.connect.sql.partition.external_table_location ¶
Spécifie le chemin d’emplacement de la table externe pour les écritures partitionnées.
Par défaut : (aucun)
Depuis : 1.4.0
Commentaires¶
Pour lire uniquement un sous-ensemble exact de fichiers partitionnés à partir du répertoire fourni, une configuration supplémentaire est nécessaire. Cette fonctionnalité n’est disponible que pour les fichiers stockés sur des zones de préparation externes. Pour nettoyer les fichiers lus, Snowpark Connect for Spark utilise des tables externes.
Cette fonctionnalité est activée lorsque la configuration snowpark.connect.sql.partition.external_table_location est définie. Elle doit contenir des noms de bases de données et de schémas existants dans lesquels des tables externes seront créées.
La lecture de fichiers Parquet stockés dans des zones de préparation externes entraînera la création d’une table externe. Pour les fichiers dans des zones de préparation internes, elle ne sera pas créée. Fournir le schéma réduira le temps d’exécution, en éliminant le coût de l’inférence à partir des sources.
Pour des performances optimales, filtrez en fonction des limitations de filtrage des tables externes Snowflake.
Exemple¶
spark.conf.set("snowpark.connect.sql.partition.external_table_location", "<database-name>.<schema-name>")
spark.read.parquet("@external-stage/example").filter(col("x") > lit(1)).show()
schema = StructType([StructField("x",IntegerType()),StructField("y",DoubleType())])
spark.read.schema(schema).parquet("@external-stage/example").filter(col("x") > lit(1)).show()
snowpark.connect.temporary.views.create_in_snowflake ¶
Spécifiez true pour créer des vues temporaires directement dans Snowflake au lieu de les gérer localement.
Par défaut : false
Depuis : 1.0.1
snowpark.connect.udf.imports [DEPRECATED 1.7.0]¶
Spécifiez une liste des fichiers et modules, séparés par des virgules, à importer pour l’exécution des UDF. Lorsque cette valeur est modifiée, elle déclenche la recréation des UDF.
Par défaut : (aucun)
Depuis : 1.0.0
snowpark.connect.udf.python.imports¶
Spécifie une liste des fichiers et modules, séparés par des virgules, à importer pour l’exécution des UDF Python. Lorsque cette valeur est modifiée, elle déclenche la recréation des UDF.
Par défaut : (aucun)
Depuis : 1.7.0
snowpark.connect.udf.java.imports¶
Spécifie une liste des fichiers et modules, séparés par des virgules, à importer pour l’exécution des UDF Java. Déclenche la recréation des UDF en cas de modification.
Par défaut : (aucun)
Depuis : 1.7.0
Commentaires¶
Cette configuration fonctionne de manière très similaire à snowpark.connect.udf.python.imports. Avec elle, vous pouvez spécifier des bibliothèques externes et des fichiers pour les UDFs Java créées à l’aide de `registerJavaFunction<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.UDFRegistration.registerJavaFunction.html>`_. Les configurations s’excluent mutuellement pour empêcher les mélanges inutiles de dépendances.
Pour inclure des bibliothèques et des fichiers externes, vous fournissez des chemins d’accès aux zones de préparation aux fichiers en tant que valeur du paramètre de configuration snowpark.connect.udf.java.imports. La valeur de la configuration doit être un tableau de chemins d’accès aux zones de préparation, où les chemins sont séparés par des virgules.
Exemple¶
Le code de l’exemple suivant inclut deux fichiers dans le contexte d’exécution de l’UDF. L’UDF importe des fonctions de ces fichiers et les utilise dans sa logique.
# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.java.imports", "[@stage/library.jar]")
spark.registerJavaFunction("javaFunction", "com.example.ExampleFunction")
spark.sql("SELECT javaFunction('arg')").show()
Vous pouvez utiliser le paramètre snowpark.connect.udf.java.imports pour inclure d’autres types de fichiers, tels que ceux contenant des données que votre code doit lire. Notez que lorsque vous faites cela, votre code ne doit lire qu’à partir des fichiers inclus ; toutes les écritures dans ces fichiers seront perdues après la fin de l’exécution de la fonction.
snowpark.connect.udf.packages¶
Spécifie une liste des paquets Python, séparés par des virgules, à inclure lors de l’enregistrement des UDFs.
Par défaut : (aucun)
Depuis : 1.0.0
Commentaires¶
Vous pouvez l’utiliser pour définir des paquets supplémentaires à utiliser dans les UDFs Python. La valeur est une liste de dépendances séparées par des virgules.
Vous pouvez découvrir la liste des paquets pris en charge en exécutant la commande SQL suivante dans Snowflake :
SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
Exemple¶
spark.conf.set("snowpark.connect.udf.packages", "[numpy]")
@udtf(returnType="val: int")
class Powers:
def eval(self, x: int):
import numpy as np
for v in np.power(np.array([x, x, x]), [0, 1, 2]):
yield (int(v),)
spark.udtf.register(name="powers", f=Powers)
spark.sql("SELECT * FROM powers(10)").show()
Pour plus d’informations, voir Python.
snowpark.connect.udtf.compatibility_mode ¶
Spécifiez true pour activer le comportement UDTF compatible avec Spark pour une meilleure compatibilité avec la sémantique UDTF de Spark.
Par défaut : false
Depuis : 1.0.0
Commentaires¶
Cette propriété détermine si les UDTFs doivent utiliser un comportement compatible avec Spark ou le comportement par défaut de Snowpark. Lorsqu’elle est définie sur true, elle applique un wrapper de compatibilité qui imite les modèles de coercition des types de sortie et de traitement des erreurs de Spark.
Lorsqu’elles sont activées, les UDTFs utilisent un wrapper de compatibilité qui applique une coercition des types automatique (par exemple, chaîne « true » en booléen, booléen en entier) et un traitement des erreurs à la manière de Spark. Le wrapper convertit également les arguments de table en objets de type ligne pour l’accès positionnel et nommé, et traite correctement les valeurs SQL « null » pour correspondre aux schémas de comportement de Spark.
snowpark.connect.version¶
Renvoie la version de Snowpark Connect for Spark actuelle. Lecture seule.
Par défaut : <current_version>
Depuis : 1.0.0
snowpark.connect.views.duplicate_column_names_handling_mode ¶
Spécifie comment gérer les noms de colonnes dupliqués dans les vues. Les valeurs autorisées incluent rename (ajouter un suffixe), fail (soumettre l’erreur) ou drop (supprimer les doublons).
Par défaut : rename
Depuis : 1.0.0
Commentaires¶
Snowflake ne prend pas en charge les noms de colonne dupliqués.
Exemple¶
Le code suivant échoue à l’étape de création de la vue avec l’erreur de compilation SQL suivante : « duplicate column name “foo” ».
df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])
df.show() # works
df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Pour contourner ce problème, définissez l’option de configuration snowpark.connect.views.duplicate_column_names_handling_mode sur l’une des valeurs suivantes :
rename: Un suffixe tel que_dedup_1,_dedup_2, et ainsi de suite, sera ajouté à tous les noms de colonnes dupliqués après la première.drop: Toutes les colonnes en double, sauf une, seront supprimées. Si les colonnes ont des valeurs différentes, cela peut entraîner des résultats incorrects.
snowpark.connect.udf.java.imports¶
Spécifie une liste des fichiers et modules, séparés par des virgules, à importer pour l’exécution des UDF Java. Déclenche la recréation des UDF en cas de modification.
Par défaut : (aucun)
Depuis : 1.7.0
Commentaires¶
Cette configuration fonctionne de manière très similaire à snowpark.connect.udf.python.imports. Vous pouvez l’utiliser pour spécifier des bibliothèques externes et des fichiers pour les UDFs Java créées à l’aide de `registerJavaFunction<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.UDFRegistration.registerJavaFunction.html>`_. Les configurations s’excluent mutuellement pour empêcher les mélanges inutiles de dépendances.
Pour inclure des bibliothèques et des fichiers externes, vous fournissez des chemins d’accès aux zones de préparation aux fichiers en tant que valeur du paramètre de configuration snowpark.connect.udf.java.imports. La valeur est un tableau de chemins d’accès aux fichiers, où les chemins sont séparés par des virgules.
Exemple¶
Le code de l’exemple suivant inclut deux fichiers dans le contexte d’exécution de l’UDF. L’UDF importe des fonctions de ces fichiers et les utilise dans sa logique.
# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.java.imports", "[@stage/library.jar]")
spark.registerJavaFunction("javaFunction", "com.example.ExampleFunction")
spark.sql("SELECT javaFunction('arg')").show()
Vous pouvez utiliser le paramètre snowpark.connect.udf.java.imports pour inclure d’autres types de fichiers, tels que ceux contenant des données que votre code doit lire. Lorsque vous faites cela, votre code ne doit lire qu’à partir des fichiers inclus ; toutes les écritures dans ces fichiers seront perdues après la fin de l’exécution de la fonction.
snowpark.connect.udf.packages¶
Spécifie une liste des paquets Python, séparés par des virgules, à inclure lors de l’enregistrement des UDFs.
Par défaut : (aucun)
Depuis : 1.0.0
Commentaires¶
La configuration permet de définir des paquets supplémentaires disponibles dans les UDFs Python. La valeur est une liste de dépendances séparées par des virgules.
Vous pouvez découvrir la liste des paquets pris en charge en exécutant la commande SQL suivante dans Snowflake :
SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
Exemple¶
spark.conf.set("snowpark.connect.udf.packages", "[numpy]")
@udtf(returnType="val: int")
class Powers:
def eval(self, x: int):
import numpy as np
for v in np.power(np.array([x, x, x]), [0, 1, 2]):
yield (int(v),)
spark.udtf.register(name="powers", f=Powers)
spark.sql("SELECT * FROM powers(10)").show()
Référence : Référence Paquets
snowpark.connect.udtf.compatibility_mode ¶
Spécifiez true pour activer le comportement UDTF compatible avec Spark pour une meilleure compatibilité avec la sémantique UDTF de Spark.
Par défaut : false
Depuis : 1.0.0
Commentaires¶
Cette configuration détermine si les UDTFs doivent utiliser un comportement compatible avec Spark ou le comportement par défaut de Snowpark. Lorsqu’elles sont activées (true), cela applique un wrapper de compatibilité qui imite la coercition des types de sortie de Spark (par exemple, la chaîne « true » en booléen, le booléen en entier) et les modèles de gestion des erreurs.
Le wrapper convertit également les arguments de table en objets de type ligne pour l’accès positionnel et nommé, et traite correctement les valeurs SQL « null » pour correspondre aux schémas de comportement de Spark.
snowpark.connect.sql.emulatePartitionOverwritesForSnowflakeTables¶
Lorsque true, permet l’écrasement des partitions sur les tables Snowflake dans Spark SQL (INSERT OVERWRITE <table> PARTITION(<partition spec>)).
Par défaut : false
Depuis : 1.12.3
Commentaires¶
Les tables Snowflake ne prennent pas en charge le partitionnement défini par l’utilisateur et, par défaut, les écrasements de partition entraîneront une erreur. L’activation de cette option permet d’utiliser la syntaxe INSERT OVERWRITE <table> PARTITION(<partition spec>) pour effectuer des écrasements.
Le <partition spec> acceptera toutes les colonnes qui existent dans la table cible.
Exemple¶
Le code de l’exemple suivant écrase toutes les lignes de la table des étudiants qui ont un student_id de 222222.
spark.conf.set("snowpark.connect.sql.emulatePartitionOverwritesForSnowflakeTables", True)
# create the students and persons tables as standard Snowflake tables
students_data = [
("Ashua Hill", "456 Erica Ct, Cupertino", 111111),
("Brian Reed", "723 Kern Ave, Palo Alto", 222222)
]
students_df = spark.createDataFrame(students_data, ["name", "address", "student_id"])
students_df.write.mode("overwrite").saveAsTable("students")
persons_data = [
("Dora Williams", "134 Forest Ave, Menlo Park", 123456789),
("Eddie Davis", "245 Market St, Milpitas", 345678901)
]
persons_df = spark.createDataFrame(persons_data, ["name", "address", "ssn"])
persons_df.write.mode("overwrite").saveAsTable("persons")
# overwrites all rows in the students table that have a student_id of 222222
spark.sql("""
INSERT OVERWRITE students PARTITION (student_id = 222222)
SELECT name, address FROM persons WHERE name = 'Dora Williams'
""").collect()
snowpark.connect.artifact_repository ¶
Spécifie le nom d’un dépôt d’artefacts Snowflake à utiliser pour la résolution des paquets lors de l’enregistrement des opérations UDFs, UDTFs, applyInPandas, mapInArrow et cogroup. Lorsque cette option est activée, les paquets spécifiés via snowpark.connect.udf.packages sont résolus à partir du dépôt d’artefacts spécifié au lieu d’Anaconda.
Par défaut : (aucun)
Depuis : 1.14.0
Commentaires¶
Par défaut, Snowpark Connect for Spark résout les paquets Python du canal Anaconda de Snowflake. La définition de cette configuration sur un nom de dépôt d’artefacts permet de résoudre les paquets de PyPI ou d’autres sources configurées, afin d’utiliser des paquets qui ne sont pas disponibles dans le canal Anaconda.
Pour plus d’informations sur la création et la configuration d’un dépôt d’artefacts dans Snowflake, consultez Utilisation de paquets tiers.
La modification de cette configuration invalide les UDFs et les UDTFs mises en cache, ce qui entraîne leur recréation avec le nouveau dépôt lors de l’appel suivant.
Cette configuration s’applique aux opérations suivantes :
UDFs enregistrée via le décorateur
@udfouspark.udf.register()UDTFs enregistrée via le décorateur
@udtfouspark.udtf.register()applyInPandasviagroupBy().applyInPandas()mapInArrowviaDataFrame.mapInArrow()cogroupviagroupBy().cogroup().applyInPandas()
Exemple¶
L’exemple suivant configure le dépôt d’artefacts, puis définit une UDF qui utilise pykalman, un paquet disponible dans le dépôt d’artefacts, pour appliquer le lissage du filtre de Kalman.
spark.conf.set("snowpark.connect.artifact_repository", "my_pypi_repo")
spark.conf.set("snowpark.connect.udf.packages", "[pykalman]")
@udf(returnType=DoubleType())
def kalman_smooth_value(value: float) -> float:
import numpy as np
from pykalman import KalmanFilter
kf = KalmanFilter(
transition_matrices=[1],
observation_matrices=[1],
initial_state_mean=0,
initial_state_covariance=1,
observation_covariance=1,
transition_covariance=0.1,
)
observations = np.array([value, value, value])
smoothed_state_means, _ = kf.smooth(observations)
return float(smoothed_state_means[-1][0])
df = spark.createDataFrame([(1, 10.0), (2, 20.0), (3, 30.0)], ["id", "value"])
df.select("id", kalman_smooth_value("value").alias("smoothed")).show()
Pour plus d’informations sur les dépôts d’artefacts et les paquets disponibles, consultez Utilisation de paquets tiers.
snowpark.connect.udf.resource_constraint.architecture ¶
Lorsque défini sur x86, les opérations UDFs, UDTFs et applyInPandas sont créées avec une contrainte d’architecture x86. Cela nécessite un entrepôt configuré avec une contrainte de ressources x86 pour l’exécution.
Par défaut : (aucun)
Depuis : 1.13.0
Commentaires¶
Certains paquets Python de tiers (comme TensorFlow,XGBoost et certaines bibliothèques scientifiques) sont conçus uniquement pour l’architecture CPU x86. Définir cette configuration sur x86 ajoute:code:RESOURCE_CONSTRAINT=(architecture='x86') à l’instruction CREATE FUNCTION générée par|spconnect|, garantissant ainsi que la UDF s’exécute sur une infrastructure x86 compatible.
Pour utiliser cette configuration, vous devez exécuter votre charge de travail sur un entrepôt qui a été créé avec une contrainte de ressources x86. Les valeurs de contrainte de ressources suivantes prennent en charge x86 :
MEMORY_1X_x86(taille minimale d’entrepôt : XSMALL)MEMORY_16X_x86(taille minimale d’entrepôt : MEDIUM)MEMORY_64X_x86(taille minimale d’entrepôt : LARGE)
Si l’entrepôt n’a pas de contrainte de ressources x86, l’exécution de la UDF échouera.
Cette configuration s’applique aux opérations suivantes :
UDFs enregistrée via le décorateur
@udfouspark.udf.register()UDTFs enregistrée via le décorateur
@udtfouspark.udtf.register()applyInPandasviagroupBy().applyInPandas()
Exemple¶
L’exemple suivant crée un entrepôt avec une contrainte de ressources x86, puis configure Snowpark Connect for Spark pour utiliser l’architecture x86 pour les UDFs.
CREATE WAREHOUSE my_x86_warehouse WITH
WAREHOUSE_SIZE = 'MEDIUM'
WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'
RESOURCE_CONSTRAINT = 'MEMORY_16X_x86';
USE WAREHOUSE my_x86_warehouse;
spark.conf.set("snowpark.connect.udf.resource_constraint.architecture", "x86")
@udf(returnType=IntegerType())
def add_one(x: int) -> int:
return x + 1
df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
df.select(add_one(df["value"]).alias("result")).show()
Pour plus d’informations sur les entrepôts et les contraintes de ressources, consultez Entrepôts optimisés par Snowpark.
Commentaires¶
Configuration activant les expressions de table communes (CTEs) Snowflake. Cette configuration optimise les requêtes Snowflake dans lesquelles il existe beaucoup de blocs de code répétitifs. Cette modification conduira à l’amélioration des performances de compilation et d’exécution des requêtes.