Écriture de tests pour Python Snowpark¶
Cette rubrique explique comment tester votre code Snowpark tout en étant connecté à Snowflake. Vous pouvez utiliser des utilitaires de test standards, comme PyTest, pour tester vos procédures stockées, vos transformations DataFrame et vos UDFs Python Snowpark.
Dans ce chapitre :
Des tests approfondis permettent d’éviter les modifications entraînant des ruptures involontaires. Les tests unitaires permettent de vérifier qu’une section du code fonctionne comme prévu. Les tests d’intégration permettent de s’assurer que les composants fonctionnent correctement ensemble pour un cas d’utilisation de bout en bout.
Les exemples présentés dans ce document utilisent PyTest, l’un des cadres de test les plus populaires pour Python. Pour des conseils supplémentaires et les meilleures pratiques, voir la documentation PyTest.
Vous pouvez également utiliser le cadre de test local Python Snowpark pour créer et utiliser des DataFrames Python Snowpark localement sans vous connecter à un compte Snowflake. Pour plus d’informations, voir Cadre de test local.
Configuration des tests¶
Installez PyTest dans votre projet, en exécutant pip install pytest
ou conda install pytest
. Vous pouvez également l’ajouter à votre fichier d’environnement requirements.txt
ou conda.
Créez un répertoire test
à côté de votre répertoire de code source et ajoutez-y vos tests unitaires et d’intégration. Pour un exemple, voir le modèle de projet Python Snowpark.
Création d’une fixture PyTest pour la session Snowpark¶
Les fixtures PyTest sont des fonctions qui sont exécutées avant un test (ou un module de tests) pour fournir des données ou des connexions aux tests. Dans ce scénario, créez une fixture PyTest qui renvoie un objet Session
Snowpark.
Créez un répertoire
test
, si vous n’en avez pas déjà un.Créez un
conftest.py
soustest
avec le contenu suivant, oùconnection_parameters
est un dictionnaire, avec les identifiants de connexion de votre compte Snowflake. Pour plus d’informations sur le format de dictionnaire, voir Création d’une session.Créez la fixture
Session
sous forme de fixture à l’échelle du module plutôt qu’à l’échelle du fichier, afin d’éviter la création de plusieurs sessions et des problèmes en raison d’objets de session conflictuels.
from snowflake.snowpark.session import Session
@pytest.fixture(scope='module')
def session(request) -> Session:
connection_parameters = {}
return Session.builder.configs(...).create()
Tests unitaires pour UDFs¶
Vous pouvez tester votre logique UDF Python en testant le gestionnaire UDF sous forme de méthode Python générique.
Créez un fichier sous votre répertoire
test
pour les tests unitaires UDF. Par exemple, nommez le fichiertest_functions.py
.Importez les méthodes Python à tester.
Pour chaque scénario de test, créez une méthode Python nommée
test_<scénario_à_tester>
.
Par exemple, voici un gestionnaire UDF Python :
def fahrenheit_to_celsius(temp_f: float) -> float:
"""
Converts fahrenheit to celsius
"""
return (float(temp_f) - 32) * (5/9)
Vous pouvez importer cette méthode dans le fichier de test (test/test_functions.py
) et effectuer le test sous forme de méthode Python générique.
import farenheit_to_celsius
def test_farenheit_to_celsius():
expected = 0.0
actual = farenheit_to_celsius(32)
assert expected == actual
Tests unitaires pour les transformations de DataFrame¶
L’ajout de tests unitaires pour vos transformations de DataFrame permet de se prémunir contre les bogues et les régressions imprévus. Pour faciliter les tests de votre logique DataFrame, encapsulez les transformations dans une méthode Python qui prend en entrée les DataFrames à transformer et renvoie les DataFrames transformés.
Dans l’exemple ci-dessous, mf_df_transformer
contient la logique de transformation. Elle peut être importée dans d’autres modules du projet Python et facilement testée.
from snowflake.snowpark.dataframe import DataFrame, col
def my_df_tranformer(df: DataFrame) -> DataFrame:
return df \
.with_column('c', df['a']+df['b']) \
.filter(col('c') > 3)
Pour tester cette transformation, procédez comme suit :
Créez un fichier pour les tests DataFrame,
test_transformers.py
, sous le répertoiretest
(test/test_transformers.py
).Créez une méthode de test pour le transformateur à tester :
test_my_df_transformer(session)
. Le paramètresession
fait ici référence à la fixture de session créée dans la section précédente.À l’aide de la fixture de session, créez les DataFrames d’entrée et ceux de sortie prévus dans la méthode de test.
Transmettez le DataFrame d’entrée au transformateur et comparez le DataFrame prévu au DataFrame effectivement renvoyé par le transformateur.
# test/test_transformers.py
import my_df_transformer
def test_my_df_transformer(session):
input_df = session.create_dataframe([[1,2],[3,4]], ['a', 'b'])
expected_df = session.create_dataframe([3,4,7], ['a','b','c'])
actual_df = my_df_transformer(input_df)
assert input_df.collect() == actual_df.collect()
Tests d’intégration pour les procédures stockées¶
Pour tester vos gestionnaires de procédure stockée, utilisez la fixture de session pour appeler le gestionnaire de procédure stockée. Si votre procédure stockée lit des tables, comme dans un pipeline ETL, vous pouvez créer ces tables avant d’appeler le gestionnaire de procédure stockée, comme le montre l’exemple ci-dessous. Ce modèle garantit le suivi de vos données d’entrée dans le contrôle source et le fait qu’elles ne changent pas de manière imprévue entre les exécutions des tests.
from project import my_sproc_handler # import stored proc handler
def test_my_sproc_handler(session: Session):
# Create input table
input_tbl = session.create_dataframe(
data=[...],
schema=[...],
)
input_tbl.write.mode('overwrite').save_as_table(['DB', 'SCHEMA', 'INPUT_TBL'], mode='overwrite')
# Create expected output dataframe
expected_df = session.create_dataframe(
data=[...],
schema=[...],
).collect()
# Call the stored procedure
my_sproc_handler()
# Get actual table
actual_tbl = session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).collect()
# Clean up tables
session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).delete()
session.table(['DB', 'SCHEMA', 'INPUT_TBL']).delete()
# Compare the actual and expected tables
assert expected_df == actual_tbl