Écriture de tests pour Python Snowpark

Cette rubrique explique comment tester votre code Snowpark tout en étant connecté à Snowflake. Vous pouvez utiliser des utilitaires de test standards, comme PyTest, pour tester vos procédures stockées, vos transformations DataFrame et vos UDFs Python Snowpark.

Dans ce chapitre :

Des tests approfondis permettent d’éviter les modifications entraînant des ruptures involontaires. Les tests unitaires permettent de vérifier qu’une section du code fonctionne comme prévu. Les tests d’intégration permettent de s’assurer que les composants fonctionnent correctement ensemble pour un cas d’utilisation de bout en bout.

Les exemples présentés dans ce document utilisent PyTest, l’un des cadres de test les plus populaires pour Python. Pour des conseils supplémentaires et les meilleures pratiques, voir la documentation PyTest.

Vous pouvez également utiliser le cadre de test local Python Snowpark pour créer et utiliser des DataFrames Python Snowpark localement sans vous connecter à un compte Snowflake. Pour plus d’informations, voir Cadre de test local.

Configuration des tests

Installez PyTest dans votre projet, en exécutant pip install pytest ou conda install pytest. Vous pouvez également l’ajouter à votre fichier d’environnement requirements.txt ou conda.

Créez un répertoire test à côté de votre répertoire de code source et ajoutez-y vos tests unitaires et d’intégration. Pour un exemple, voir le modèle de projet Python Snowpark.

Création d’une fixture PyTest pour la session Snowpark

Les fixtures PyTest sont des fonctions qui sont exécutées avant un test (ou un module de tests) pour fournir des données ou des connexions aux tests. Dans ce scénario, créez une fixture PyTest qui renvoie un objet Session Snowpark.

  1. Créez un répertoire test, si vous n’en avez pas déjà un.

  2. Créez un conftest.py sous test avec le contenu suivant, où connection_parameters est un dictionnaire, avec les identifiants de connexion de votre compte Snowflake. Pour plus d’informations sur le format de dictionnaire, voir Création d’une session.

  3. Créez la fixture Session sous forme de fixture à l’échelle du module plutôt qu’à l’échelle du fichier, afin d’éviter la création de plusieurs sessions et des problèmes en raison d’objets de session conflictuels.

from snowflake.snowpark.session import Session

@pytest.fixture(scope='module')
def session(request) -> Session:
connection_parameters = {}
return Session.builder.configs(...).create()
Copy

Tests unitaires pour UDFs

Vous pouvez tester votre logique UDF Python en testant le gestionnaire UDF sous forme de méthode Python générique.

  1. Créez un fichier sous votre répertoire test pour les tests unitaires UDF. Par exemple, nommez le fichier test_functions.py.

  2. Importez les méthodes Python à tester.

  3. Pour chaque scénario de test, créez une méthode Python nommée test_<scénario_à_tester>.

Par exemple, voici un gestionnaire UDF Python :

def fahrenheit_to_celsius(temp_f: float) -> float:
    """
    Converts fahrenheit to celsius
    """
    return (float(temp_f) - 32) * (5/9)
Copy

Vous pouvez importer cette méthode dans le fichier de test (test/test_functions.py) et effectuer le test sous forme de méthode Python générique.

import farenheit_to_celsius

def test_farenheit_to_celsius():
    expected = 0.0
    actual = farenheit_to_celsius(32)
    assert expected == actual
Copy

Tests unitaires pour les transformations de DataFrame

L’ajout de tests unitaires pour vos transformations de DataFrame permet de se prémunir contre les bogues et les régressions imprévus. Pour faciliter les tests de votre logique DataFrame, encapsulez les transformations dans une méthode Python qui prend en entrée les DataFrames à transformer et renvoie les DataFrames transformés.

Dans l’exemple ci-dessous, mf_df_transformer contient la logique de transformation. Elle peut être importée dans d’autres modules du projet Python et facilement testée.

from snowflake.snowpark.dataframe import DataFrame, col

def my_df_tranformer(df: DataFrame) -> DataFrame:
    return df \
        .with_column('c', df['a']+df['b']) \
        .filter(col('c') > 3)
Copy

Pour tester cette transformation, procédez comme suit :

  1. Créez un fichier pour les tests DataFrame, test_transformers.py, sous le répertoire test (test/test_transformers.py).

  2. Créez une méthode de test pour le transformateur à tester : test_my_df_transformer(session). Le paramètre session fait ici référence à la fixture de session créée dans la section précédente.

  3. À l’aide de la fixture de session, créez les DataFrames d’entrée et ceux de sortie prévus dans la méthode de test.

  4. Transmettez le DataFrame d’entrée au transformateur et comparez le DataFrame prévu au DataFrame effectivement renvoyé par le transformateur.

# test/test_transformers.py

import my_df_transformer

def test_my_df_transformer(session):
input_df = session.create_dataframe([[1,2],[3,4]], ['a', 'b'])
expected_df = session.create_dataframe([3,4,7], ['a','b','c'])
actual_df = my_df_transformer(input_df)
assert input_df.collect() == actual_df.collect()
Copy

Tests d’intégration pour les procédures stockées

Pour tester vos gestionnaires de procédure stockée, utilisez la fixture de session pour appeler le gestionnaire de procédure stockée. Si votre procédure stockée lit des tables, comme dans un pipeline ETL, vous pouvez créer ces tables avant d’appeler le gestionnaire de procédure stockée, comme le montre l’exemple ci-dessous. Ce modèle garantit le suivi de vos données d’entrée dans le contrôle source et le fait qu’elles ne changent pas de manière imprévue entre les exécutions des tests.

from project import my_sproc_handler  # import stored proc handler

def test_my_sproc_handler(session: Session):

    # Create input table
    input_tbl = session.create_dataframe(
        data=[...],
        schema=[...],
    )

    input_tbl.write.mode('overwrite').save_as_table(['DB', 'SCHEMA', 'INPUT_TBL'], mode='overwrite')

    # Create expected output dataframe
    expected_df = session.create_dataframe(
        data=[...],
        schema=[...],
    ).collect()

    # Call the stored procedure
    my_sproc_handler()

    # Get actual table
    actual_tbl = session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).collect()

    # Clean up tables
    session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).delete()
    session.table(['DB', 'SCHEMA', 'INPUT_TBL']).delete()

    # Compare the actual and expected tables
    assert expected_df == actual_tbl
Copy