Didacticiel : Test de Python Snowpark¶
Introduction¶
Ce didacticiel présente les bases pour tester votre code Python Snowpark.
Ce que vous apprendrez¶
Dans ce tutoriel, vous apprendrez à :
Testez votre code Snowpark en étant connecté à Snowflake.
Vous pouvez utiliser des utilitaires de test standards, comme PyTest, pour tester vos procédures stockées, vos transformations DataFrame et vos UDFs Python Snowpark.
Testez vos DataFrames Python Snowpark localement sans vous connecter à un compte Snowflake en utilisant le cadre de test local.
Vous pouvez utiliser le cadre de test local pour effectuer des tests localement, sur votre poste de développement, avant de déployer les modifications du code.
Conditions préalables¶
Pour utiliser le cadre de test local :
Vous devez utiliser la version 1.11.1 ou une version supérieure de la bibliothèque Python Snowpark.
Les versions de Python prises en charge sont les suivantes :
3,8
3,9
3,10
3,11
Configuration du projet¶
Dans cette section, vous allez cloner le référentiel du projet et configurer l’environnement dont vous aurez besoin pour le didacticiel.
Clonez le référentiel du projet.
git clone https://github.com/Snowflake-Labs/sftutorial-snowpark-testing
Si vous n’avez pas installé git, accédez à la page du référentiel et téléchargez le contenu en cliquant sur Code » Download Contents.
Définissez les variables d’environnement à l’aide des identifiants de connexion de votre compte. L’API Snowpark les utilisera pour s’authentifier auprès de votre compte Snowflake.
# Linux/MacOS export SNOWSQL_ACCOUNT=<replace with your account identifer> export SNOWSQL_USER=<replace with your username> export SNOWSQL_ROLE=<replace with your role> export SNOWSQL_PWD=<replace with your password> export SNOWSQL_DATABASE=<replace with your database> export SNOWSQL_SCHEMA=<replace with your schema> export SNOWSQL_WAREHOUSE=<replace with your warehouse>
# Windows/PowerShell $env:SNOWSQL_ACCOUNT = "<replace with your account identifer>" $env:SNOWSQL_USER = "<replace with your username>" $env:SNOWSQL_ROLE = "<replace with your role>" $env:SNOWSQL_PWD = "<replace with your password>" $env:SNOWSQL_DATABASE = "<replace with your database>" $env:SNOWSQL_SCHEMA = "<replace with your schema>" $env:SNOWSQL_WAREHOUSE = "<replace with your warehouse>"
Facultatif : Vous pouvez définir cette variable d’environnement de manière permanente en modifiant votre profil bash (sous Linux/MacOS) ou en utilisant le menu System Properties (sous Windows).
Créez et activez un environnement conda via Anaconda :
conda env create --file environment.yml conda activate snowpark-testing
Créez l’exemple de table dans votre compte en exécutant
setup/create_table.py
. Ce script Python créera une base de données appelée CITIBIKE, un schéma appelé PUBLIC et une petite table appelée TRIPS.python setup/create_table.py
Vous êtes maintenant prêt à passer à la section suivante. Dans cette section, vous :
avez cloné le référentiel du didacticiel ;
avez créé des variables d’environnement à l’aide des identifiants de connexion de votre compte ;
avez créé un environnement conda pour le projet ;
vous êtes connecté à Snowflake à l’aide de l’API Snowpark et vous avez créé un exemple de base de données, de schéma et de table.
Essayer la procédure stockée¶
L’exemple de projet comprend un gestionnaire de procédure stockée (sproc.py
) et trois méthodes de transformation de DataFrames (transformers.py
). Le gestionnaire de procédure stockée utilise les transformateurs d’UDF et de DataFrame pour lire la table source CITIBIKE.PUBLIC.TRIPS
et crée deux tables de faits : MONTH_FACTS
et BIKE_FACTS
.
Vous pouvez exécuter la procédure stockée à partir de la ligne de commande en exécutant cette commande.
python project/sproc.py
Maintenant que vous vous êtes familiarisé avec le projet, dans la section suivante, vous allez configurer le répertoire de test et créer une fixture PyTest pour la session Snowflake.
Créer un fixture PyTest pour la session Snowflake¶
Les fixtures PyTest sont des fonctions qui sont exécutées avant un test (ou un module de tests), généralement pour fournir des données ou des connexions aux tests. Pour ce projet, vous allez créer une fixture PyTest qui renvoie un objet Session
Snowpark. Vos cas de test utiliseront cette session pour se connecter à Snowflake.
Créez un répertoire
test
sous le répertoire racine du projet.mkdir test
Sous le répertoire
test
, créez un nouveau fichier Python nomméconftest.py
. Dansconftest.py
, créez une fixture PyTest pour l’objetSession
:import pytest from project.utils import get_env_var_config from snowflake.snowpark.session import Session @pytest.fixture def session() -> Session: return Session.builder.configs(get_env_var_config()).create()
Ajouter des tests unitaires pour les transformateurs de DataFrame¶
Dans le répertoire
test
, créez un nouveau fichier Python nommétest_transformers.py
.Dans le fichier
test_transformers.py
, importez les méthodes de transformation.# test/test_transformers.py from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
Ensuite, créez des tests unitaires pour ces transformateurs. La convention type consiste à créer une méthode pour chaque test avec le nom
test_<nom de la méthode>
. Dans notre cas, les tests seront les suivants :# test/test_transformers.py from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts def test_add_rider_age(session): ... def test_calc_bike_facts(session): ... def test_calc_month_facts(session): ...
Le paramètre
session
de chaque cas de test fait référence à la fixture PyTest que vous avez créée dans la section précédente.Mettez maintenant en œuvre les cas de test pour chaque transformateur. Utilisez le modèle suivant.
Créez un DataFrame d’entrée.
Créer le DataFrame de sortie prévu.
Transmettez le DataFrame d’entrée de l’étape 1 à la méthode de transformation.
Comparez la sortie de l’étape 3 à la sortie prévue de l’étape 2.
# test/test_transformers.py from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts from snowflake.snowpark.types import StructType, StructField, IntegerType, FloatType def test_add_rider_age(session: Session): input = session.create_dataframe( [ [1980], [1995], [2000] ], schema=StructType([StructField("BIRTH_YEAR", IntegerType())]) ) expected = session.create_dataframe( [ [1980, 43], [1995, 28], [2000, 23] ], schema=StructType([StructField("BIRTH_YEAR", IntegerType()), StructField("RIDER_AGE", IntegerType())]) ) actual = add_rider_age(input) assert expected.collect() == actual.collect() def test_calc_bike_facts(session: Session): input = session.create_dataframe([ [1, 10, 20], [1, 5, 30], [2, 20, 50], [2, 10, 60] ], schema=StructType([ StructField("BIKEID", IntegerType()), StructField("TRIPDURATION", IntegerType()), StructField("RIDER_AGE", IntegerType()) ]) ) expected = session.create_dataframe([ [1, 2, 7.5, 25.0], [2, 2, 15.0, 55.0], ], schema=StructType([ StructField("BIKEID", IntegerType()), StructField("COUNT", IntegerType()), StructField("AVG_TRIPDURATION", FloatType()), StructField("AVG_RIDER_AGE", FloatType()) ]) ) actual = calc_bike_facts(input) assert expected.collect() == actual.collect() def test_calc_month_facts(session: Session): from patches import patch_to_timestamp input = session.create_dataframe( data=[ ['2018-03-01 09:47:00.000 +0000', 1, 10, 15], ['2018-03-01 09:47:14.000 +0000', 2, 20, 12], ['2018-04-01 09:47:04.000 +0000', 3, 6, 30] ], schema=['STARTTIME', 'BIKE_ID', 'TRIPDURATION', 'RIDER_AGE'] ) expected = session.create_dataframe( data=[ ['Mar', 2, 15, 13.5], ['Apr', 1, 6, 30.0] ], schema=['MONTH', 'COUNT', 'AVG_TRIPDURATION', 'AVG_RIDER_AGE'] ) actual = calc_month_facts(input) assert expected.collect() == actual.collect()
Vous pouvez maintenant lancer PyTest pour exécuter tous les tests unitaires.
pytest test/test_transformers.py
Ajouter des tests d’intégration pour les procédures stockées¶
Maintenant que nous avons des tests unitaires pour les méthodes de transformation DataFrame, ajoutons un test d’intégration pour la procédure stockée. Le cas de test suivra ce modèle :
Créez une table représentant les données d’entrée de la procédure stockée.
Créez deux DataFrames avec le contenu prévu des deux tables de sortie de la procédure stockée.
Appelez la procédure stockée.
Comparez les tables de sortie effectives aux DataFrames de l’étape 2.
Nettoyez : supprimez la table d’entrée de l’étape 1 et les tables de sortie de l’étape 3.
Créez un fichier Python nommé test_sproc.py
sous le répertoire test
.
Importez le gestionnaire de procédure stockée depuis le répertoire du projet et créez un cas de test.
# test/test_sproc.py
from project.sproc import create_fact_tables
def test_create_fact_tables(session):
...
Mettez en œuvre le cas de test, en commençant par la création de la table d’entrée.
# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *
def test_create_fact_tables(session):
DB = 'CITIBIKE'
SCHEMA = 'TEST'
# Set up source table
tbl = session.create_dataframe(
data=[
[1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
[1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
[1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
[1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
[1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
[1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
[1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
[1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
[1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
[1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
[1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
],
schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION', 'BIKEID'],
)
tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
Ensuite, créez des DataFrames pour les tables de sortie prévues.
# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *
def test_create_fact_tables(session):
DB = 'CITIBIKE'
SCHEMA = 'TEST'
# Set up source table
tbl = session.create_dataframe(
data=[
[1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
[1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
[1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
[1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
[1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
[1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
[1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
[1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
[1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
[1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
[1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
],
schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION', 'BIKEID'],
)
tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
# Expected values
n_rows_expected = 12
bike_facts_expected = session.create_dataframe(
data=[
[30958, 1, 551.0, 40.0],
[19278, 1, 242.0, 35.0],
[18461, 1, 768.0, 31.0],
[15533, 1, 690.0, 43.0],
[32449, 1, 490.0, 32.0],
[29411, 1, 457.0, 64.0],
[28015, 1, 279.0, 52.0],
[15148, 1, 546.0, 59.0],
[16967, 1, 358.0, 40.0],
[20644, 1, 848.0, 38.0],
[16365, 1, 295.0, 39.0]
],
schema=StructType([
StructField("BIKEID", IntegerType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", FloatType()),
StructField("AVG_RIDER_AGE", FloatType())
])
).collect()
month_facts_expected = session.create_dataframe(
data=[['Mar', 11, 502.18182, 43.00000]],
schema=StructType([
StructField("MONTH", StringType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", DecimalType()),
StructField("AVG_RIDER_AGE", DecimalType())
])
).collect()
Pour finir, appelez la procédure stockée et lisez les tables de sortie. Comparez les tables effectives au contenu de DataFrame.
# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *
def test_create_fact_tables(session):
DB = 'CITIBIKE'
SCHEMA = 'TEST'
# Set up source table
tbl = session.create_dataframe(
data=[
[1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
[1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
[1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
[1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
[1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
[1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
[1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
[1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
[1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
[1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
[1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
],
schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION', 'BIKEID'],
)
tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
# Expected values
n_rows_expected = 12
bike_facts_expected = session.create_dataframe(
data=[
[30958, 1, 551.0, 40.0],
[19278, 1, 242.0, 35.0],
[18461, 1, 768.0, 31.0],
[15533, 1, 690.0, 43.0],
[32449, 1, 490.0, 32.0],
[29411, 1, 457.0, 64.0],
[28015, 1, 279.0, 52.0],
[15148, 1, 546.0, 59.0],
[16967, 1, 358.0, 40.0],
[20644, 1, 848.0, 38.0],
[16365, 1, 295.0, 39.0]
],
schema=StructType([
StructField("BIKEID", IntegerType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", FloatType()),
StructField("AVG_RIDER_AGE", FloatType())
])
).collect()
month_facts_expected = session.create_dataframe(
data=[['Mar', 11, 502.18182, 43.00000]],
schema=StructType([
StructField("MONTH", StringType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", DecimalType()),
StructField("AVG_RIDER_AGE", DecimalType())
])
).collect()
# Call sproc, get actual values
n_rows_actual = create_fact_tables(session, 'TRIPS_TEST')
bike_facts_actual = session.table([DB, SCHEMA, 'bike_facts']).collect()
month_facts_actual = session.table([DB, SCHEMA, 'month_facts']).collect()
# Comparisons
assert n_rows_expected == n_rows_actual
assert bike_facts_expected == bike_facts_actual
assert month_facts_expected == month_facts_actual
Pour exécuter le cas de test, lancez pytest
à partir du terminal.
pytest test/test_sproc.py
Pour exécuter tous les tests du projet, lancez pytest
sans aucune autre option.
pytest
Configurer des tests locaux¶
À ce stade, vous disposez d’une suite de tests PyTest pour vos transformateurs de DataFrame et votre procédure stockée. Dans chaque cas de test, la fixture Session
est utilisée pour la connexion à votre compte Snowflake, l’envoi du SQL de l’API Python Snowpark et la récupération de la réponse.
Vous pouvez également utiliser le cadre de test local pour exécuter les transformations localement sans connexion à Snowflake. Dans les grandes suites de tests, l’exécution des tests peut s’en trouver considérablement accélérée. Cette section montre comment mettre à jour la suite de tests pour utiliser les fonctionnalités de cadre de test local.
Commencez par mettre à jour la fixture PyTest
Session
. Nous allons ajouter une option de ligne de commande à PyTest pour basculer entre les modes de test local et en direct.# test/conftest.py import pytest from project.utils import get_env_var_config from snowflake.snowpark.session import Session def pytest_addoption(parser): parser.addoption("--snowflake-session", action="store", default="live") @pytest.fixture(scope='module') def session(request) -> Session: if request.config.getoption('--snowflake-session') == 'local': return Session.builder.configs({'local_testing': True}).create() else: return Session.builder.configs(get_env_var_config()).create()
Nous devons d’abord corriger cette méthode, car toutes les fonctions intégrées ne sont pas prises en charge par le cadre de test local, par exemple, la fonction
monthname()
utilisée dans le transformateurcalc_month_facts()
. Créez un fichier nommépatches.py
sous le répertoire de tests. Dans ce fichier, collez le code suivant.from snowflake.snowpark.mock.functions import patch from snowflake.snowpark.functions import monthname from snowflake.snowpark.mock.snowflake_data_type import ColumnEmulator, ColumnType from snowflake.snowpark.types import StringType import datetime import calendar @patch(monthname) def patch_monthname(column: ColumnEmulator) -> ColumnEmulator: ret_column = ColumnEmulator(data=[ calendar.month_abbr[datetime.datetime.strptime(row, '%Y-%m-%d %H:%M:%S.%f %z').month] for row in column]) ret_column.sf_type = ColumnType(StringType(), True) return ret_column
Le correctif ci-dessus accepte un seul paramètre,
column
, qui est un objet de typepandas.Series
contenant les lignes de données de la colonne. Nous utilisons ensuite une combinaison de méthodes issues des modules Pythondatetime
etcalendar
pour émuler la fonctionnalité de la colonnemonthname()
intégrée. Pour finir, nous définissons le type de renvoi surString
, car la méthode intégrée renvoie des chaînes correspondant aux mois (« Jan », « Feb », « Mar », etc.).Ensuite, importez cette méthode dans les tests du transformateur de DataFrame et de la procédure stockée.
# test/test_transformers.py # No changes to the other unit test methods def test_calc_month_facts(request, session): # Add conditional to include the patch if local testing is being used if request.config.getoption('--snowflake-session') == 'local': from patches import patch_monthname # No other changes
Réexécutez
pytest
avec l’indicateur local.pytest test/test_transformers.py --snowflake-session local
Appliquez maintenant le même correctif au test de la procédure stockée.
#test/test_sproc.py def test_create_fact_tables(request, session): # Add conditional to include the patch if local testing is being used if request.config.getoption('--snowflake-session') == 'local': from patches import patch_monthname # No other changes required
Réexécutez pytest avec l’indicateur local.
pytest test/test_sproc.py --snowflake-session local
Pour conclure, comparons le temps nécessaire à l’exécution de la suite de tests complète en local par rapport à celle effectuée dans le cadre d’une connexion en direct. Nous utiliserons la commande
time
pour mesurer le temps nécessaire aux deux commandes. Commençons par la connexion en direct.time pytest
Dans ce cas, l’exécution de la suite de tests a pris 7,89 secondes. (Le temps exact peut varier en fonction de votre ordinateur, de votre connexion réseau et d’autres facteurs.)
=================================== test session starts ========================== platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0 rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial configfile: pytest.ini collected 4 items test/test_sproc.py . [ 25%] test/test_transformers.py ... [100%] =================================== 4 passed in 6.86s ================================= pytest 1.63s user 1.86s system 44% cpu 7.893 total
Essayons maintenant avec le cadre de test local :
time pytest --snowflake-session local
Avec le cadre de test local, l’exécution de la suite de tests n’a pris qu’1 seconde !
================================== test session starts ================================ platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0 rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial configfile: pytest.ini collected 4 items test/test_sproc.py . [ 25%] test/test_transformers.py ... [100%] =================================== 4 passed in 0.10s ================================== pytest --snowflake-session local 1.37s user 1.70s system 281% cpu 1.093 total
En savoir plus¶
Vous avez terminé ! C’est bien fait.
Dans ce didacticiel, vous avez pu assister à une présentation complète de la façon dont vous pouvez tester votre code Python Snowpark. Vous avez effectué les opérations suivantes :
Créé une fixture PyTest et ajouté des tests unitaires et des tests d’intégration.
Pour plus d’informations, voir Écriture de tests pour Python Snowpark.
Configuré des tests locaux.
Pour plus d’informations, voir Cadre de test local.