Framework de test local

Cette rubrique explique comment tester votre code localement lorsque vous utilisez la bibliothèque Snowpark Python.

Dans ce chapitre :

Le cadre de test local Python Snowpark est un émulateur qui vous permet de créer et d’agir sur des DataFrames Python Snowpark localement sans vous connecter à un compte Snowflake. Vous pouvez utiliser le cadre de test local pour tester vos opérations DataFrame sur votre poste de développement ou dans un pipeline CI (intégration continue), avant de déployer les changements de code sur votre compte. L’API est la même, de sorte que vous pouvez exécuter vos tests localement ou sur un compte Snowflake, sans modifier le code.

Conditions préalables

Pour utiliser le cadre de test local :

  • Vous devez utiliser la version 1.18.0 ou une version supérieure de la bibliothèque Python Snowpark avec la dépendance localtest facultative.

  • Les versions de Python prises en charge sont les suivantes :

    • 3,9

    • 3,10

    • 3,11

    • 3,12

Installer la bibliothèque Snowpark Python

  • Pour installer la bibliothèque avec la dépendance facultative, exécutez la commande suivante :

    pip install "snowflake-snowpark-python[localtest]"
    
    Copy

Créer une session et activer les tests locaux

  1. Créez une Session Snowpark et définissez la configuration de test local sur True :

    from snowflake.snowpark import Session
    
    session = Session.builder.config('local_testing', True).create()
    
    Copy
  2. Utilisez la session pour créer et opérer sur des DataFrames :

    df = session.create_dataframe([[1,2],[3,4]],['a','b'])
    df.with_column('c', df['a']+df['b']).show()
    
    Copy

Chargement des données

Vous pouvez créer des DataFrames Snowpark à partir de primitives Python, de fichiers et de DataFrames pandas. Cela s’avère utile pour spécifier l’entrée et la sortie prévues des cas de test. En procédant de cette manière, les données se trouvent dans le contrôle source, ce qui facilite la synchronisation des données de test sur les cas de test.

Charger les données CSV

  • Pour charger des fichiers CSV dans un DataFrame Snowpark, commencez par appeler Session.file.put() pour charger le fichier dans la zone de préparation en mémoire, puis utilisez Session.read() pour lire le contenu.

Exemple

Supposons qu’il existe un fichier, data.csv, avec le contenu suivant :

col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
Copy

Vous pouvez utiliser le code suivant pour charger data.csv dans un DataFrame Snowpark. Vous devez d’abord placer le fichier sur une zone de préparation ; si vous ne le faites pas, vous recevrez une erreur « fichier introuvable ».

from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType


# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
    [
        StructField("col1", IntegerType()),
        StructField("col2", StringType()),
        StructField("col3", BooleanType()),
        StructField("col4", DoubleType()),
    ]
)

# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
Copy

Résultat attendu :

-------------------------------------
|"COL1"  |"COL2"  |"COL3"  |"COL4"  |
-------------------------------------
|1       |a       |True    |1.23    |
|2       |b       |False   |4.56    |
-------------------------------------

Charger les données pandas

  • Pour créer un DataFrame Snowpark Python à partir d’un DataFrame pandas, appelez la méthode create_dataframe et transmettez les données en tant que DataFrame pandas.

Exemple

import pandas as pd

pandas_df = pd.DataFrame(
    data={
        "col1": pd.Series(["value1", "value2"]),
        "col2": pd.Series([1.23, 4.56]),
        "col3": pd.Series([123, 456]),
        "col4": pd.Series([True, False]),
    }
)

dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
Copy

Résultat attendu :

-------------------------------------
|"col1"  |"col2"  |"col3"  |"col4"  |
-------------------------------------
|value1  |1.23    |123     |True    |
|value2  |4.56    |456     |False   |
-------------------------------------
  • Pour convertir un DataFrame Snowpark Python à un DataFrame pandas, appelez la méthode to_pandas sur le DataFrame.

Exemple

from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType

dataframe = session.create_dataframe(
    data=[
        ["value1", 1.23, 123, True],
        ["value2", 4.56, 456, False],
    ],
    schema=StructType([
        StructField("col1", StringType()),
        StructField("col2", DoubleType()),
        StructField("col3", LongType()),
        StructField("col4", BooleanType()),
    ])
)

pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
Copy

Résultat attendu :

    COL1  COL2  COL3   COL4
0  value1  1.23   123   True
1  value2  4.56   456  False

Créer une fixture PyTest pour une session

Les fixtures PyTest sont des fonctions qui sont exécutées avant un test (ou un module de tests), généralement pour fournir des données ou des connexions aux tests. Dans cette procédure, créez une fixture qui renvoie un objet Session Snowpark.

  1. Si vous n’avez pas déjà un répertoire test, créez-en un.

  2. Dans le répertoire test, créez un fichier nommé conftest.py avec le contenu suivant, où connection_parameters est un dictionnaire, avec les identifiants de connexion de votre compte Snowflake :

    # test/conftest.py
    import pytest
    from snowflake.snowpark.session import Session
    
    def pytest_addoption(parser):
        parser.addoption("--snowflake-session", action="store", default="live")
    
    @pytest.fixture(scope='module')
    def session(request) -> Session:
        if request.config.getoption('--snowflake-session') == 'local':
            return Session.builder.config('local_testing', True).create()
        else:
            return Session.builder.configs(CONNECTION_PARAMETERS).create()
    
    Copy

Pour plus d’informations sur le format de dictionnaire, voir Création d’une session.

L’appel de pytest_addoption ajoute une option de ligne de commande nommée snowflake-session à la commande pytest. La fixture Session vérifie cette option de ligne de commande et crée une Session locale ou en direct, suivant sa valeur. Cela vous permet de basculer facilement entre les modes local et en direct pour les tests, comme indiqué dans les exemples de ligne de commande suivants :

# Using local mode:
pytest --snowflake-session local

# Using live mode
pytest
Copy

Opérations SQL

Session.sql(...) n’est pas pris en charge par le cadre de test local. Utilisez les APIs DataFrame de Snowpark, autant que possible, et, dans les cas où vous devez utiliser Session.sql(...), vous pouvez simuler la valeur de renvoi tabulaire via le unittest.mock.patch de Python pour corriger la réponse attendue d’un appel Session.sql() donné.

Dans l’exemple ci-dessous, mock_sql() mappe le texte de requête SQL vers la réponse du DataFrame souhaitée. L’instruction conditionnelle vérifie si la session en cours utilise un test local et, si c’est le cas, applique le correctif à la méthode Session.sql().

from unittest import mock
from functools import partial

def test_something(pytestconfig, session):

    def mock_sql(session, sql_string):  # patch for SQL operations
        if sql_string == "select 1,2,3":
            return session.create_dataframe([[1,2,3]])
        else:
            raise RuntimeError(f"Unexpected query execution: {sql_string}")

    if pytestconfig.getoption('--snowflake-session') == 'local':
        with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
            assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
    else:
        assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Copy

Lorsque le test local est activé, toutes les tables créées par DataFrame.save_as_table() sont enregistrées sous forme de tables temporaires dans la mémoire et peuvent être récupérées via Session.table(). Vous pouvez utiliser les opérations DataFrame prises en charge sur la table, comme d’habitude.

Correction des fonctions intégrées

Toutes les fonctions intégrées sous snowflake.snowpark.functions ne sont pas prises en charge par le cadre de test local. Si vous utilisez une fonction non prise en charge, vous devez utiliser le décorateur @patch de snowflake.snowpark.mock pour créer un correctif.

Pour définir et mettre en œuvre la fonction corrigée, il faut que la signature (liste de paramètres) soit alignée sur les paramètres de la fonction intégrée. Le cadre de test local transmet les paramètres à la fonction corrigée en appliquant les règles suivantes :

  • Pour les paramètres de type ColumnOrName dans la signature des fonctions intégrées, ColumnEmulator est transmis comme paramètre des fonctions corrigées. ColumnEmulator est similaire à un objet pandas.Series contenant les données de colonne.

  • Pour les paramètres de type LiteralType dans la signature des fonctions intégrées, la valeur littérale est transmise comme paramètre des fonctions corrigées.

  • Sinon, la valeur brute est transmise comme paramètre des fonctions corrigées.

En ce qui concerne le type de renvoi des fonctions corrigées, le renvoi d’une instance de ColumnEmulator est attendu en correspondance avec le type de renvoi de Column des fonctions intégrées.

Par exemple, la fonction intégrée to_timestamp() pourrait être corrigée comme suit :

import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType

@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
    ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
    ret_column.sf_type = ColumnType(TimestampType(), True)
    return ret_column
Copy

Omission des cas de test

Si votre suite de tests PyTest contient un cas de test qui n’est pas bien pris en charge par les tests locaux, vous pouvez ignorer ces cas en utilisant le décorateur mark.skipif de PyTest. L’exemple ci-dessous suppose que vous ayez configuré votre session et vos paramètres comme décrit précédemment. La condition vérifie si le local_testing_mode est réglé sur local ; si c’est le cas, le cas de test est ignoré avec un message explicatif.

import pytest

@pytest.mark.skipif(
    condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
    ...
Copy

Enregistrement d’UDFs et de procédures stockées

Vous pouvez créer et appeler des fonctions définies par l’utilisateur (UDFs) et des procédures stockées dans le cadre de tests locaux. Pour créer les objets, vous pouvez utiliser les options de syntaxe suivantes :

Syntaxe

UDF

Procédure stockée

Décorateurs

@udf

@sproc

Méthodes d’enregistrement

udf.register()

sproc.register()

Méthodes Register-from-file

udf.register_from_file()

sproc.register_from_file()

Exemple

L’exemple de code suivant crée une UDF et une procédure stockée à l’aide des décorateurs, puis les appelle par leur nom.

from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import col, DataFrame
from snowflake.snowpark.functions import udf, sproc, call_udf
from snowflake.snowpark.types import IntegerType, StringType

# Create local session
session = Session.builder.config('local_testing', True).create()

# Create local table
table = 'example'
session.create_dataframe([[1,2],[3,4]],['a','b']).write.save_as_table(table)

# Register a UDF, which is called from the stored procedure
@udf(name='example_udf', return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
def example_udf(a, b):
    return a + b

# Register stored procedure
@sproc(name='example_proc', return_type=IntegerType(), input_types=[StringType()])
def example_proc(session, table_name):
    return session.table(table_name)\
        .with_column('c', call_udf('example_udf', col('a'), col('b')))\
        .count()

# Call the stored procedure by name
output = session.call('example_proc', table)

print(output)
Copy

Limitations

La liste suivante contient les limitations et les écarts de comportement connus dans le cadre de tests locaux. Snowflake n’a actuellement aucun projet de résolution de ces problèmes.

  • Les chaînes et opérations SQL brutes nécessitant l’analyse des chaînes SQL, comme session.sql et DataFrame.filter("col1 > 12"), ne sont pas prises en charge.

  • Les opérations asynchrones ne sont pas prises en charge.

  • Les objets de la base de données tels que les tables, les procédures stockées et les UDFs ne sont pas conservés au-delà du niveau de la session et toutes les opérations sont effectuées en mémoire. Par exemple, les procédures stockées permanentes enregistrées dans une session fictive ne sont pas visibles par les autres sessions fictives.

  • Les fonctions liées au classement des chaînes, telles que Column.collate, ne sont pas prises en charge.

  • Les types de données Variant, Array et Object sont pris en charge uniquement avec le codage et le décodage JSON standards. Des expressions telles que [1,2,,3,] sont considérées comme des valeurs JSON valides dans Snowflake, mais pas dans les tests locaux, dans lesquels les fonctionnalités JSON intégrées de Python sont utilisées. Vous pouvez spécifier les variables snowflake.snowpark.mock.CUSTOM_JSON_ENCODER et snowflake.snowpark.mock.CUSTOM_JSON_DECODER au niveau du module pour remplacer les paramètres par défaut.

  • Seul un sous-ensemble des fonctions de Snowflake (y compris les fonctions de fenêtre) est mis en œuvre. Pour savoir comment injecter votre propre définition de fonction, voir Correction des fonctions intégrées.

    • La correction des fonctions liées au rang n’est actuellement pas prise en charge.

  • Modèles de format SQL ne sont pas pris en charge. Par exemple, l’implémentation fictive de to_decimal ne gère pas le paramètre facultatif format.

  • La bibliothèque Python de Snowpark n’a pas d’API intégrée Python pour créer ou supprimer des zones de préparation, de sorte que le cadre de tests local suppose que chaque zone de préparation entrante a déjà été créée.

  • L’implémentation actuelle d’UDFs et de procédures stockées n’effectue aucune validation de paquet. Tous les paquets référencés dans votre code doivent être installés avant l’exécution du programme.

  • Les balises d’interrogation ne sont pas prises en charge.

  • L’historique des requêtes n’est pas pris en charge.

  • La lignée n’est pas prise en charge.

  • Lors de l’enregistrement d’une UDF ou d’une procédure stockée, les paramètres facultatifs tels que parallel, execute_as, statement_params, source_code_display, external_access_integrations, secrets, et comment sont ignorés.

  • Pour Table.sample, SYSTEM ou BLOCK, l’échantillonnage est le même que pour ROW.

  • Snowflake ne prend pas officiellement en charge l’exécution du cadre de test local à l’intérieur des procédures stockées. Les sessions en mode de test local à l’intérieur des procédures stockées peuvent rencontrer ou déclencher des erreurs inattendues.

Fonctionnalités non prises en charge

Vous trouverez ci-dessous la liste des fonctions qui ne sont actuellement pas implémentées dans le cadre de test local. Snowflake travaille activement à la résolution de ces problèmes.

En règle générale, toute référence à ces fonctionnalités devrait susciter un NotImplementedError :

  • UDTFs (fonctions de table définies par l’utilisateur)

  • UDAFs (fonctions agrégées définies par l’utilisateur)

  • UDFs et UDTFs vectorisées

  • Fonctions de table intégrées

  • Procédures stockées de table

  • Types de données Geometry, Geography, et Vector.

  • Expressions d’intervalles

  • Lire des formats de fichiers autres que JSON et CSV

    • Pour un format de fichier pris en charge, toutes les options de lecture ne sont pas prises en charge. Par exemple, infer_schema n’est pas pris en charge pour le format CSV.

Pour toutes les fonctions qui ne sont pas annoncées ici comme non prises en charge ou comme une limitation connue, consultez la dernière liste des demandes de fonctions pour des tests locaux, ou créez une demande de fonction dans le référentiel snowpark-python GitHub.

Problèmes connus

Vous trouverez ci-dessous une liste des problèmes connus ou des écarts de comportement qui existent dans le cadre des tests locaux. Snowflake planifie activement la résolution de ces problèmes.

  • L’utilisation de fonctions de fenêtre à l’intérieur de DataFrame.groupby ou d’autres opérations d’agrégation n’est pas prise en charge.

    # Selecting window function expressions is supported
    df.select("key", "value", sum_("value").over(), avg("value").over())
    
    # Aggregating window function expressions is NOT supported
    df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
    
    Copy
  • La sélection de colonnes portant le même nom renvoie une seule colonne. Pour contourner le problème, utilisez Column.alias pour renommer les colonnes pour qu’elles aient des noms distincts.

    df.select(lit(1), lit(1)).show() # col("a"), col("a")
    #---------
    #|"'1'"  |
    #---------
    #|1      |
    #|...    |
    #---------
    
    # Workaround: Column.alias
    DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2"))
    # "col1_1", "col1_2"
    
    Copy
  • Pour Table.merge et Table.update, les paramètres de session ERROR_ON_NONDETERMINISTIC_UPDATE et ERROR_ON_NONDETERMINISTIC_MERGE doivent être définis sur False. Cela signifie que pour les jointures multiples, l’une des lignes correspondantes est mise à jour.

  • Les noms de zone de préparation entièrement qualifiés dans les opérations de fichiers GET et PUT ne sont pas pris en charge. Les noms des bases de données et des schémas sont traités comme faisant partie du nom de la zone de préparation.

  • L’implémentation de mock_to_char ne prend en charge que les horodatages dans un format qui comporte des séparateurs entre les différentes parties temporelles.

  • DataFrame.pivot dispose d’un paramètre appelé values qui permet de limiter un pivot à des valeurs spécifiques. Seules des valeurs définies statistiquement peuvent être utilisées pour l’instant. Les valeurs fournies à l’aide d’une sous-requête entraînent une erreur.

  • La création d’un DataFrame à partir d’un DataFrame pandas contenant un horodatage avec des informations sur le fuseau horaire n’est pas prise en charge.

Pour tout problème non mentionné dans cette liste, consultez la dernière liste des problèmes ouverts, ou créez un rapport de bogue dans le référentiel snowpark-python GitHub.