Framework de test local¶
Cette rubrique explique comment tester votre code localement lorsque vous utilisez la bibliothèque Snowpark Python.
Dans ce chapitre :
Le cadre de test local Python Snowpark est un émulateur qui vous permet de créer et d’agir sur des DataFrames Python Snowpark localement sans vous connecter à un compte Snowflake. Vous pouvez utiliser le cadre de test local pour tester vos opérations DataFrame sur votre poste de développement ou dans un pipeline CI (intégration continue), avant de déployer les changements de code sur votre compte. L’API est la même, de sorte que vous pouvez exécuter vos tests localement ou sur un compte Snowflake, sans modifier le code.
Conditions préalables¶
Pour utiliser le cadre de test local :
Vous devez utiliser la version 1.18.0 ou une version supérieure de la bibliothèque Python Snowpark avec la dépendance
localtest
facultative.Les versions de Python prises en charge sont les suivantes :
3,9
3,10
3,11
3,12
Installer la bibliothèque Snowpark Python¶
Pour installer la bibliothèque avec la dépendance facultative, exécutez la commande suivante :
pip install "snowflake-snowpark-python[localtest]"
Créer une session et activer les tests locaux¶
Créez une
Session
Snowpark et définissez la configuration de test local surTrue
:from snowflake.snowpark import Session session = Session.builder.config('local_testing', True).create()
Utilisez la session pour créer et opérer sur des DataFrames :
df = session.create_dataframe([[1,2],[3,4]],['a','b']) df.with_column('c', df['a']+df['b']).show()
Chargement des données¶
Vous pouvez créer des DataFrames Snowpark à partir de primitives Python, de fichiers et de DataFrames pandas. Cela s’avère utile pour spécifier l’entrée et la sortie prévues des cas de test. En procédant de cette manière, les données se trouvent dans le contrôle source, ce qui facilite la synchronisation des données de test sur les cas de test.
Charger les données CSV¶
Pour charger des fichiers CSV dans un DataFrame Snowpark, commencez par appeler
Session.file.put()
pour charger le fichier dans la zone de préparation en mémoire, puis utilisezSession.read()
pour lire le contenu.
Exemple
Supposons qu’il existe un fichier, data.csv
, avec le contenu suivant :
col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
Vous pouvez utiliser le code suivant pour charger data.csv
dans un DataFrame Snowpark. Vous devez d’abord placer le fichier sur une zone de préparation ; si vous ne le faites pas, vous recevrez une erreur « fichier introuvable ».
from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType
# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
[
StructField("col1", IntegerType()),
StructField("col2", StringType()),
StructField("col3", BooleanType()),
StructField("col4", DoubleType()),
]
)
# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
Résultat attendu :
-------------------------------------
|"COL1" |"COL2" |"COL3" |"COL4" |
-------------------------------------
|1 |a |True |1.23 |
|2 |b |False |4.56 |
-------------------------------------
Charger les données pandas¶
Pour créer un DataFrame Snowpark Python à partir d’un DataFrame pandas, appelez la méthode
create_dataframe
et transmettez les données en tant que DataFrame pandas.
Exemple
import pandas as pd
pandas_df = pd.DataFrame(
data={
"col1": pd.Series(["value1", "value2"]),
"col2": pd.Series([1.23, 4.56]),
"col3": pd.Series([123, 456]),
"col4": pd.Series([True, False]),
}
)
dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
Résultat attendu :
-------------------------------------
|"col1" |"col2" |"col3" |"col4" |
-------------------------------------
|value1 |1.23 |123 |True |
|value2 |4.56 |456 |False |
-------------------------------------
Pour convertir un DataFrame Snowpark Python à un DataFrame pandas, appelez la méthode
to_pandas
sur le DataFrame.
Exemple
from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType
dataframe = session.create_dataframe(
data=[
["value1", 1.23, 123, True],
["value2", 4.56, 456, False],
],
schema=StructType([
StructField("col1", StringType()),
StructField("col2", DoubleType()),
StructField("col3", LongType()),
StructField("col4", BooleanType()),
])
)
pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
Résultat attendu :
COL1 COL2 COL3 COL4
0 value1 1.23 123 True
1 value2 4.56 456 False
Créer une fixture PyTest pour une session¶
Les fixtures PyTest sont des fonctions qui sont exécutées avant un test (ou un module de tests), généralement pour fournir des données ou des connexions aux tests. Dans cette procédure, créez une fixture qui renvoie un objet Session
Snowpark.
Si vous n’avez pas déjà un répertoire
test
, créez-en un.Dans le répertoire
test
, créez un fichier nomméconftest.py
avec le contenu suivant, oùconnection_parameters
est un dictionnaire, avec les identifiants de connexion de votre compte Snowflake :# test/conftest.py import pytest from snowflake.snowpark.session import Session def pytest_addoption(parser): parser.addoption("--snowflake-session", action="store", default="live") @pytest.fixture(scope='module') def session(request) -> Session: if request.config.getoption('--snowflake-session') == 'local': return Session.builder.config('local_testing', True).create() else: return Session.builder.configs(CONNECTION_PARAMETERS).create()
Pour plus d’informations sur le format de dictionnaire, voir Création d’une session.
L’appel de pytest_addoption
ajoute une option de ligne de commande nommée snowflake-session
à la commande pytest
. La fixture Session
vérifie cette option de ligne de commande et crée une Session
locale ou en direct, suivant sa valeur. Cela vous permet de basculer facilement entre les modes local et en direct pour les tests, comme indiqué dans les exemples de ligne de commande suivants :
# Using local mode:
pytest --snowflake-session local
# Using live mode
pytest
Opérations SQL¶
Session.sql(...)
n’est pas pris en charge par le cadre de test local. Utilisez les APIs DataFrame de Snowpark, autant que possible, et, dans les cas où vous devez utiliser Session.sql(...)
, vous pouvez simuler la valeur de renvoi tabulaire via le unittest.mock.patch
de Python pour corriger la réponse attendue d’un appel Session.sql()
donné.
Dans l’exemple ci-dessous, mock_sql()
mappe le texte de requête SQL vers la réponse du DataFrame souhaitée. L’instruction conditionnelle vérifie si la session en cours utilise un test local et, si c’est le cas, applique le correctif à la méthode Session.sql()
.
from unittest import mock
from functools import partial
def test_something(pytestconfig, session):
def mock_sql(session, sql_string): # patch for SQL operations
if sql_string == "select 1,2,3":
return session.create_dataframe([[1,2,3]])
else:
raise RuntimeError(f"Unexpected query execution: {sql_string}")
if pytestconfig.getoption('--snowflake-session') == 'local':
with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
else:
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Lorsque le test local est activé, toutes les tables créées par DataFrame.save_as_table()
sont enregistrées sous forme de tables temporaires dans la mémoire et peuvent être récupérées via Session.table()
. Vous pouvez utiliser les opérations DataFrame prises en charge sur la table, comme d’habitude.
Correction des fonctions intégrées¶
Toutes les fonctions intégrées sous snowflake.snowpark.functions
ne sont pas prises en charge par le cadre de test local. Si vous utilisez une fonction non prise en charge, vous devez utiliser le décorateur @patch
de snowflake.snowpark.mock
pour créer un correctif.
Pour définir et mettre en œuvre la fonction corrigée, il faut que la signature (liste de paramètres) soit alignée sur les paramètres de la fonction intégrée. Le cadre de test local transmet les paramètres à la fonction corrigée en appliquant les règles suivantes :
Pour les paramètres de type
ColumnOrName
dans la signature des fonctions intégrées,ColumnEmulator
est transmis comme paramètre des fonctions corrigées.ColumnEmulator
est similaire à un objetpandas.Series
contenant les données de colonne.Pour les paramètres de type
LiteralType
dans la signature des fonctions intégrées, la valeur littérale est transmise comme paramètre des fonctions corrigées.Sinon, la valeur brute est transmise comme paramètre des fonctions corrigées.
En ce qui concerne le type de renvoi des fonctions corrigées, le renvoi d’une instance de ColumnEmulator
est attendu en correspondance avec le type de renvoi de Column
des fonctions intégrées.
Par exemple, la fonction intégrée to_timestamp()
pourrait être corrigée comme suit :
import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType
@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
ret_column.sf_type = ColumnType(TimestampType(), True)
return ret_column
Omission des cas de test¶
Si votre suite de tests PyTest contient un cas de test qui n’est pas bien pris en charge par les tests locaux, vous pouvez ignorer ces cas en utilisant le décorateur mark.skipif
de PyTest. L’exemple ci-dessous suppose que vous ayez configuré votre session et vos paramètres comme décrit précédemment. La condition vérifie si le local_testing_mode
est réglé sur local
; si c’est le cas, le cas de test est ignoré avec un message explicatif.
import pytest
@pytest.mark.skipif(
condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
...
Enregistrement d’UDFs et de procédures stockées¶
Vous pouvez créer et appeler des fonctions définies par l’utilisateur (UDFs) et des procédures stockées dans le cadre de tests locaux. Pour créer les objets, vous pouvez utiliser les options de syntaxe suivantes :
Syntaxe |
UDF |
Procédure stockée |
---|---|---|
Décorateurs |
|
|
Méthodes d’enregistrement |
|
|
Méthodes Register-from-file |
|
|
Exemple
L’exemple de code suivant crée une UDF et une procédure stockée à l’aide des décorateurs, puis les appelle par leur nom.
from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import col, DataFrame
from snowflake.snowpark.functions import udf, sproc, call_udf
from snowflake.snowpark.types import IntegerType, StringType
# Create local session
session = Session.builder.config('local_testing', True).create()
# Create local table
table = 'example'
session.create_dataframe([[1,2],[3,4]],['a','b']).write.save_as_table(table)
# Register a UDF, which is called from the stored procedure
@udf(name='example_udf', return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
def example_udf(a, b):
return a + b
# Register stored procedure
@sproc(name='example_proc', return_type=IntegerType(), input_types=[StringType()])
def example_proc(session, table_name):
return session.table(table_name)\
.with_column('c', call_udf('example_udf', col('a'), col('b')))\
.count()
# Call the stored procedure by name
output = session.call('example_proc', table)
print(output)
Limitations¶
La liste suivante contient les limitations et les écarts de comportement connus dans le cadre de tests locaux. Snowflake n’a actuellement aucun projet de résolution de ces problèmes.
Les chaînes et opérations SQL brutes nécessitant l’analyse des chaînes SQL, comme
session.sql
etDataFrame.filter("col1 > 12")
, ne sont pas prises en charge.Les opérations asynchrones ne sont pas prises en charge.
Les objets de la base de données tels que les tables, les procédures stockées et les UDFs ne sont pas conservés au-delà du niveau de la session et toutes les opérations sont effectuées en mémoire. Par exemple, les procédures stockées permanentes enregistrées dans une session fictive ne sont pas visibles par les autres sessions fictives.
Les fonctions liées au classement des chaînes, telles que
Column.collate
, ne sont pas prises en charge.Les types de données
Variant
,Array
etObject
sont pris en charge uniquement avec le codage et le décodage JSON standards. Des expressions telles que [1,2,,3,] sont considérées comme des valeurs JSON valides dans Snowflake, mais pas dans les tests locaux, dans lesquels les fonctionnalités JSON intégrées de Python sont utilisées. Vous pouvez spécifier les variablessnowflake.snowpark.mock.CUSTOM_JSON_ENCODER
etsnowflake.snowpark.mock.CUSTOM_JSON_DECODER
au niveau du module pour remplacer les paramètres par défaut.Seul un sous-ensemble des fonctions de Snowflake (y compris les fonctions de fenêtre) est mis en œuvre. Pour savoir comment injecter votre propre définition de fonction, voir Correction des fonctions intégrées.
La correction des fonctions liées au rang n’est actuellement pas prise en charge.
Modèles de format SQL ne sont pas pris en charge. Par exemple, l’implémentation fictive de
to_decimal
ne gère pas le paramètre facultatifformat
.La bibliothèque Python de Snowpark n’a pas d’API intégrée Python pour créer ou supprimer des zones de préparation, de sorte que le cadre de tests local suppose que chaque zone de préparation entrante a déjà été créée.
L’implémentation actuelle d’UDFs et de procédures stockées n’effectue aucune validation de paquet. Tous les paquets référencés dans votre code doivent être installés avant l’exécution du programme.
Les balises d’interrogation ne sont pas prises en charge.
L’historique des requêtes n’est pas pris en charge.
La lignée n’est pas prise en charge.
Lors de l’enregistrement d’une UDF ou d’une procédure stockée, les paramètres facultatifs tels que
parallel
,execute_as
,statement_params
,source_code_display
,external_access_integrations
,secrets
, etcomment
sont ignorés.Pour
Table.sample
, SYSTEM ou BLOCK, l’échantillonnage est le même que pour ROW.Snowflake ne prend pas officiellement en charge l’exécution du cadre de test local à l’intérieur des procédures stockées. Les sessions en mode de test local à l’intérieur des procédures stockées peuvent rencontrer ou déclencher des erreurs inattendues.
Fonctionnalités non prises en charge¶
Vous trouverez ci-dessous la liste des fonctions qui ne sont actuellement pas implémentées dans le cadre de test local. Snowflake travaille activement à la résolution de ces problèmes.
En règle générale, toute référence à ces fonctionnalités devrait susciter un NotImplementedError
:
UDTFs (fonctions de table définies par l’utilisateur)
UDAFs (fonctions agrégées définies par l’utilisateur)
UDFs et UDTFs vectorisées
Fonctions de table intégrées
Procédures stockées de table
Types de données
Geometry
,Geography
, etVector
.Expressions d’intervalles
Lire des formats de fichiers autres que JSON et CSV
Pour un format de fichier pris en charge, toutes les options de lecture ne sont pas prises en charge. Par exemple,
infer_schema
n’est pas pris en charge pour le format CSV.
Pour toutes les fonctions qui ne sont pas annoncées ici comme non prises en charge ou comme une limitation connue, consultez la dernière liste des demandes de fonctions pour des tests locaux, ou créez une demande de fonction dans le référentiel snowpark-python
GitHub.
Problèmes connus¶
Vous trouverez ci-dessous une liste des problèmes connus ou des écarts de comportement qui existent dans le cadre des tests locaux. Snowflake planifie activement la résolution de ces problèmes.
L’utilisation de fonctions de fenêtre à l’intérieur de
DataFrame.groupby
ou d’autres opérations d’agrégation n’est pas prise en charge.# Selecting window function expressions is supported df.select("key", "value", sum_("value").over(), avg("value").over()) # Aggregating window function expressions is NOT supported df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
La sélection de colonnes portant le même nom renvoie une seule colonne. Pour contourner le problème, utilisez
Column.alias
pour renommer les colonnes pour qu’elles aient des noms distincts.df.select(lit(1), lit(1)).show() # col("a"), col("a") #--------- #|"'1'" | #--------- #|1 | #|... | #--------- # Workaround: Column.alias DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2")) # "col1_1", "col1_2"
Pour
Table.merge
etTable.update
, les paramètres de sessionERROR_ON_NONDETERMINISTIC_UPDATE
etERROR_ON_NONDETERMINISTIC_MERGE
doivent être définis surFalse
. Cela signifie que pour les jointures multiples, l’une des lignes correspondantes est mise à jour.Les noms de zone de préparation entièrement qualifiés dans les opérations de fichiers GET et PUT ne sont pas pris en charge. Les noms des bases de données et des schémas sont traités comme faisant partie du nom de la zone de préparation.
L’implémentation de
mock_to_char
ne prend en charge que les horodatages dans un format qui comporte des séparateurs entre les différentes parties temporelles.DataFrame.pivot
dispose d’un paramètre appelévalues
qui permet de limiter un pivot à des valeurs spécifiques. Seules des valeurs définies statistiquement peuvent être utilisées pour l’instant. Les valeurs fournies à l’aide d’une sous-requête entraînent une erreur.La création d’un
DataFrame
à partir d’unDataFrame
pandas contenant un horodatage avec des informations sur le fuseau horaire n’est pas prise en charge.
Pour tout problème non mentionné dans cette liste, consultez la dernière liste des problèmes ouverts, ou créez un rapport de bogue dans le référentiel snowpark-python
GitHub.