Didacticiel : Test de Python Snowpark

Introduction

Ce didacticiel présente les bases pour tester votre code Python Snowpark.

Ce que vous apprendrez

Dans ce tutoriel, vous apprendrez à :

  • Testez votre code Snowpark en étant connecté à Snowflake.

    Vous pouvez utiliser des utilitaires de test standards, comme PyTest, pour tester vos procédures stockées, vos transformations DataFrame et vos UDFs Python Snowpark.

  • Testez vos DataFrames Python Snowpark localement sans vous connecter à un compte Snowflake en utilisant le cadre de test local.

    Vous pouvez utiliser le cadre de test local pour effectuer des tests localement, sur votre poste de développement, avant de déployer les modifications du code.

Conditions préalables

Pour utiliser le cadre de test local :

  • Vous devez utiliser la version 1.11.1 ou une version supérieure de la bibliothèque Python Snowpark.

  • Les versions de Python prises en charge sont les suivantes :

    • 3,8

    • 3,9

    • 3,10

    • 3,11

Configuration du projet

Dans cette section, vous allez cloner le référentiel du projet et configurer l’environnement dont vous aurez besoin pour le didacticiel.

  1. Clonez le référentiel du projet.

    git clone https://github.com/Snowflake-Labs/sftutorial-snowpark-testing
    
    Copy

    Si vous n’avez pas installé git, accédez à la page du référentiel et téléchargez le contenu en cliquant sur Code » Download Contents.

  2. Définissez les variables d’environnement à l’aide des identifiants de connexion de votre compte. L’API Snowpark les utilisera pour s’authentifier auprès de votre compte Snowflake.

    # Linux/MacOS
    export SNOWSQL_ACCOUNT=<replace with your account identifer>
    export SNOWSQL_USER=<replace with your username>
    export SNOWSQL_ROLE=<replace with your role>
    export SNOWSQL_PWD=<replace with your password>
    export SNOWSQL_DATABASE=<replace with your database>
    export SNOWSQL_SCHEMA=<replace with your schema>
    export SNOWSQL_WAREHOUSE=<replace with your warehouse>
    
    Copy
    # Windows/PowerShell
    $env:SNOWSQL_ACCOUNT = "<replace with your account identifer>"
    $env:SNOWSQL_USER = "<replace with your username>"
    $env:SNOWSQL_ROLE = "<replace with your role>"
    $env:SNOWSQL_PWD = "<replace with your password>"
    $env:SNOWSQL_DATABASE = "<replace with your database>"
    $env:SNOWSQL_SCHEMA = "<replace with your schema>"
    $env:SNOWSQL_WAREHOUSE = "<replace with your warehouse>"
    
    Copy

    Facultatif : Vous pouvez définir cette variable d’environnement de manière permanente en modifiant votre profil bash (sous Linux/MacOS) ou en utilisant le menu System Properties (sous Windows).

  3. Créez et activez un environnement conda via Anaconda :

    conda env create --file environment.yml
    conda activate snowpark-testing
    
    Copy
  4. Créez l’exemple de table dans votre compte en exécutant setup/create_table.py. Ce script Python créera une base de données appelée CITIBIKE, un schéma appelé PUBLIC et une petite table appelée TRIPS.

    python setup/create_table.py
    
    Copy

Vous êtes maintenant prêt à passer à la section suivante. Dans cette section, vous :

  • avez cloné le référentiel du didacticiel ;

  • avez créé des variables d’environnement à l’aide des identifiants de connexion de votre compte ;

  • avez créé un environnement conda pour le projet ;

  • vous êtes connecté à Snowflake à l’aide de l’API Snowpark et vous avez créé un exemple de base de données, de schéma et de table.

Essayer la procédure stockée

L’exemple de projet comprend un gestionnaire de procédure stockée (sproc.py) et trois méthodes de transformation de DataFrames (transformers.py). Le gestionnaire de procédure stockée utilise les transformateurs d’UDF et de DataFrame pour lire la table source CITIBIKE.PUBLIC.TRIPS et crée deux tables de faits : MONTH_FACTS et BIKE_FACTS.

Vous pouvez exécuter la procédure stockée à partir de la ligne de commande en exécutant cette commande.

python project/sproc.py
Copy

Maintenant que vous vous êtes familiarisé avec le projet, dans la section suivante, vous allez configurer le répertoire de test et créer une fixture PyTest pour la session Snowflake.

Créer un fixture PyTest pour la session Snowflake

Les fixtures PyTest sont des fonctions qui sont exécutées avant un test (ou un module de tests), généralement pour fournir des données ou des connexions aux tests. Pour ce projet, vous allez créer une fixture PyTest qui renvoie un objet Session Snowpark. Vos cas de test utiliseront cette session pour se connecter à Snowflake.

  1. Créez un répertoire test sous le répertoire racine du projet.

    mkdir test
    
    Copy
  2. Sous le répertoire test, créez un nouveau fichier Python nommé conftest.py. Dans conftest.py, créez une fixture PyTest pour l’objet Session :

    import pytest
    from project.utils import get_env_var_config
    from snowflake.snowpark.session import Session
    
    @pytest.fixture
    def session() -> Session:
        return Session.builder.configs(get_env_var_config()).create()
    
    Copy

Ajouter des tests unitaires pour les transformateurs de DataFrame

  1. Dans le répertoire test, créez un nouveau fichier Python nommé test_transformers.py.

  2. Dans le fichier test_transformers.py, importez les méthodes de transformation.

    # test/test_transformers.py
    
    from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
    
    Copy
  3. Ensuite, créez des tests unitaires pour ces transformateurs. La convention type consiste à créer une méthode pour chaque test avec le nom test_<nom de la méthode>. Dans notre cas, les tests seront les suivants :

    # test/test_transformers.py
    from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
    def test_add_rider_age(session):
        ...
    
    def test_calc_bike_facts(session):
        ...
    
    
    def test_calc_month_facts(session):
        ...
    
    Copy

    Le paramètre session de chaque cas de test fait référence à la fixture PyTest que vous avez créée dans la section précédente.

  4. Mettez maintenant en œuvre les cas de test pour chaque transformateur. Utilisez le modèle suivant.

    1. Créez un DataFrame d’entrée.

    2. Créer le DataFrame de sortie prévu.

    3. Transmettez le DataFrame d’entrée de l’étape 1 à la méthode de transformation.

    4. Comparez la sortie de l’étape 3 à la sortie prévue de l’étape 2.

    # test/test_transformers.py
    from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
    from snowflake.snowpark.types import StructType, StructField, IntegerType, FloatType
    
    def test_add_rider_age(session: Session):
        input = session.create_dataframe(
            [
                [1980],
                [1995],
                [2000]
            ],
            schema=StructType([StructField("BIRTH_YEAR", IntegerType())])
        )
    
        expected = session.create_dataframe(
            [
                [1980, 43],
                [1995, 28],
                [2000, 23]
            ],
            schema=StructType([StructField("BIRTH_YEAR", IntegerType()), StructField("RIDER_AGE", IntegerType())])
        )
    
        actual = add_rider_age(input)
        assert expected.collect() == actual.collect()
    
    
    def test_calc_bike_facts(session: Session):
        input = session.create_dataframe([
                [1, 10, 20],
                [1, 5, 30],
                [2, 20, 50],
                [2, 10, 60]
            ],
            schema=StructType([
                StructField("BIKEID", IntegerType()),
                StructField("TRIPDURATION", IntegerType()),
                StructField("RIDER_AGE", IntegerType())
            ])
        )
    
        expected = session.create_dataframe([
                [1, 2, 7.5, 25.0],
                [2, 2, 15.0, 55.0],
            ],
            schema=StructType([
                StructField("BIKEID", IntegerType()),
                StructField("COUNT", IntegerType()),
                StructField("AVG_TRIPDURATION", FloatType()),
                StructField("AVG_RIDER_AGE", FloatType())
            ])
        )
    
        actual = calc_bike_facts(input)
        assert expected.collect() == actual.collect()
    
    
    def test_calc_month_facts(session: Session):
        from patches import patch_to_timestamp
    
        input = session.create_dataframe(
            data=[
                ['2018-03-01 09:47:00.000 +0000', 1, 10,  15],
                ['2018-03-01 09:47:14.000 +0000', 2, 20, 12],
                ['2018-04-01 09:47:04.000 +0000', 3, 6,  30]
            ],
            schema=['STARTTIME', 'BIKE_ID', 'TRIPDURATION', 'RIDER_AGE']
        )
    
        expected = session.create_dataframe(
            data=[
                ['Mar', 2, 15, 13.5],
                ['Apr', 1, 6, 30.0]
            ],
            schema=['MONTH', 'COUNT', 'AVG_TRIPDURATION', 'AVG_RIDER_AGE']
        )
    
        actual = calc_month_facts(input)
    
        assert expected.collect() == actual.collect()
    
    Copy
  5. Vous pouvez maintenant lancer PyTest pour exécuter tous les tests unitaires.

    pytest test/test_transformers.py
    
    Copy

Ajouter des tests d’intégration pour les procédures stockées

Maintenant que nous avons des tests unitaires pour les méthodes de transformation DataFrame, ajoutons un test d’intégration pour la procédure stockée. Le cas de test suivra ce modèle :

  1. Créez une table représentant les données d’entrée de la procédure stockée.

  2. Créez deux DataFrames avec le contenu prévu des deux tables de sortie de la procédure stockée.

  3. Appelez la procédure stockée.

  4. Comparez les tables de sortie effectives aux DataFrames de l’étape 2.

  5. Nettoyez : supprimez la table d’entrée de l’étape 1 et les tables de sortie de l’étape 3.

Créez un fichier Python nommé test_sproc.py sous le répertoire test.

Importez le gestionnaire de procédure stockée depuis le répertoire du projet et créez un cas de test.

# test/test_sproc.py
from project.sproc import create_fact_tables

def test_create_fact_tables(session):
    ...
Copy

Mettez en œuvre le cas de test, en commençant par la création de la table d’entrée.

# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *

def test_create_fact_tables(session):
    DB = 'CITIBIKE'
    SCHEMA = 'TEST'

    # Set up source table
    tbl = session.create_dataframe(
        data=[
            [1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
            [1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
            [1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
            [1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
            [1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
            [1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
            [1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
            [1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
            [1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
            [1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
            [1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
        ],
        schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION',    'BIKEID'],
    )

    tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
Copy

Ensuite, créez des DataFrames pour les tables de sortie prévues.

# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *

def test_create_fact_tables(session):
    DB = 'CITIBIKE'
    SCHEMA = 'TEST'

    # Set up source table
    tbl = session.create_dataframe(
        data=[
            [1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
            [1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
            [1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
            [1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
            [1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
            [1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
            [1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
            [1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
            [1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
            [1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
            [1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
        ],
        schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION',    'BIKEID'],
    )

    tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')

    # Expected values
    n_rows_expected = 12
    bike_facts_expected = session.create_dataframe(
        data=[
            [30958, 1, 551.0, 40.0],
            [19278, 1, 242.0, 35.0],
            [18461, 1, 768.0, 31.0],
            [15533, 1, 690.0, 43.0],
            [32449, 1, 490.0, 32.0],
            [29411, 1, 457.0, 64.0],
            [28015, 1, 279.0, 52.0],
            [15148, 1, 546.0, 59.0],
            [16967, 1, 358.0, 40.0],
            [20644, 1, 848.0, 38.0],
            [16365, 1, 295.0, 39.0]
        ],
        schema=StructType([
            StructField("BIKEID", IntegerType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", FloatType()),
            StructField("AVG_RIDER_AGE", FloatType())
        ])
    ).collect()

    month_facts_expected = session.create_dataframe(
        data=[['Mar', 11, 502.18182, 43.00000]],
        schema=StructType([
            StructField("MONTH", StringType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", DecimalType()),
            StructField("AVG_RIDER_AGE", DecimalType())
        ])
    ).collect()
Copy

Pour finir, appelez la procédure stockée et lisez les tables de sortie. Comparez les tables effectives au contenu de DataFrame.

# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *

def test_create_fact_tables(session):
    DB = 'CITIBIKE'
    SCHEMA = 'TEST'

    # Set up source table
    tbl = session.create_dataframe(
        data=[
            [1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
            [1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
            [1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
            [1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
            [1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
            [1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
            [1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
            [1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
            [1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
            [1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
            [1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
        ],
        schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION',    'BIKEID'],
    )

    tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')

    # Expected values
    n_rows_expected = 12
    bike_facts_expected = session.create_dataframe(
        data=[
            [30958, 1, 551.0, 40.0],
            [19278, 1, 242.0, 35.0],
            [18461, 1, 768.0, 31.0],
            [15533, 1, 690.0, 43.0],
            [32449, 1, 490.0, 32.0],
            [29411, 1, 457.0, 64.0],
            [28015, 1, 279.0, 52.0],
            [15148, 1, 546.0, 59.0],
            [16967, 1, 358.0, 40.0],
            [20644, 1, 848.0, 38.0],
            [16365, 1, 295.0, 39.0]
        ],
        schema=StructType([
            StructField("BIKEID", IntegerType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", FloatType()),
            StructField("AVG_RIDER_AGE", FloatType())
        ])
    ).collect()

    month_facts_expected = session.create_dataframe(
        data=[['Mar', 11, 502.18182, 43.00000]],
        schema=StructType([
            StructField("MONTH", StringType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", DecimalType()),
            StructField("AVG_RIDER_AGE", DecimalType())
        ])
    ).collect()

    # Call sproc, get actual values
    n_rows_actual = create_fact_tables(session, 'TRIPS_TEST')
    bike_facts_actual = session.table([DB, SCHEMA, 'bike_facts']).collect()
    month_facts_actual = session.table([DB, SCHEMA, 'month_facts']).collect()

    # Comparisons
    assert n_rows_expected == n_rows_actual
    assert bike_facts_expected == bike_facts_actual
    assert month_facts_expected ==  month_facts_actual
Copy

Pour exécuter le cas de test, lancez pytest à partir du terminal.

pytest test/test_sproc.py
Copy

Pour exécuter tous les tests du projet, lancez pytest sans aucune autre option.

pytest
Copy

Configurer des tests locaux

À ce stade, vous disposez d’une suite de tests PyTest pour vos transformateurs de DataFrame et votre procédure stockée. Dans chaque cas de test, la fixture Session est utilisée pour la connexion à votre compte Snowflake, l’envoi du SQL de l’API Python Snowpark et la récupération de la réponse.

Vous pouvez également utiliser le cadre de test local pour exécuter les transformations localement sans connexion à Snowflake. Dans les grandes suites de tests, l’exécution des tests peut s’en trouver considérablement accélérée. Cette section montre comment mettre à jour la suite de tests pour utiliser les fonctionnalités de cadre de test local.

  1. Commencez par mettre à jour la fixture PyTest Session. Nous allons ajouter une option de ligne de commande à PyTest pour basculer entre les modes de test local et en direct.

    # test/conftest.py
    
    import pytest
    from project.utils import get_env_var_config
    from snowflake.snowpark.session import Session
    
    def pytest_addoption(parser):
        parser.addoption("--snowflake-session", action="store", default="live")
    
    @pytest.fixture(scope='module')
    def session(request) -> Session:
        if request.config.getoption('--snowflake-session') == 'local':
            return Session.builder.configs('local_testing', True).create()
        else:
            return Session.builder.configs(get_env_var_config()).create()
    
    Copy
  2. Nous devons d’abord corriger cette méthode, car toutes les fonctions intégrées ne sont pas prises en charge par le cadre de test local, par exemple, la fonction monthname() utilisée dans le transformateur calc_month_facts(). Créez un fichier nommé patches.py sous le répertoire de tests. Dans ce fichier, collez le code suivant.

    from snowflake.snowpark.mock.functions import patch
    from snowflake.snowpark.functions import monthname
    from snowflake.snowpark.mock.snowflake_data_type import ColumnEmulator, ColumnType
    from snowflake.snowpark.types import StringType
    import datetime
    import calendar
    
    @patch(monthname)
    def patch_monthname(column: ColumnEmulator) -> ColumnEmulator:
        ret_column = ColumnEmulator(data=[
            calendar.month_abbr[datetime.datetime.strptime(row, '%Y-%m-%d %H:%M:%S.%f %z').month]
            for row in column])
        ret_column.sf_type = ColumnType(StringType(), True)
        return ret_column
    
    Copy

    Le correctif ci-dessus accepte un seul paramètre, column, qui est un objet de type pandas.Series contenant les lignes de données de la colonne. Nous utilisons ensuite une combinaison de méthodes issues des modules Python datetime et calendar pour émuler la fonctionnalité de la colonne monthname() intégrée. Pour finir, nous définissons le type de renvoi sur String, car la méthode intégrée renvoie des chaînes correspondant aux mois (« Jan », « Feb », « Mar », etc.).

  3. Ensuite, importez cette méthode dans les tests du transformateur de DataFrame et de la procédure stockée.

    # test/test_transformers.py
    
    # No changes to the other unit test methods
    
    def test_calc_month_facts(request, session):
        # Add conditional to include the patch if local testing is being used
        if request.config.getoption('--snowflake-session') == 'local':
            from patches import patch_monthname
    
        # No other changes
    
    Copy
  4. Réexécutez pytest avec l’indicateur local.

    pytest test/test_transformers.py --snowflake-session local
    
    Copy
  5. Appliquez maintenant le même correctif au test de la procédure stockée.

    #test/test_sproc.py
    
    def test_create_fact_tables(request, session):
        # Add conditional to include the patch if local testing is being used
        if request.config.getoption('--snowflake-session') == 'local':
            from patches import patch_monthname
    
        # No other changes required
    
    Copy
  6. Réexécutez pytest avec l’indicateur local.

    pytest test/test_sproc.py --snowflake-session local
    
    Copy
  7. Pour conclure, comparons le temps nécessaire à l’exécution de la suite de tests complète en local par rapport à celle effectuée dans le cadre d’une connexion en direct. Nous utiliserons la commande time pour mesurer le temps nécessaire aux deux commandes. Commençons par la connexion en direct.

    time pytest
    
    Copy

    Dans ce cas, l’exécution de la suite de tests a pris 7,89 secondes. (Le temps exact peut varier en fonction de votre ordinateur, de votre connexion réseau et d’autres facteurs.)

    =================================== test session starts ==========================
    platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0
    rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial
    configfile: pytest.ini
    collected 4 items
    
    test/test_sproc.py .                                                             [ 25%]
    test/test_transformers.py ...                                                    [100%]
    
    =================================== 4 passed in 6.86s =================================
    pytest  1.63s user 1.86s system 44% cpu 7.893 total
    

    Essayons maintenant avec le cadre de test local :

    time pytest --snowflake-session local
    
    Copy

    Avec le cadre de test local, l’exécution de la suite de tests n’a pris qu’1 seconde !

    ================================== test session starts ================================
    platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0
    rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial
    configfile: pytest.ini
    collected 4 items
    
    test/test_sproc.py .                                                             [ 25%]
    test/test_transformers.py ...                                                    [100%]
    
    =================================== 4 passed in 0.10s ==================================
    pytest --snowflake-session local  1.37s user 1.70s system 281% cpu 1.093 total
    

En savoir plus

Vous avez terminé ! C’est bien fait.

Dans ce didacticiel, vous avez pu assister à une présentation complète de la façon dont vous pouvez tester votre code Python Snowpark. Vous avez effectué les opérations suivantes :