Cadre de test local

Cette rubrique explique comment tester votre code localement lorsque vous utilisez la bibliothèque Snowpark.

Dans ce chapitre :

Le cadre de test local Python Snowpark vous permet de créer et d’agir sur des DataFrames Python Snowpark localement sans vous connecter à un compte Snowflake. Vous pouvez utiliser le cadre de test local pour tester vos opérations DataFrame localement, sur votre poste de développement ou dans un pipeline CI (intégration continue), avant de déployer les changements de code sur votre compte. L’API est la même, de sorte que vous pouvez exécuter vos tests localement ou sur un compte Snowflake, sans modifier le code.

Conditions préalables

Pour utiliser le cadre de test local :

  • Vous devez utiliser la version 1.11.1 ou une version supérieure de la bibliothèque Python Snowpark avec la dépendance pandas facultative. Effectuez l’installation en exécutant pip install "snowflake-snowpark-python[pandas]"

  • Les versions de Python prises en charge sont les suivantes :

    • 3,8

    • 3,9

    • 3,10

    • 3,11

Création d’une session et activation de tests locaux

Pour commencer, créez une Session Snowpark et définissez la configuration de test local sur True.

from snowflake.snowpark import Session

session = Session.builder.config('local_testing', True).create()
Copy

Une fois la session créée, vous pouvez l’utiliser pour créer et agir sur des DataFrames.

df = session.create_dataframe([[1,2],[3,4]],['a','b'])
df.with_column('c', df['a']+df['b']).show()
Copy

Chargement des données

Vous pouvez créer des DataFrames Snowpark à partir de primitives Python, de fichiers et de DataFrames Pandas. Cela s’avère utile pour spécifier l’entrée et la sortie prévues des cas de test. En procédant de cette manière, les données se trouvent dans le contrôle source, ce qui facilite la synchronisation des données de test sur les cas de test.

Chargement des données CSV

Vous pouvez charger des fichiers CSV dans un DataFrame Snowpark en commençant par appeler Session.file.put() pour charger le fichier dans la zone de préparation en mémoire, puis en utilisant Session.read() pour lire le contenu. Supposons qu’il existe un fichier, data.csv, dont le contenu est le suivant :

col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
Copy

Vous pouvez charger data.csv dans un DataFrame Snowpark en utilisant le code suivant. Vous devez d’abord placer le fichier dans une zone de préparation. Sinon, vous obtiendrez un message d’erreur indiquant que le fichier est introuvable.

from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType


# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
    [
        StructField("col1", IntegerType()),
        StructField("col2", StringType()),
        StructField("col3", BooleanType()),
        StructField("col4", DoubleType()),
    ]
)

# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
Copy

La sortie de dataframe.show() sera :

-------------------------------------
|"COL1"  |"COL2"  |"COL3"  |"COL4"  |
-------------------------------------
|1       |a       |True    |1.23    |
|2       |b       |False   |4.56    |
-------------------------------------

Chargement des données Pandas

Vous pouvez créer un DataFrame Python Snowpark à partir d’un DataFrame Pandas en appelant la méthode create_dataframe et en transmettant les données sous forme de DataFrame Pandas.

import pandas as pd

pandas_df = pd.DataFrame(
    data={
        "col1": pd.Series(["value1", "value2"]),
        "col2": pd.Series([1.23, 4.56]),
        "col3": pd.Series([123, 456]),
        "col4": pd.Series([True, False]),
    }
)

dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
Copy

dataframe.show() produit le résultat suivant :

-------------------------------------
|"col1"  |"col2"  |"col3"  |"col4"  |
-------------------------------------
|value1  |1.23    |123     |True    |
|value2  |4.56    |456     |False   |
-------------------------------------

Un DataFrame Python Snowpark peut également être converti en DataFrame Pandas via l’appel de la méthode to_pandas sur le DataFrame.

from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType

dataframe = session.create_dataframe(
    data=[
        ["value1", 1.23, 123, True],
        ["value2", 4.56, 456, False],
    ],
    schema=StructType([
        StructField("col1", StringType()),
        StructField("col2", DoubleType()),
        StructField("col3", LongType()),
        StructField("col4", BooleanType()),
    ])
)

pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
Copy

L’appel de print(pandas_dataframe.to_string()) produit le résultat suivant :

    COL1  COL2  COL3   COL4
0  value1  1.23   123   True
1  value2  4.56   456  False

Création d’une fixture PyTest pour la session

Les fixtures PyTest sont des fonctions qui sont exécutées avant un test (ou un module de tests), généralement pour fournir des données ou des connexions aux tests. Dans ce cas, créez une fixture qui renvoie un objet Session Snowpark. Commencez par créer un répertoire test, si vous n’en avez pas déjà un. Ensuite, dans le répertoire test, créez un fichier conftest.py avec le contenu suivant, où connection_parameters est un dictionnaire, avec les identifiants de connexion de votre compte Snowflake. Pour plus d’informations sur le format de dictionnaire, voir Création d’une session.

# test/conftest.py
import pytest
from snowflake.snowpark.session import Session

def pytest_addoption(parser):
    parser.addoption("--snowflake-session", action="store", default="live")

@pytest.fixture(scope='module')
def session(request) -> Session:
    if request.config.getoption('--snowflake-session') == 'local':
        return Session.builder.config('local_testing', True).create()
    else:
        return Session.builder.configs(CONNECTION_PARAMETERS).create()
Copy

L’appel de pytest_addoption ajoute une option de ligne de commande nommée snowflake-session à la commande pytest. La fixture Session vérifie cette option de ligne de commande et crée une Session locale ou en direct, suivant sa valeur. Cela vous permet de basculer facilement entre les modes local et en direct pour les tests.

# Using local mode:
pytest --snowflake-session local

# Using live mode
pytest
Copy

Opérations SQL

Session.sql(...) n’est pas pris en charge par le cadre de test local. Utilisez les APIs DataFrame de Snowpark, autant que possible, et, dans les cas où vous devez utiliser Session.sql(...), vous pouvez simuler la valeur de renvoi tabulaire via le unittest.mock.patch de Python pour corriger la réponse attendue d’un appel Session.sql() donné.

Dans l’exemple ci-dessous, mock_sql() mappe le texte de requête SQL vers la réponse DataFrame souhaitée. L’instruction conditionnelle suivante vérifie si la session en cours utilise un test local et, si c’est le cas, applique le correctif à la méthode Session.sql().

from unittest import mock
from functools import partial

def test_something(pytestconfig, session):

    def mock_sql(session, sql_string):  # patch for SQL operations
        if sql_string == "select 1,2,3":
            return session.create_dataframe([[1,2,3]])
        else:
            raise RuntimeError(f"Unexpected query execution: {sql_string}")

    if pytestconfig.getoption('--snowflake-session') == 'local':
        with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
            assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
    else:
        assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Copy

Lorsque le test local est activé, toutes les tables créées par DataFrame.save_as_table() sont enregistrées sous forme de tables temporaires dans la mémoire et peuvent être récupérées via Session.table(). Vous pouvez utiliser les opérations DataFrame prises en charge sur la table, comme d’habitude.

Correction des fonctions intégrées

Toutes les fonctions intégrées sous snowflake.snowpark.functions ne sont pas prises en charge par le cadre de test local. Si vous utilisez une fonction non prise en charge, vous devez utiliser le décorateur @patch de snowflake.snowpark.mock pour créer un correctif.

Pour définir et mettre en œuvre la fonction corrigée, il faut que la signature (liste de paramètres) soit alignée sur les paramètres de la fonction intégrée. Le cadre de test local transmet les paramètres à la fonction corrigée en appliquant les règles suivantes :

  • Pour les paramètres de type ColumnOrName dans la signature des fonctions intégrées, ColumnEmulator est transmis comme paramètre des fonctions corrigées. ColumnEmulator est similaire à un objet pandas.Series contenant les données de colonne.

  • Pour les paramètres de type LiteralType dans la signature des fonctions intégrées, la valeur littérale est transmise comme paramètre des fonctions corrigées.

  • Sinon, la valeur brute est transmise comme paramètre des fonctions corrigées.

En ce qui concerne le type de renvoi des fonctions corrigées, le renvoi d’une instance de ColumnEmulator est attendu en correspondance avec le type de renvoi de Column des fonctions intégrées.

Par exemple, la fonction intégrée to_timestamp() pourrait être corrigée comme suit :

import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType

@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
    ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
    ret_column.sf_type = ColumnType(TimestampType(), True)
    return ret_column
Copy

Omission des cas de test

Si votre suite de tests PyTest contient un cas de test qui n’est pas bien pris en charge par les tests locaux, vous pouvez ignorer ces cas en utilisant le décorateur mark.skipif de PyTest. L’exemple ci-dessous suppose que vous ayez configuré votre session et vos paramètres comme décrit précédemment. La condition vérifie si le local_testing_mode est défini sur local, et, si c’est le cas, le cas de test est ignoré avec un message expliquant pourquoi il a été ignoré.

import pytest

@pytest.mark.skipif(
    condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
    ...
Copy

Limites

Les fonctionnalités suivantes ne sont pas prises en charge :

  • Chaînes et opérations SQL brutes nécessitant l’analyse des chaînes SQL. Par exemple, session.sql et DataFrame.filter("col1 > 12") ne sont pas pris en charge.

  • UDFs, UDTFs et procédures stockées.

  • Fonctions de table.

  • AsyncJobs.

  • Opérations de session telles que la modification des entrepôts, des schémas et d’autres propriétés de session.

  • Types de données Geometry et Geography.

  • Agrégation des fonctions de fenêtre.

    # Selecting window function expressions is supported
    df.select("key", "value", sum_("value").over(), avg("value").over())
    
    # Aggregating window function expressions is NOT supported
    df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
    
    Copy

Les autres limites sont les suivantes :

  • Les types de données Variant, Array et Object sont pris en charge uniquement avec le codage et le décodage JSON standards. Des expressions telles que [1,2,,3,] sont considérées comme des valeurs JSON valides dans Snowflake, mais pas dans les tests locaux, dans lesquels les fonctionnalités JSON intégrées de Python sont utilisées. Vous pouvez spécifier les variables snowflake.snowpark.mock.CUSTOM_JSON_ENCODER et snowflake.snowpark.mock.CUSTOM_JSON_DECODER au niveau du module pour remplacer les paramètres par défaut.

  • Seul un sous-ensemble des fonctions de Snowflake (y compris les fonctions de fenêtre) est mis en œuvre. Voir Correction des fonctions intégrées pour savoir comment injecter votre propre définition de fonction.

  • La correction de fonctions associées au classement n’est actuellement pas prise en charge.

  • La sélection de colonnes portant le même nom renvoie une seule colonne. Pour contourner le problème, renommez les colonnes pour qu’elles aient des noms distincts via Column.alias.

    df.select(lit(1), lit(1)).show() # col("a"), col("a")
    #---------
    #|"'1'"  |
    #---------
    #|1      |
    #|...    |
    #---------
    
    # Workaround: Column.alias
    DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2"))
    # "col1_1", "col1_2"
    
    Copy
  • La conversion de type explicite via Column.cast est limitée, en ce sens que les chaînes de format ne sont pas prises en charge pour les entrées : to_decimal, to_number, to_numeric, to_double, to_date, to_time, to_timestamp, ni pour les pour sorties : to_char, to_varchar, to_binary.

  • Les chaînes JSON stockées dans VariantType ne peuvent pas être converties en types Datetime.

  • Pour Table.merge et Table.update, la mise en œuvre ne prend en charge le comportement que lorsque les paramètres de session ERROR_ON_NONDETERMINISTIC_UPDATE et ERROR_ON_NONDETERMINISTIC_MERGE sont définis sur False. Cela signifie que pour les multi-jointures, une des lignes correspondantes est mise à jour.

Liste des APIs prises en charge

Session Snowpark

Session.createDataFrame

Session.create_dataframe

Session.flatten

Session.range

Session.table

Entrée/sortie

DataFrameReader.csv

DataFrameReader.table

DataFrameWriter.saveAsTable

DataFrameWriter.save_as_table

DataFrame

DataFrame.agg

DataFrame.cache_result

DataFrame.col

DataFrame.collect

DataFrame.collect_nowait

DataFrame.copy_into_table

DataFrame.count

DataFrame.createOrReplaceTempView

DataFrame.createOrReplaceView

DataFrame.create_or_replace_temp_view

DataFrame.create_or_replace_view

DataFrame.crossJoin

DataFrame.cross_join

DataFrame.distinct

DataFrame.drop

DataFrame.dropDuplicates

DataFrame.drop_duplicates

DataFrame.dropna

DataFrame.except_

DataFrame.explain

DataFrame.fillna

DataFrame.filter

DataFrame.first

DataFrame.groupBy

DataFrame.group_by

DataFrame.intersect

DataFrame.join

DataFrame.limit

DataFrame.minus

DataFrame.natural_join

DataFrame.orderBy

DataFrame.order_by

DataFrame.rename

DataFrame.replace

DataFrame.rollup

DataFrame.sample

DataFrame.select

DataFrame.show

DataFrame.sort

DataFrame.subtract

DataFrame.take

DataFrame.toDF

DataFrame.toLocalIterator

DataFrame.toPandas

DataFrame.to_df

DataFrame.to_local_iterator

DataFrame.to_pandas

DataFrame.to_pandas_batches

DataFrame.union

DataFrame.unionAll

DataFrame.unionAllByName

DataFrame.unionByName

DataFrame.union_all

DataFrame.union_all_by_name

DataFrame.union_by_name

DataFrame.unpivot

DataFrame.where

DataFrame.withColumn

DataFrame.withColumnRenamed

DataFrame.with_column

DataFrame.with_column_renamed

DataFrame.with_columns

DataFrameNaFunctions.drop

DataFrameNaFunctions.fill

DataFrameNaFunctions.replace

Colonne

Column.alias

Column.as_

Column.asc

Column.asc_nulls_first

Column.asc_nulls_last

Column.astype

Column.between

Column.bitand

Column.bitor

Column.bitwiseAnd

Column.bitwiseOR

Column.bitwiseXOR

Column.bitxor

Column.cast

Column.collate

Column.desc

Column.desc_nulls_first

Column.desc_nulls_last

Column.endswith

Column.eqNullSafe

Column.equal_nan

Column.equal_null

Column.getItem

Column.getName

Column.get_name

Column.in_

Column.isNotNull

Column.isNull

Column.is_not_null

Column.is_null

Column.isin

Column.like

Column.name

Column.over

Column.regexp

Column.rlike

Column.startswith

Column.substr

Column.substring

Column.try_cast

Column.within_group

CaseExpr.when

CaseExpr.otherwise

Types de données

ArrayType

BinaryType

BooleanType

ByteType

ColumnIdentifier

DataType

DateType

DecimalType

DoubleType

FloatType

IntegerType

LongType

MapType

NullType

ShortType

StringType

StructField

StructType

Timestamp

TimestampType

TimeType

Variant

VariantType

Ligne

Row.asDict

Row.as_dict

Row.count

Row.index

Fonction

abs

avg

coalesce

contains

count

count_distinct

covar_pop

endswith

first_value

iff

lag

last_value

lead

list_agg

max

median

min

parse_json

row_number

startswith

substring

sum

to_array

to_binary

to_boolean

to_char

to_date

to_decimal

to_double

to_object

to_time

to_timestamp

to_variant

Window

Window.orderBy

Window.order_by

Window.partitionBy

Window.partition_by

Window.rangeBetween

Window.range_between

Window.rowsBetween

Window.rows_between

WindowSpec.orderBy

WindowSpec.order_by

WindowSpec.partitionBy

WindowSpec.partition_by

WindowSpec.rangeBetween

WindowSpec.range_between

WindowSpec.rowsBetween

WindowSpec.rows_between

Groupement

RelationalGroupedDataFrame.agg

RelationalGroupedDataFrame.apply_in_pandas

RelationalGroupedDataFrame.applyInPandas

RelationalGroupedDataFrame.avg

RelationalGroupedDataFrame.builtin

RelationalGroupedDataFrame.count

RelationalGroupedDataFrame.function

RelationalGroupedDataFrame.max

RelationalGroupedDataFrame.mean

RelationalGroupedDataFrame.median

RelationalGroupedDataFrame.min

RelationalGroupedDataFrame.sum

Table

Table.delete

Table.drop_table

Table.merge

Table.sample

Table.update

WhenMatchedClause.delete

WhenMatchedClause.update

WhenNotMatchedClause.insert