Schreiben von Tests für Snowpark Python¶
Unter diesem Thema wird erläutert, wie Sie Ihren Snowpark-Code bei bestehender Verbindung zu Snowflake testen. Sie können Standardtestprogramme wie PyTest verwenden, um Ihre Snowpark Python-UDFs, DataFrame-Transformationen und gespeicherten Prozeduren zu testen.
Unter diesem Thema:
Gründliches Testen kann dazu beitragen, unbeabsichtigte Änderungen zu verhindern. Mit Unit-Tests wird überprüft, ob ein Abschnitt des Codes wie erwartet funktioniert. Integrationstests helfen sicherzustellen, dass die Komponenten für einen End-to-End-Anwendungsfall korrekt zusammenarbeiten.
Die Beispiele in diesem Dokument verwenden PyTest, ein häufig eingesetztes Test-Framework für Python. Weitere Hinweise und bewährte Verfahren finden Sie in der PyTest-Dokumentation.
Alternativ können Sie das lokale Test-Framework von Snowpark Python verwenden, um Snowpark Python-DataFrames lokal zu erstellen und zu nutzen, ohne sich mit einem Snowflake-Konto verbinden zu müssen. Weitere Informationen dazu finden Sie unter Lokales Test-Framework.
Einrichten Ihrer Tests¶
Installieren Sie PyTest in Ihrem Projekt, indem Sie pip install pytest
oder conda install pytest
ausführen. Sie können es auch zu Ihrer requirements.txt
- oder conda-Umgebungsdatei hinzufügen.
Erstellen Sie ein Verzeichnis test
neben Ihrem Quellcodeverzeichnis, und fügen Sie Ihre Unit- und Integrationstests zu diesem Verzeichnis hinzu. Ein Beispiel finden Sie in der Snowpark Python-Projektvorlage.
Erstellen einer PyTest-Fixture für die Snowpark-Sitzung¶
PyTest-Fixtures sind Funktionen, die vor einem Test (oder einem Modul von Tests) ausgeführt werden, um Daten oder Verbindungen für Tests bereitzustellen. In diesem Szenario erstellen Sie eine PyTest-Fixture, die ein Snowpark-Session
-Objekt zurückgibt.
Erstellen Sie ein Verzeichnis
test
, wenn Sie noch keines haben.Erstellen Sie dann im Verzeichnis
test
eine Dateiconftest.py
mit folgendem Inhalt, wobeiconnection_parameters
ein Dictionary mit den Anmeldeinformationen Ihres Snowflake-Kontos ist. Weitere Informationen zum Dictionary-Format finden Sie unter Erstellen einer Sitzung.Erstellen Sie die
Session
-Fixture als Modul-Fixture statt als Datei-Fixture, um zu verhindern, dass mehrere Sessions erstellt werden und Probleme aufgrund von kollidierenden Sitzungsobjekten verursachen.
from snowflake.snowpark.session import Session
@pytest.fixture(scope='module')
def session(request) -> Session:
connection_parameters = {}
return Session.builder.configs(...).create()
Unit-Tests für UDFs¶
Sie können Ihre Python-UDF-Logik testen, indem Sie den UDF-Handler als generische Python-Methode testen.
Erstellen Sie eine Datei in Ihrem
test
-Verzeichnis für die UDF-Unit-Tests. Geben Sie der Datei zum Beispiel den Namentest_functions.py
.Importieren Sie die zu testenden Python-Methoden.
Erstellen Sie für jedes Testszenario eine Python-Methode namens
test_<Szenario_für_Test>
.
Hier ist beispielsweise ein Python-UDF-Handler:
def fahrenheit_to_celsius(temp_f: float) -> float:
"""
Converts fahrenheit to celsius
"""
return (float(temp_f) - 32) * (5/9)
Sie können diese Methode in die Testdatei (test/test_functions.py
) importieren und sie als generische Python-Methode testen.
import farenheit_to_celsius
def test_farenheit_to_celsius():
expected = 0.0
actual = farenheit_to_celsius(32)
assert expected == actual
Unit-Tests für DataFrame-Transformationen¶
Das Hinzufügen von Unit-Tests für Ihre DataFrame-Transformationen hilft, sich vor unerwarteten Fehlern und Regressionen zu schützen. Damit sich Ihre DataFrame-Logik leichter testen lässt, kapseln Sie die Transformationen in eine Python-Methode, die als Eingabe die zu transformierenden DataFrames erhält und die transformierten DataFrames zurückgibt.
Im folgenden Beispiel enthält mf_df_transformer
die Transformationslogik. Es kann in andere Module des Python-Projekts importiert und leicht getestet werden.
from snowflake.snowpark.dataframe import DataFrame, col
def my_df_tranformer(df: DataFrame) -> DataFrame:
return df \
.with_column('c', df['a']+df['b']) \
.filter(col('c') > 3)
Um diese Transformation zu testen, gehen Sie wie folgt vor:
Erstellen Sie für die DataFrame-Tests eine Datei namens
test_transformers.py
in demtest
-Verzeichnis (test/test_transformers.py
).Erstellen Sie eine Testmethode für den zu prüfenden Transformer:
test_my_df_transformer(session)
. Der Parametersession
bezieht sich hier auf die im vorherigen Abschnitt erstellte Sitzungs-Fixture.Erstellen Sie mithilfe der Sitzungs-Fixture die Eingabe- und die erwarteten Ausgabe-DataFrames innerhalb der Testmethode.
Übergeben Sie das Eingabe-DataFrame an den Transformer, und vergleichen Sie den erwarteten DataFrame mit dem tatsächlichen DataFrame, den der Transformer zurückgibt.
# test/test_transformers.py
import my_df_transformer
def test_my_df_transformer(session):
input_df = session.create_dataframe([[1,2],[3,4]], ['a', 'b'])
expected_df = session.create_dataframe([3,4,7], ['a','b','c'])
actual_df = my_df_transformer(input_df)
assert input_df.collect() == actual_df.collect()
Integrationstests für gespeicherte Prozeduren¶
Zum Testen der Handler der gespeicherten Prozeduren verwenden Sie die Session-Fixture, um den Handler der gespeicherten Prozedur aufzurufen. Wenn Ihre gespeicherte Prozedur aus Tabellen liest, wie z. B. in einer ETL-Pipeline, können Sie diese Tabellen vor dem Aufrufe n des Handlers der gespeicherten Prozedur erstellen, wie im folgenden Beispiel gezeigt. Dieses Muster stellt sicher, dass Ihre Eingabedaten in der Versionskontrolle nachverfolgt werden und sich zwischen den Testausführungen nicht unerwartet ändern.
from project import my_sproc_handler # import stored proc handler
def test_my_sproc_handler(session: Session):
# Create input table
input_tbl = session.create_dataframe(
data=[...],
schema=[...],
)
input_tbl.write.mode('overwrite').save_as_table(['DB', 'SCHEMA', 'INPUT_TBL'], mode='overwrite')
# Create expected output dataframe
expected_df = session.create_dataframe(
data=[...],
schema=[...],
).collect()
# Call the stored procedure
my_sproc_handler()
# Get actual table
actual_tbl = session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).collect()
# Clean up tables
session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).delete()
session.table(['DB', 'SCHEMA', 'INPUT_TBL']).delete()
# Compare the actual and expected tables
assert expected_df == actual_tbl