Cadre de test local¶
Cette rubrique explique comment tester votre code localement lorsque vous utilisez la bibliothèque Snowpark.
Dans ce chapitre :
Le cadre de test local Python Snowpark vous permet de créer et d’agir sur des DataFrames Python Snowpark localement sans vous connecter à un compte Snowflake. Vous pouvez utiliser le cadre de test local pour tester vos opérations DataFrame localement, sur votre poste de développement ou dans un pipeline CI (intégration continue), avant de déployer les changements de code sur votre compte. L’API est la même, de sorte que vous pouvez exécuter vos tests localement ou sur un compte Snowflake, sans modifier le code.
Conditions préalables¶
Pour utiliser le cadre de test local :
Vous devez utiliser la version 1.11.1 ou une version supérieure de la bibliothèque Python Snowpark avec la dépendance
pandas
facultative. Effectuez l’installation en exécutantpip install "snowflake-snowpark-python[pandas]"
Les versions de Python prises en charge sont les suivantes :
3,8
3,9
3,10
3,11
Création d’une session et activation de tests locaux¶
Pour commencer, créez une Session
Snowpark et définissez la configuration de test local sur True
.
from snowflake.snowpark import Session
session = Session.builder.config('local_testing', True).create()
Une fois la session créée, vous pouvez l’utiliser pour créer et agir sur des DataFrames.
df = session.create_dataframe([[1,2],[3,4]],['a','b'])
df.with_column('c', df['a']+df['b']).show()
Chargement des données¶
Vous pouvez créer des DataFrames Snowpark à partir de primitives Python, de fichiers et de DataFrames Pandas. Cela s’avère utile pour spécifier l’entrée et la sortie prévues des cas de test. En procédant de cette manière, les données se trouvent dans le contrôle source, ce qui facilite la synchronisation des données de test sur les cas de test.
Chargement des données CSV¶
Vous pouvez charger des fichiers CSV dans un DataFrame Snowpark en commençant par appeler Session.file.put()
pour charger le fichier dans la zone de préparation en mémoire, puis en utilisant Session.read()
pour lire le contenu. Supposons qu’il existe un fichier, data.csv
, dont le contenu est le suivant :
col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
Vous pouvez charger data.csv
dans un DataFrame Snowpark en utilisant le code suivant. Vous devez d’abord placer le fichier dans une zone de préparation. Sinon, vous obtiendrez un message d’erreur indiquant que le fichier est introuvable.
from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType
# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
[
StructField("col1", IntegerType()),
StructField("col2", StringType()),
StructField("col3", BooleanType()),
StructField("col4", DoubleType()),
]
)
# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
La sortie de dataframe.show()
sera :
-------------------------------------
|"COL1" |"COL2" |"COL3" |"COL4" |
-------------------------------------
|1 |a |True |1.23 |
|2 |b |False |4.56 |
-------------------------------------
Chargement des données Pandas¶
Vous pouvez créer un DataFrame Python Snowpark à partir d’un DataFrame Pandas en appelant la méthode create_dataframe
et en transmettant les données sous forme de DataFrame Pandas.
import pandas as pd
pandas_df = pd.DataFrame(
data={
"col1": pd.Series(["value1", "value2"]),
"col2": pd.Series([1.23, 4.56]),
"col3": pd.Series([123, 456]),
"col4": pd.Series([True, False]),
}
)
dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
dataframe.show()
produit le résultat suivant :
-------------------------------------
|"col1" |"col2" |"col3" |"col4" |
-------------------------------------
|value1 |1.23 |123 |True |
|value2 |4.56 |456 |False |
-------------------------------------
Un DataFrame Python Snowpark peut également être converti en DataFrame Pandas via l’appel de la méthode to_pandas
sur le DataFrame.
from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType
dataframe = session.create_dataframe(
data=[
["value1", 1.23, 123, True],
["value2", 4.56, 456, False],
],
schema=StructType([
StructField("col1", StringType()),
StructField("col2", DoubleType()),
StructField("col3", LongType()),
StructField("col4", BooleanType()),
])
)
pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
L’appel de print(pandas_dataframe.to_string())
produit le résultat suivant :
COL1 COL2 COL3 COL4
0 value1 1.23 123 True
1 value2 4.56 456 False
Création d’une fixture PyTest pour la session¶
Les fixtures PyTest sont des fonctions qui sont exécutées avant un test (ou un module de tests), généralement pour fournir des données ou des connexions aux tests. Dans ce cas, créez une fixture qui renvoie un objet Session
Snowpark. Commencez par créer un répertoire test
, si vous n’en avez pas déjà un. Ensuite, dans le répertoire test
, créez un fichier conftest.py
avec le contenu suivant, où connection_parameters
est un dictionnaire, avec les identifiants de connexion de votre compte Snowflake. Pour plus d’informations sur le format de dictionnaire, voir Création d’une session.
# test/conftest.py
import pytest
from snowflake.snowpark.session import Session
def pytest_addoption(parser):
parser.addoption("--snowflake-session", action="store", default="live")
@pytest.fixture(scope='module')
def session(request) -> Session:
if request.config.getoption('--snowflake-session') == 'local':
return Session.builder.config('local_testing', True).create()
else:
return Session.builder.configs(CONNECTION_PARAMETERS).create()
L’appel de pytest_addoption
ajoute une option de ligne de commande nommée snowflake-session
à la commande pytest
. La fixture Session
vérifie cette option de ligne de commande et crée une Session
locale ou en direct, suivant sa valeur. Cela vous permet de basculer facilement entre les modes local et en direct pour les tests.
# Using local mode:
pytest --snowflake-session local
# Using live mode
pytest
Opérations SQL¶
Session.sql(...)
n’est pas pris en charge par le cadre de test local. Utilisez les APIs DataFrame de Snowpark, autant que possible, et, dans les cas où vous devez utiliser Session.sql(...)
, vous pouvez simuler la valeur de renvoi tabulaire via le unittest.mock.patch
de Python pour corriger la réponse attendue d’un appel Session.sql()
donné.
Dans l’exemple ci-dessous, mock_sql()
mappe le texte de requête SQL vers la réponse DataFrame souhaitée. L’instruction conditionnelle suivante vérifie si la session en cours utilise un test local et, si c’est le cas, applique le correctif à la méthode Session.sql()
.
from unittest import mock
from functools import partial
def test_something(pytestconfig, session):
def mock_sql(session, sql_string): # patch for SQL operations
if sql_string == "select 1,2,3":
return session.create_dataframe([[1,2,3]])
else:
raise RuntimeError(f"Unexpected query execution: {sql_string}")
if pytestconfig.getoption('--snowflake-session') == 'local':
with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
else:
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Lorsque le test local est activé, toutes les tables créées par DataFrame.save_as_table()
sont enregistrées sous forme de tables temporaires dans la mémoire et peuvent être récupérées via Session.table()
. Vous pouvez utiliser les opérations DataFrame prises en charge sur la table, comme d’habitude.
Correction des fonctions intégrées¶
Toutes les fonctions intégrées sous snowflake.snowpark.functions
ne sont pas prises en charge par le cadre de test local. Si vous utilisez une fonction non prise en charge, vous devez utiliser le décorateur @patch
de snowflake.snowpark.mock
pour créer un correctif.
Pour définir et mettre en œuvre la fonction corrigée, il faut que la signature (liste de paramètres) soit alignée sur les paramètres de la fonction intégrée. Le cadre de test local transmet les paramètres à la fonction corrigée en appliquant les règles suivantes :
Pour les paramètres de type
ColumnOrName
dans la signature des fonctions intégrées,ColumnEmulator
est transmis comme paramètre des fonctions corrigées.ColumnEmulator
est similaire à un objetpandas.Series
contenant les données de colonne.Pour les paramètres de type
LiteralType
dans la signature des fonctions intégrées, la valeur littérale est transmise comme paramètre des fonctions corrigées.Sinon, la valeur brute est transmise comme paramètre des fonctions corrigées.
En ce qui concerne le type de renvoi des fonctions corrigées, le renvoi d’une instance de ColumnEmulator
est attendu en correspondance avec le type de renvoi de Column
des fonctions intégrées.
Par exemple, la fonction intégrée to_timestamp()
pourrait être corrigée comme suit :
import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType
@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
ret_column.sf_type = ColumnType(TimestampType(), True)
return ret_column
Omission des cas de test¶
Si votre suite de tests PyTest contient un cas de test qui n’est pas bien pris en charge par les tests locaux, vous pouvez ignorer ces cas en utilisant le décorateur mark.skipif
de PyTest. L’exemple ci-dessous suppose que vous ayez configuré votre session et vos paramètres comme décrit précédemment. La condition vérifie si le local_testing_mode
est défini sur local
, et, si c’est le cas, le cas de test est ignoré avec un message expliquant pourquoi il a été ignoré.
import pytest
@pytest.mark.skipif(
condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
...
Limitations¶
Les fonctionnalités suivantes ne sont pas prises en charge :
Chaînes et opérations SQL brutes nécessitant l’analyse des chaînes SQL. Par exemple,
session.sql
etDataFrame.filter("col1 > 12")
ne sont pas pris en charge.UDFs, UDTFs et procédures stockées.
Fonctions de table.
AsyncJobs.
Opérations de session telles que la modification des entrepôts, des schémas et d’autres propriétés de session.
Types de données
Geometry
etGeography
.Agrégation des fonctions de fenêtre.
# Selecting window function expressions is supported df.select("key", "value", sum_("value").over(), avg("value").over()) # Aggregating window function expressions is NOT supported df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
Les autres limites sont les suivantes :
Les types de données
Variant
,Array
etObject
sont pris en charge uniquement avec le codage et le décodage JSON standards. Des expressions telles que [1,2,,3,] sont considérées comme des valeurs JSON valides dans Snowflake, mais pas dans les tests locaux, dans lesquels les fonctionnalités JSON intégrées de Python sont utilisées. Vous pouvez spécifier les variablessnowflake.snowpark.mock.CUSTOM_JSON_ENCODER
etsnowflake.snowpark.mock.CUSTOM_JSON_DECODER
au niveau du module pour remplacer les paramètres par défaut.Seul un sous-ensemble des fonctions de Snowflake (y compris les fonctions de fenêtre) est mis en œuvre. Voir Correction des fonctions intégrées pour savoir comment injecter votre propre définition de fonction.
La correction de fonctions associées au classement n’est actuellement pas prise en charge.
La sélection de colonnes portant le même nom renvoie une seule colonne. Pour contourner le problème, renommez les colonnes pour qu’elles aient des noms distincts via
Column.alias
.df.select(lit(1), lit(1)).show() # col("a"), col("a") #--------- #|"'1'" | #--------- #|1 | #|... | #--------- # Workaround: Column.alias DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2")) # "col1_1", "col1_2"
La conversion de type explicite via
Column.cast
est limitée, en ce sens que les chaînes de format ne sont pas prises en charge pour les entrées :to_decimal
,to_number
,to_numeric
,to_double
,to_date
,to_time
,to_timestamp
, ni pour les pour sorties :to_char
,to_varchar
,to_binary
.Les chaînes JSON stockées dans
VariantType
ne peuvent pas être converties en typesDatetime
.Pour
Table.merge
etTable.update
, la mise en œuvre ne prend en charge le comportement que lorsque les paramètres de sessionERROR_ON_NONDETERMINISTIC_UPDATE
etERROR_ON_NONDETERMINISTIC_MERGE
sont définis surFalse
. Cela signifie que pour les multi-jointures, une des lignes correspondantes est mise à jour.
Liste des APIs prises en charge¶
Session Snowpark¶
Session.createDataFrame
Session.create_dataframe
Session.flatten
Session.range
Session.table
Entrée/sortie¶
DataFrameReader.csv
DataFrameReader.table
DataFrameWriter.saveAsTable
DataFrameWriter.save_as_table
DataFrame¶
DataFrame.agg
DataFrame.cache_result
DataFrame.col
DataFrame.collect
DataFrame.collect_nowait
DataFrame.copy_into_table
DataFrame.count
DataFrame.createOrReplaceTempView
DataFrame.createOrReplaceView
DataFrame.create_or_replace_temp_view
DataFrame.create_or_replace_view
DataFrame.crossJoin
DataFrame.cross_join
DataFrame.distinct
DataFrame.drop
DataFrame.dropDuplicates
DataFrame.drop_duplicates
DataFrame.dropna
DataFrame.except_
DataFrame.explain
DataFrame.fillna
DataFrame.filter
DataFrame.first
DataFrame.groupBy
DataFrame.group_by
DataFrame.intersect
DataFrame.join
DataFrame.limit
DataFrame.minus
DataFrame.natural_join
DataFrame.orderBy
DataFrame.order_by
DataFrame.rename
DataFrame.replace
DataFrame.rollup
DataFrame.sample
DataFrame.select
DataFrame.show
DataFrame.sort
DataFrame.subtract
DataFrame.take
DataFrame.toDF
DataFrame.toLocalIterator
DataFrame.toPandas
DataFrame.to_df
DataFrame.to_local_iterator
DataFrame.to_pandas
DataFrame.to_pandas_batches
DataFrame.union
DataFrame.unionAll
DataFrame.unionAllByName
DataFrame.unionByName
DataFrame.union_all
DataFrame.union_all_by_name
DataFrame.union_by_name
DataFrame.unpivot
DataFrame.where
DataFrame.withColumn
DataFrame.withColumnRenamed
DataFrame.with_column
DataFrame.with_column_renamed
DataFrame.with_columns
DataFrameNaFunctions.drop
DataFrameNaFunctions.fill
DataFrameNaFunctions.replace
Colonne¶
Column.alias
Column.as_
Column.asc
Column.asc_nulls_first
Column.asc_nulls_last
Column.astype
Column.between
Column.bitand
Column.bitor
Column.bitwiseAnd
Column.bitwiseOR
Column.bitwiseXOR
Column.bitxor
Column.cast
Column.collate
Column.desc
Column.desc_nulls_first
Column.desc_nulls_last
Column.endswith
Column.eqNullSafe
Column.equal_nan
Column.equal_null
Column.getItem
Column.getName
Column.get_name
Column.in_
Column.isNotNull
Column.isNull
Column.is_not_null
Column.is_null
Column.isin
Column.like
Column.name
Column.over
Column.regexp
Column.rlike
Column.startswith
Column.substr
Column.substring
Column.try_cast
Column.within_group
CaseExpr.when
CaseExpr.otherwise
Types de données¶
ArrayType
BinaryType
BooleanType
ByteType
ColumnIdentifier
DataType
DateType
DecimalType
DoubleType
FloatType
IntegerType
LongType
MapType
NullType
ShortType
StringType
StructField
StructType
Timestamp
TimestampType
TimeType
Variant
VariantType
Ligne¶
Row.asDict
Row.as_dict
Row.count
Row.index
Fonction¶
abs
avg
coalesce
contains
count
count_distinct
covar_pop
endswith
first_value
iff
lag
last_value
lead
list_agg
max
median
min
parse_json
row_number
startswith
substring
sum
to_array
to_binary
to_boolean
to_char
to_date
to_decimal
to_double
to_object
to_time
to_timestamp
to_variant
Window¶
Window.orderBy
Window.order_by
Window.partitionBy
Window.partition_by
Window.rangeBetween
Window.range_between
Window.rowsBetween
Window.rows_between
WindowSpec.orderBy
WindowSpec.order_by
WindowSpec.partitionBy
WindowSpec.partition_by
WindowSpec.rangeBetween
WindowSpec.range_between
WindowSpec.rowsBetween
WindowSpec.rows_between
Groupement¶
RelationalGroupedDataFrame.agg
RelationalGroupedDataFrame.apply_in_pandas
RelationalGroupedDataFrame.applyInPandas
RelationalGroupedDataFrame.avg
RelationalGroupedDataFrame.builtin
RelationalGroupedDataFrame.count
RelationalGroupedDataFrame.function
RelationalGroupedDataFrame.max
RelationalGroupedDataFrame.mean
RelationalGroupedDataFrame.median
RelationalGroupedDataFrame.min
RelationalGroupedDataFrame.sum
Table¶
Table.delete
Table.drop_table
Table.merge
Table.sample
Table.update
WhenMatchedClause.delete
WhenMatchedClause.update
WhenNotMatchedClause.insert