Propriétés Snowpark Connect for Spark

Snowpark Connect for Spark prend en charge la configuration personnalisée de manière similaire à la norme Spark. Vous pouvez modifier les propriétés de configuration uniquement via la méthode set de la session en utilisant une paire clé-valeur. Notez que Snowpark Connect for Spark ne reconnaît qu’un ensemble limité de propriétés qui influencent l’exécution. Toutes les propriétés non prises en charge sont ignorées silencieusement sans générer d’exception.

Propriétés Spark prises en charge

Snowpark Connect for Spark prend en charge un sous-ensemble de propriétés Spark.

Nom de propriété

Par défaut

Signification

Depuis

spark.app.name

(aucun)

Nom de l’application défini sur Snowflake query_tag (Spark-Connect-App-Name={name}) pour le suivi des requêtes.

1.0.0

spark.Catalog.databaseFilterInformationSchema

false

Lorsque true, filtre INFORMATION_SCHEMA des annonces de la base de données dans les opérations de catalogue.

1.0.0

spark.hadoop.fs.s3a.access.key

(aucun)

ID de clé d’accès AWS pour l’authentification S3 lors de la lecture ou de l’écriture dans des emplacements S3.

1.0.0

spark.hadoop.fs.s3a.assumed.role.arn

(aucun)

ARN de rôle IAM AWS avec accès S3 lors de l’utilisation de l’authentification basée sur les rôles.

1.0.0

spark.hadoop.fs.s3a.secret.key

(aucun)

Clé d’accès secrète AWS pour l’authentification S3 lors de la lecture ou de l’écriture dans des emplacements S3.

1.0.0

spark.hadoop.fs.s3a.server-side-encryption.key

(aucun)

ID de clé KMS AWS pour le chiffrement côté serveur lors de l’utilisation du type de chiffrement AWS_SSE_KMS.

1.0.0

spark.hadoop.fs.s3a.session.token

(aucun)

Jeton de session AWS pour les identifiants de connexion S3 temporaires lors de l’utilisation de STS.

1.0.0

spark.sql.ansi.enabled

false

Active le mode ANSI SQL pour une vérification plus stricte des types et un traitement des erreurs. Lorsque true, les débordements arithmétiques et les conversions non valides génèrent des erreurs au lieu de renvoyer NULL.

1.0.0

spark.sql.caseSensitive

false

Contrôle la sensibilité à la casse pour les identificateurs. Lorsque false, les noms de colonnes et de tables sont insensibles à la casse (automatiquement mis en majuscules dans Snowflake).

1.0.0

spark.sql.crossJoin.enabled

true

Active ou désactive les jointures croisées implicites. Un false et une condition de jointure manquante ou non pertinente entraîneront une erreur.

1.0.0

spark.sql.execution.pythonUDTF.arrow.enabled

false

Lorsque true, active l’optimisation Apache Arrow pour la sérialisation/désérialisation des UDTF Python.

1.0.0

spark.sql.globalTempDatabase

global_temp

Nom du schéma pour les vues temporaires globales ; créé automatiquement s’il n’existe pas.

1.0.0

spark.sql.legacy.allowHashOnMapType

false

Lorsque true, permet le hachage des colonnes de type MAP. Par défaut, les types MAP ne peuvent pas être hachés par souci de cohérence avec le comportement de Spark.

1.0.0

spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue

false

Comportement hérité pour le nommage des clés de regroupement des ensembles de données.

1.6.0

spark.sql.mapKeyDedupPolicy

EXCEPTION

Contrôle le comportement lorsque des clés en double sont trouvées lors de la création d’une carte. Valeurs : EXCEPTION (soumettre l’erreur) ou LAST_WIN (conserver la dernière valeur).

1.0.0

spark.sql.parser.quotedRegexColumnNames

false

Lorsque true, active la correspondance des modèles regex dans les noms de colonnes entre guillemets dans les requêtes SQL (par exemple, SELECT '(col1|col2)' FROM table).

1.0.0

spark.sql.parquet.outputTimestampType

TIMESTAMP_MILLIS

Contrôle le type d’horodatage de sortie de Parquet. Prend en charge TIMESTAMP_MILLIS ou TIMESTAMP_MICROS.

1.7.0

spark.sql.pyspark.inferNestedDictAsStruct.enabled

false

Lorsque true, déduit les dictionnaires Python imbriqués en tant que StructType au lieu de MapType lors de la création d’un DataFrame.

1.0.0

spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled

false

Lorsque true, déduit le type des éléments du tableau à partir du premier élément uniquement au lieu d’échantillonner tous les éléments.

1.0.0

spark.sql.repl.eagerEval.enabled

false

Lorsque true, active une évaluation immédiate dans REPL, qui affiche automatiquement les résultats du DataFrame sans appeler show().

1.0.0

spark.sql.repl.eagerEval.maxNumRows

20

Nombre maximum de lignes à afficher dans le mode d’évaluation immédiate dans REPL.

1.0.0

spark.sql.repl.eagerEval.truncate

20

Largeur maximale des valeurs des colonnes dans l’affichage de l’évaluation immédiate dans REPL avant troncature.

1.0.0

spark.sql.session.localRelationCacheThreshold

2147483647

Seuil d’octets pour la mise en cache des relations locales. Les relations plus grandes que cela sont mises en cache pour améliorer les performances.

1.0.0

spark.sql.session.timeZone

<system_local_timezone>

Fuseau horaire de la session utilisé pour les opérations d’horodatage. Synchronisé avec la session Snowflake via ALTER SESSION SET TIMEZONE.

1.0.0

spark.sql.sources.default

parquet

Format de source de données par défaut pour les opérations de lecture/écriture lorsque le format n’est pas explicitement spécifié.

1.0.0

spark.sql.timestampType

TIMESTAMP_LTZ

Type d’horodatage par défaut pour les opérations d’horodatage. Valeurs : TIMESTAMP_LTZ (avec fuseau horaire local) ou TIMESTAMP_NTZ (pas de fuseau horaire).

1.0.0

spark.sql.tvf.allowMultipleTableArguments.enabled

true

Lorsque true, permet aux fonctions à valeur de table d’accepter plusieurs arguments de table.

1.0.0

Propriétés Snowpark Connect for Spark prises en charge

Propriétés de configuration personnalisées spécifiques à Snowpark Connect for Spark.

Nom de propriété

Par défaut

Signification

Depuis

fs.azure.sas.<container>.<account>.blob.core.windows.net

(aucun)

Jeton SAS Azure pour l’authentification au stockage Blob. Utilisé lors de la lecture ou de l’écriture sur des emplacements de stockage Azure Blob.

1.0.0

fs.azure.sas.fixed.token.<account>.dfs.core.windows.net

(aucun)

Jeton SAS Azure pour l’authentification ADLS Gen2 (stockage Data Lake). Utilisé lors de la lecture ou de l’écriture sur des emplacements de stockage Gen2 Azure Data Lake.

1.0.0

mapreduce.fileoutputcommitter.marksuccessfuljobs

false

Lorsque true, génère un fichier _SUCCESS après les opérations d’écriture réussies pour la compatibilité avec les flux de travail Hadoop/Spark.

1.0.0

parquet.enable.summary-metadata

false

Configuration alternative pour générer des fichiers de métadonnées de résumé Parquet. La fonctionnalité est activée soit par cette dernière, soit par spark.sql.parquet.enable.summary-metadata.

1.4.0

snowflake.repartition.for.writes

false

Lorsque true, force DataFrame.repartition(n) à diviser la sortie en fichiers n lors des écritures. Correspond au comportement Spark, mais ajoute une surcharge.

1.0.0

snowpark.connect.cte.optimization_enabled

false

Lorsque true, active l’optimisation des expressions de table communes (CTE) dans les sessions Snowpark afin d’améliorer les performances des requêtes.

1.0.0

snowpark.connect.describe_cache_ttl_seconds

300

Durée de vie en secondes pour les entrées du cache des requêtes. Réduit les recherches répétées dans le schéma.

1.0.0

snowpark.connect.enable_snowflake_extension_behavior

false

Lorsque true, active des extensions spécifiques à Snowflake qui peuvent différer du comportement de Spark (telles que le hachage sur les types MAP ou le type de retour MD5).

1.0.0

snowpark.connect.handleIntegralOverflow

false

Lorsque true, le comportement de débordement intégral est aligné sur l’approche Spark.

1.7.0

snowpark.connect.iceberg.external_volume

(aucun)

Nom du volume externe Snowflake pour les opérations de tables Iceberg.

1.0.0

snowpark.connect.integralTypesEmulation

client_default

Contrôle la conversion des types décimaux en types intégraux. Valeurs : client_default, enabled, disabled

1.7.0

snowpark.connect.scala.version

2.12

Contrôle la version de Scala utilisée (prend en charge 2.12 ou 2.13).

1.7.0

snowpark.connect.sql.partition.external_table_location

(aucun)

Chemin d’emplacement de la table externe pour les écritures partitionnées.

1.4.0

snowpark.connect.temporary.views.create_in_snowflake

false

Lorsque true, crée des vues temporaires directement dans Snowflake au lieu de les gérer localement.

1.0.1

snowpark.connect.udf.imports [DEPRECATED 1.7.0]

(aucun)

Liste des fichiers ou modules, séparés par des virgules, à importer pour l’exécution des UDF. Déclenche la recréation des UDF en cas de modification.

1.0.0

snowpark.connect.udf.python.imports

(aucun)

Liste des fichiers/modules, séparés par des virgules, à importer pour l’exécution des UDF Python. Déclenche la recréation des UDF en cas de modification.

1.7.0

snowpark.connect.udf.java.imports

(aucun)

Liste des fichiers ou des modules, séparés par des virgules, à importer pour l’exécution des UDF Java. Déclenche la recréation des UDF en cas de modification.

1.7.0

snowpark.connect.udf.packages

(aucun)

Liste des paquets Python, séparés par des virgules, à inclure lors de l’enregistrement des UDFs.

1.0.0

snowpark.connect.udtf.compatibility_mode

false

Lorsque true, active le comportement UDTF compatible avec Spark pour une meilleure compatibilité avec la sémantique UDTF de Spark.

1.0.0

snowpark.connect.version

<current_version>

Lecture seule. Renvoie la version de Snowpark Connect for Spark actuelle.

1.0.0

snowpark.connect.views.duplicate_column_names_handling_mode

rename

Indique comment gérer les noms de colonnes dupliqués dans les vues. Valeurs : rename (ajouter un suffixe), fail (soumettre l’erreur) ou drop (supprimer les doublons).

1.0.0

spark.sql.parquet.enable.summary-metadata

false

Lorsque true, génère des fichiers de métadonnées de résumé Parquet (_metadata _common_metadata) pendant les écritures Parquet.

1.4.0

snowpark.connect.sql.emulatePartitionOverwritesForSnowflakeTables

false

Lorsque true, permet l’écrasement des partitions sur les tables Snowflake dans Spark SQL (INSERT OVERWRITE <table> PARTITION(<partition spec>)).

1.12.3

snowpark.connect.artifact_repository

(aucun)

Spécifie le nom d’un dépôt d’artefacts Snowflake pour la résolution du paquet UDF/UDTF. Lorsque cette option est activée, les paquets sont résolus à partir du dépôt spécifié au lieu d’Anaconda.

1.14.0

snowpark.connect.udf.resource_constraint.architecture

(aucun)

Lorsque défini sur x86, les opérations UDFs, UDTFs et applyInPandas sont créées avec une contrainte d’architecture x86. Nécessite un entrepôt avec une contrainte de ressources x86.

1.13.0

fs.azure.sas.<container>.<account>.blob.core.windows.net

Spécifie le jeton SAS Azure pour l’authentification au stockage Blob. Utilisé lors de la lecture ou de l’écriture sur des emplacements de stockage Azure Blob.

Par défaut : (aucun)

Depuis : 1.0.0

fs.azure.sas.fixed.token.<account>.dfs.core.windows.net

Spécifie le jeton SAS Azure pour l’authentification ADLS Gen2 (stockage Data Lake). Utilisé lors de la lecture ou de l’écriture sur des emplacements de stockage Gen2 Azure Data Lake.

Par défaut : (aucun)

Depuis : 1.0.0

mapreduce.fileoutputcommitter.marksuccessfuljobs

Spécifiez true pour générer un fichier _SUCCESS après les opérations d’écriture réussies pour la compatibilité avec les flux de travail Hadoop ou Spark.

Par défaut : false

Depuis : 1.0.0

parquet.enable.summary-metadata

Spécifie la configuration alternative pour générer des fichiers de métadonnées de résumé Parquet. Active cette fonctionnalité avec cette propriété ou spark.sql.parquet.enable.summary-metadata.

Par défaut : false

Depuis : 1.4.0

snowflake.repartition.for.writes

Spécifiez true pour forcer DataFrame.repartition(n) à diviser la sortie en fichiers n lors des écritures. Correspond au comportement Spark, mais ajoute une surcharge.

Par défaut : false

Depuis : 1.0.0

snowpark.connect.cte.optimization_enabled

Spécifiez true pour activer l’optimisation des expressions de table communes (CTE) dans la session Snowpark pour de meilleures performances des requêtes.

Par défaut : false

Depuis : 1.0.0

Commentaires

Configuration activant les expressions de table communes (CTEs) Snowflake. Cette configuration optimise les requêtes Snowflake dans lesquelles il existe beaucoup de blocs de code répétitifs. Cette modification conduira à l’amélioration des performances de compilation et d’exécution des requêtes.

snowpark.connect.describe_cache_ttl_seconds

Spécifie la durée de vie, en secondes, des entrées du cache des requêtes. Réduit les recherches répétées dans le schéma.

Par défaut : 300

Depuis : 1.0.0

snowpark.connect.enable_snowflake_extension_behavior

Spécifiez true pour activer des extensions spécifiques à Snowflake qui peuvent différer du comportement de Spark (telles qu’un hachage sur les types MAP ou le type de retour MD5).

Par défaut : false

Depuis : 1.0.0

Commentaires

Lorsque défini sur true, modifie le comportement de certaines opérations :

snowpark.connect.handleIntegralOverflow

Spécifiez true pour aligner le comportement de débordement intégral sur l’approche Spark.

Par défaut : false

Depuis : 1.7.0

snowpark.connect.iceberg.external_volume

Spécifie le nom du volume externe Snowflake pour les opérations de tables Iceberg.

Par défaut : (aucun)

Depuis : 1.0.0

snowpark.connect.integralTypesEmulation

Spécifie comment convertir les types décimaux en types intégraux. Valeurs : client_default, enabled, disabled

Par défaut : client_default

Depuis : 1.7.0

Commentaires

Par défaut, Snowpark Connect for Spark traite tous les types intégraux comme des types Long. Cela est dû à la manière dont les nombres sont représentés dans Snowflake. L’émulation des types intégraux permet un mappage exact entre les types Snowpark et Spark lors de la lecture à partir de sources de données.

L’option client_default par défaut active l’émulation uniquement lorsque le script est exécuté à partir du client Scala. Les types intégraux sont mappés en fonction des précisions suivantes :

Précision

Type Spark

19

LongType

10

IntegerType

5

ShortType

3

ByteType

Autre

DecimalType(precision, 0)

Lorsque d’autres précisions sont trouvées, le type final est mappé vers le DecimalType.

snowpark.connect.scala.version

Spécifie la version de Scala à utiliser (prend en charge 2.12 ou 2.13).

Par défaut : 2.12

Depuis : 1.7.0

snowpark.connect.sql.partition.external_table_location

Spécifie le chemin d’emplacement de la table externe pour les écritures partitionnées.

Par défaut : (aucun)

Depuis : 1.4.0

Commentaires

Pour lire uniquement un sous-ensemble exact de fichiers partitionnés à partir du répertoire fourni, une configuration supplémentaire est nécessaire. Cette fonctionnalité n’est disponible que pour les fichiers stockés sur des zones de préparation externes. Pour nettoyer les fichiers lus, Snowpark Connect for Spark utilise des tables externes.

Cette fonctionnalité est activée lorsque la configuration snowpark.connect.sql.partition.external_table_location est définie. Elle doit contenir des noms de bases de données et de schémas existants dans lesquels des tables externes seront créées.

La lecture de fichiers Parquet stockés dans des zones de préparation externes entraînera la création d’une table externe. Pour les fichiers dans des zones de préparation internes, elle ne sera pas créée. Fournir le schéma réduira le temps d’exécution, en éliminant le coût de l’inférence à partir des sources.

Pour des performances optimales, filtrez en fonction des limitations de filtrage des tables externes Snowflake.

Exemple
spark.conf.set("snowpark.connect.sql.partition.external_table_location", "<database-name>.<schema-name>")

spark.read.parquet("@external-stage/example").filter(col("x") > lit(1)).show()

schema = StructType([StructField("x",IntegerType()),StructField("y",DoubleType())])

spark.read.schema(schema).parquet("@external-stage/example").filter(col("x") > lit(1)).show()
Copy

snowpark.connect.temporary.views.create_in_snowflake

Spécifiez true pour créer des vues temporaires directement dans Snowflake au lieu de les gérer localement.

Par défaut : false

Depuis : 1.0.1

snowpark.connect.udf.imports [DEPRECATED 1.7.0]

Spécifiez une liste des fichiers et modules, séparés par des virgules, à importer pour l’exécution des UDF. Lorsque cette valeur est modifiée, elle déclenche la recréation des UDF.

Par défaut : (aucun)

Depuis : 1.0.0

snowpark.connect.udf.python.imports

Spécifie une liste des fichiers et modules, séparés par des virgules, à importer pour l’exécution des UDF Python. Lorsque cette valeur est modifiée, elle déclenche la recréation des UDF.

Par défaut : (aucun)

Depuis : 1.7.0

snowpark.connect.udf.java.imports

Spécifie une liste des fichiers et modules, séparés par des virgules, à importer pour l’exécution des UDF Java. Déclenche la recréation des UDF en cas de modification.

Par défaut : (aucun)

Depuis : 1.7.0

Commentaires

Cette configuration fonctionne de manière très similaire à snowpark.connect.udf.python.imports. Avec elle, vous pouvez spécifier des bibliothèques externes et des fichiers pour les UDFs Java créées à l’aide de `registerJavaFunction<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.UDFRegistration.registerJavaFunction.html>`_. Les configurations s’excluent mutuellement pour empêcher les mélanges inutiles de dépendances.

Pour inclure des bibliothèques et des fichiers externes, vous fournissez des chemins d’accès aux zones de préparation aux fichiers en tant que valeur du paramètre de configuration snowpark.connect.udf.java.imports. La valeur de la configuration doit être un tableau de chemins d’accès aux zones de préparation, où les chemins sont séparés par des virgules.

Exemple

Le code de l’exemple suivant inclut deux fichiers dans le contexte d’exécution de l’UDF. L’UDF importe des fonctions de ces fichiers et les utilise dans sa logique.

# Files need to be previously staged

spark.conf.set("snowpark.connect.udf.java.imports", "[@stage/library.jar]")

spark.registerJavaFunction("javaFunction", "com.example.ExampleFunction")

spark.sql("SELECT javaFunction('arg')").show()
Copy

Vous pouvez utiliser le paramètre snowpark.connect.udf.java.imports pour inclure d’autres types de fichiers, tels que ceux contenant des données que votre code doit lire. Notez que lorsque vous faites cela, votre code ne doit lire qu’à partir des fichiers inclus ; toutes les écritures dans ces fichiers seront perdues après la fin de l’exécution de la fonction.

snowpark.connect.udf.packages

Spécifie une liste des paquets Python, séparés par des virgules, à inclure lors de l’enregistrement des UDFs.

Par défaut : (aucun)

Depuis : 1.0.0

Commentaires

Vous pouvez l’utiliser pour définir des paquets supplémentaires à utiliser dans les UDFs Python. La valeur est une liste de dépendances séparées par des virgules.

Vous pouvez découvrir la liste des paquets pris en charge en exécutant la commande SQL suivante dans Snowflake :

SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
Copy
Exemple
spark.conf.set("snowpark.connect.udf.packages", "[numpy]")

@udtf(returnType="val: int")

class Powers:

  def eval(self, x: int):
      import numpy as np

      for v in np.power(np.array([x, x, x]), [0, 1, 2]):
          yield (int(v),)

spark.udtf.register(name="powers", f=Powers)

spark.sql("SELECT * FROM powers(10)").show()
Copy

Pour plus d’informations, voir Python.

snowpark.connect.udtf.compatibility_mode

Spécifiez true pour activer le comportement UDTF compatible avec Spark pour une meilleure compatibilité avec la sémantique UDTF de Spark.

Par défaut : false

Depuis : 1.0.0

Commentaires

Cette propriété détermine si les UDTFs doivent utiliser un comportement compatible avec Spark ou le comportement par défaut de Snowpark. Lorsqu’elle est définie sur true, elle applique un wrapper de compatibilité qui imite les modèles de coercition des types de sortie et de traitement des erreurs de Spark.

Lorsqu’elles sont activées, les UDTFs utilisent un wrapper de compatibilité qui applique une coercition des types automatique (par exemple, chaîne « true » en booléen, booléen en entier) et un traitement des erreurs à la manière de Spark. Le wrapper convertit également les arguments de table en objets de type ligne pour l’accès positionnel et nommé, et traite correctement les valeurs SQL « null » pour correspondre aux schémas de comportement de Spark.

snowpark.connect.version

Renvoie la version de Snowpark Connect for Spark actuelle. Lecture seule.

Par défaut : <current_version>

Depuis : 1.0.0

snowpark.connect.views.duplicate_column_names_handling_mode

Spécifie comment gérer les noms de colonnes dupliqués dans les vues. Les valeurs autorisées incluent rename (ajouter un suffixe), fail (soumettre l’erreur) ou drop (supprimer les doublons).

Par défaut : rename

Depuis : 1.0.0

Commentaires

Snowflake ne prend pas en charge les noms de colonne dupliqués.

Exemple

Le code suivant échoue à l’étape de création de la vue avec l’erreur de compilation SQL suivante : « duplicate column name “foo” ».

df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])

df.show() # works

df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Copy

Pour contourner ce problème, définissez l’option de configuration snowpark.connect.views.duplicate_column_names_handling_mode sur l’une des valeurs suivantes :

  • rename : Un suffixe tel que _dedup_1, _dedup_2, et ainsi de suite, sera ajouté à tous les noms de colonnes dupliqués après la première.

  • drop : Toutes les colonnes en double, sauf une, seront supprimées. Si les colonnes ont des valeurs différentes, cela peut entraîner des résultats incorrects.

snowpark.connect.udf.java.imports

Spécifie une liste des fichiers et modules, séparés par des virgules, à importer pour l’exécution des UDF Java. Déclenche la recréation des UDF en cas de modification.

Par défaut : (aucun)

Depuis : 1.7.0

Commentaires

Cette configuration fonctionne de manière très similaire à snowpark.connect.udf.python.imports. Vous pouvez l’utiliser pour spécifier des bibliothèques externes et des fichiers pour les UDFs Java créées à l’aide de `registerJavaFunction<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.UDFRegistration.registerJavaFunction.html>`_. Les configurations s’excluent mutuellement pour empêcher les mélanges inutiles de dépendances.

Pour inclure des bibliothèques et des fichiers externes, vous fournissez des chemins d’accès aux zones de préparation aux fichiers en tant que valeur du paramètre de configuration snowpark.connect.udf.java.imports. La valeur est un tableau de chemins d’accès aux fichiers, où les chemins sont séparés par des virgules.

Exemple

Le code de l’exemple suivant inclut deux fichiers dans le contexte d’exécution de l’UDF. L’UDF importe des fonctions de ces fichiers et les utilise dans sa logique.

# Files need to be previously staged

spark.conf.set("snowpark.connect.udf.java.imports", "[@stage/library.jar]")

spark.registerJavaFunction("javaFunction", "com.example.ExampleFunction")

spark.sql("SELECT javaFunction('arg')").show()
Copy

Vous pouvez utiliser le paramètre snowpark.connect.udf.java.imports pour inclure d’autres types de fichiers, tels que ceux contenant des données que votre code doit lire. Lorsque vous faites cela, votre code ne doit lire qu’à partir des fichiers inclus ; toutes les écritures dans ces fichiers seront perdues après la fin de l’exécution de la fonction.

snowpark.connect.udf.packages

Spécifie une liste des paquets Python, séparés par des virgules, à inclure lors de l’enregistrement des UDFs.

Par défaut : (aucun)

Depuis : 1.0.0

Commentaires

La configuration permet de définir des paquets supplémentaires disponibles dans les UDFs Python. La valeur est une liste de dépendances séparées par des virgules.

Vous pouvez découvrir la liste des paquets pris en charge en exécutant la commande SQL suivante dans Snowflake :

SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
Copy
Exemple
spark.conf.set("snowpark.connect.udf.packages", "[numpy]")

@udtf(returnType="val: int")

class Powers:

  def eval(self, x: int):
      import numpy as np

      for v in np.power(np.array([x, x, x]), [0, 1, 2]):
          yield (int(v),)

spark.udtf.register(name="powers", f=Powers)

spark.sql("SELECT * FROM powers(10)").show()
Copy

Référence : Référence Paquets

snowpark.connect.udtf.compatibility_mode

Spécifiez true pour activer le comportement UDTF compatible avec Spark pour une meilleure compatibilité avec la sémantique UDTF de Spark.

Par défaut : false

Depuis : 1.0.0

Commentaires

Cette configuration détermine si les UDTFs doivent utiliser un comportement compatible avec Spark ou le comportement par défaut de Snowpark. Lorsqu’elles sont activées (true), cela applique un wrapper de compatibilité qui imite la coercition des types de sortie de Spark (par exemple, la chaîne « true » en booléen, le booléen en entier) et les modèles de gestion des erreurs.

Le wrapper convertit également les arguments de table en objets de type ligne pour l’accès positionnel et nommé, et traite correctement les valeurs SQL « null » pour correspondre aux schémas de comportement de Spark.

snowpark.connect.sql.emulatePartitionOverwritesForSnowflakeTables

Lorsque true, permet l’écrasement des partitions sur les tables Snowflake dans Spark SQL (INSERT OVERWRITE <table> PARTITION(<partition spec>)).

Par défaut : false

Depuis : 1.12.3

Commentaires

Les tables Snowflake ne prennent pas en charge le partitionnement défini par l’utilisateur et, par défaut, les écrasements de partition entraîneront une erreur. L’activation de cette option permet d’utiliser la syntaxe INSERT OVERWRITE <table> PARTITION(<partition spec>) pour effectuer des écrasements.

Le <partition spec> acceptera toutes les colonnes qui existent dans la table cible.

Exemple

Le code de l’exemple suivant écrase toutes les lignes de la table des étudiants qui ont un student_id de 222222.

spark.conf.set("snowpark.connect.sql.emulatePartitionOverwritesForSnowflakeTables", True)

# create the students and persons tables as standard Snowflake tables
students_data = [
  ("Ashua Hill", "456 Erica Ct, Cupertino", 111111),
  ("Brian Reed", "723 Kern Ave, Palo Alto", 222222)
]

students_df = spark.createDataFrame(students_data, ["name", "address", "student_id"])
students_df.write.mode("overwrite").saveAsTable("students")

persons_data = [
    ("Dora Williams", "134 Forest Ave, Menlo Park", 123456789),
    ("Eddie Davis", "245 Market St, Milpitas", 345678901)
]

persons_df = spark.createDataFrame(persons_data, ["name", "address", "ssn"])
persons_df.write.mode("overwrite").saveAsTable("persons")

# overwrites all rows in the students table that have a student_id of 222222
spark.sql("""
    INSERT OVERWRITE students PARTITION (student_id = 222222)
    SELECT name, address FROM persons WHERE name = 'Dora Williams'
""").collect()
Copy

snowpark.connect.artifact_repository

Spécifie le nom d’un dépôt d’artefacts Snowflake à utiliser pour la résolution des paquets lors de l’enregistrement des opérations UDFs, UDTFs, applyInPandas, mapInArrow et cogroup. Lorsque cette option est activée, les paquets spécifiés via snowpark.connect.udf.packages sont résolus à partir du dépôt d’artefacts spécifié au lieu d’Anaconda.

Par défaut : (aucun)

Depuis : 1.14.0

Commentaires

Par défaut, Snowpark Connect for Spark résout les paquets Python du canal Anaconda de Snowflake. La définition de cette configuration sur un nom de dépôt d’artefacts permet de résoudre les paquets de PyPI ou d’autres sources configurées, afin d’utiliser des paquets qui ne sont pas disponibles dans le canal Anaconda.

Pour plus d’informations sur la création et la configuration d’un dépôt d’artefacts dans Snowflake, consultez Utilisation de paquets tiers.

La modification de cette configuration invalide les UDFs et les UDTFs mises en cache, ce qui entraîne leur recréation avec le nouveau dépôt lors de l’appel suivant.

Cette configuration s’applique aux opérations suivantes :

  • UDFs enregistrée via le décorateur @udf ou spark.udf.register()

  • UDTFs enregistrée via le décorateur @udtf ou spark.udtf.register()

  • applyInPandas via groupBy().applyInPandas()

  • mapInArrow via DataFrame.mapInArrow()

  • cogroup via groupBy().cogroup().applyInPandas()

Exemple

L’exemple suivant configure le dépôt d’artefacts, puis définit une UDF qui utilise pykalman, un paquet disponible dans le dépôt d’artefacts, pour appliquer le lissage du filtre de Kalman.

spark.conf.set("snowpark.connect.artifact_repository", "my_pypi_repo")
spark.conf.set("snowpark.connect.udf.packages", "[pykalman]")

@udf(returnType=DoubleType())
def kalman_smooth_value(value: float) -> float:
    import numpy as np
    from pykalman import KalmanFilter

    kf = KalmanFilter(
        transition_matrices=[1],
        observation_matrices=[1],
        initial_state_mean=0,
        initial_state_covariance=1,
        observation_covariance=1,
        transition_covariance=0.1,
    )
    observations = np.array([value, value, value])
    smoothed_state_means, _ = kf.smooth(observations)
    return float(smoothed_state_means[-1][0])

df = spark.createDataFrame([(1, 10.0), (2, 20.0), (3, 30.0)], ["id", "value"])
df.select("id", kalman_smooth_value("value").alias("smoothed")).show()
Copy

Pour plus d’informations sur les dépôts d’artefacts et les paquets disponibles, consultez Utilisation de paquets tiers.

snowpark.connect.udf.resource_constraint.architecture

Lorsque défini sur x86, les opérations UDFs, UDTFs et applyInPandas sont créées avec une contrainte d’architecture x86. Cela nécessite un entrepôt configuré avec une contrainte de ressources x86 pour l’exécution.

Par défaut : (aucun)

Depuis : 1.13.0

Commentaires

Certains paquets Python de tiers (comme TensorFlow,XGBoost et certaines bibliothèques scientifiques) sont conçus uniquement pour l’architecture CPU x86. Définir cette configuration sur x86 ajoute:code:RESOURCE_CONSTRAINT=(architecture='x86') à l’instruction CREATE FUNCTION générée par|spconnect|, garantissant ainsi que la UDF s’exécute sur une infrastructure x86 compatible.

Pour utiliser cette configuration, vous devez exécuter votre charge de travail sur un entrepôt qui a été créé avec une contrainte de ressources x86. Les valeurs de contrainte de ressources suivantes prennent en charge x86 :

  • MEMORY_1X_x86 (taille minimale d’entrepôt : XSMALL)

  • MEMORY_16X_x86 (taille minimale d’entrepôt : MEDIUM)

  • MEMORY_64X_x86 (taille minimale d’entrepôt : LARGE)

Si l’entrepôt n’a pas de contrainte de ressources x86, l’exécution de la UDF échouera.

Cette configuration s’applique aux opérations suivantes :

  • UDFs enregistrée via le décorateur @udf ou spark.udf.register()

  • UDTFs enregistrée via le décorateur @udtf ou spark.udtf.register()

  • applyInPandas via groupBy().applyInPandas()

Exemple

L’exemple suivant crée un entrepôt avec une contrainte de ressources x86, puis configure Snowpark Connect for Spark pour utiliser l’architecture x86 pour les UDFs.

CREATE WAREHOUSE my_x86_warehouse WITH
  WAREHOUSE_SIZE = 'MEDIUM'
  WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'
  RESOURCE_CONSTRAINT = 'MEMORY_16X_x86';

USE WAREHOUSE my_x86_warehouse;
Copy
spark.conf.set("snowpark.connect.udf.resource_constraint.architecture", "x86")

@udf(returnType=IntegerType())
def add_one(x: int) -> int:
    return x + 1

df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
df.select(add_one(df["value"]).alias("result")).show()
Copy

Pour plus d’informations sur les entrepôts et les contraintes de ressources, consultez Entrepôts optimisés par Snowpark.