Tutorial: Testen von Snowpark Python¶
Einführung¶
Dieses Tutorial führt Sie in die Grundlagen des Testens Ihres Snowpark Python-Codes ein.
Lerninhalte¶
In diesem Tutorial erfahren Sie Folgendes:
Testen von Snowpark-Code bei bestehender Verbindung zu Snowflake.
Sie können Standardtestprogramme wie PyTest verwenden, um Ihre Snowpark Python-UDFs, DataFrame-Transformationen und gespeicherten Prozeduren zu testen.
Lokales Testen von Snowpark Python-DataFrames ohne Verbindung zu einem Snowflake-Konto durch Verwendung des lokalen Test-Frameworks.
Sie können das lokale Test-Framework verwenden, um lokal auf Ihrem Entwicklungsrechner zu testen, bevor Sie Codeänderungen bereitstellen.
Voraussetzungen¶
Für die Verwendung des lokalen Test-Frameworks:
Sie müssen Version 1.11.1 oder höher der Snowpark Python-Bibliothek verwenden.
Folgende Versionen von Python werden unterstützt:
3.8
3.9
3.10
3.11
Projekt einrichten¶
In diesem Abschnitt klonen Sie das Projekt-Repository und richten die Umgebung ein, die Sie für das Tutorial benötigen.
Klonen Sie das Projekt-Repository.
git clone https://github.com/Snowflake-Labs/sftutorial-snowpark-testing
Wenn Sie git nicht installiert haben, rufen Sie die Repository-Seite auf, und laden Sie die Inhalte herunter, indem Sie auf Code » Download Contents klicken.
Setzen Sie Umgebungsvariablen auf die Anmeldeinformationen für Ihr Konto. Die Snowpark-API verwendet diese, um sich bei Ihrem Snowflake-Konto zu authentifizieren.
# Linux/MacOS export SNOWSQL_ACCOUNT=<replace with your account identifer> export SNOWSQL_USER=<replace with your username> export SNOWSQL_ROLE=<replace with your role> export SNOWSQL_PWD=<replace with your password> export SNOWSQL_DATABASE=<replace with your database> export SNOWSQL_SCHEMA=<replace with your schema> export SNOWSQL_WAREHOUSE=<replace with your warehouse>
# Windows/PowerShell $env:SNOWSQL_ACCOUNT = "<replace with your account identifer>" $env:SNOWSQL_USER = "<replace with your username>" $env:SNOWSQL_ROLE = "<replace with your role>" $env:SNOWSQL_PWD = "<replace with your password>" $env:SNOWSQL_DATABASE = "<replace with your database>" $env:SNOWSQL_SCHEMA = "<replace with your schema>" $env:SNOWSQL_WAREHOUSE = "<replace with your warehouse>"
Optional: Sie können diese Umgebungsvariablen dauerhaft festlegen, indem Sie Ihr Bash-Profil (unter Linux/MacOS) bearbeiten oder das Menü System Properties (unter Windows) verwenden.
Erstellen und aktivieren Sie eine conda-Umgebung mit Anaconda:
conda env create --file environment.yml conda activate snowpark-testing
Erstellen Sie die Beispieltabelle in Ihrem Konto, indem Sie
setup/create_table.py
ausführen. Dieses Python-Skript erstellt eine Datenbank namens CITIBIKE, ein Schema namens PUBLIC und eine kleine Tabelle namens TRIPS.python setup/create_table.py
Sie können nun mit dem nächsten Abschnitt fortfahren. In diesem Abschnitt haben Sie:
Das Repository des Tutorials geklont.
Umgebungsvariablen mit Ihren Kontoinformationen erstellt.
Eine conda-Umgebung für das Projekt erstellt.
Eine Verbindung zu Snowflake über die Snowpark-API hergestellt und eine Beispieldatenbank, ein Schema und eine Tabelle erstellt.
Gespeicherte Prozedur testen¶
Das Beispielprojekt enthält einen Handler der gespeicherten Prozedur (sproc.py
) und drei DataFrames-Transformer-Methode (transformers.py
). Der Handler der gespeicherten Prozedur verwendet die UDF- und DataFrame-Transformer zum Lesen aus der Quelltabelle CITIBIKE.PUBLIC.TRIPS
und erstellt zwei Faktentabellen: MONTH_FACTS
und BIKE_FACTS
.
Sie können die gespeicherte Prozedur über die Befehlszeile ausführen, indem Sie folgenden Befehl ausführen.
python project/sproc.py
Nachdem Sie sich nun mit dem Projekt vertraut gemacht haben, werden Sie im nächsten Abschnitt das Testverzeichnis einrichten und eine PyTest-Fixture für die Snowflake-Sitzung erstellen.
PyTest-Fixture für die Snowflake-Sitzung erstellen¶
PyTest-Fixtures sind Funktionen, die vor einem Test (oder einem Modul von Tests) ausgeführt werden, typischerweise um Daten oder Verbindungen für Tests bereitzustellen. Für dieses Projekt werden Sie eine PyTest-Fixture erstellen, die ein Snowpark-Session
-Objekt zurückgibt. Ihre Testfälle werden dieses Sitzungsobjekt verwenden, um eine Verbindung zu Snowflake herzustellen.
Erstellen Sie im Stammverzeichnis des Projekts ein Verzeichnis namens
test
.mkdir test
Erstellen Sie im Verzeichnis
test
eine neue Python-Datei namensconftest.py
. Erstellen Sie innerhalb vonconftest.py
eine PyTest-Fixture für dasSession
-Objekt:import pytest from project.utils import get_env_var_config from snowflake.snowpark.session import Session @pytest.fixture def session() -> Session: return Session.builder.configs(get_env_var_config()).create()
Unit-Tests für DataFrame-Transformer hinzufügen¶
Erstellen Sie im Verzeichnis
test
eine neue Python-Datei namenstest_transformers.py
.Importieren Sie in die Datei
test_transformers.py
die Transformer-Methoden.# test/test_transformers.py from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
Erstellen Sie anschließend Unit-Tests für diese Transformer. In der Regel wird für jeden Test eine Methode mit dem Namen
test_<Name_der_Methode>
erstellt. In unserem Fall werden dies folgende Tests sein:# test/test_transformers.py from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts def test_add_rider_age(session): ... def test_calc_bike_facts(session): ... def test_calc_month_facts(session): ...
Der
session
-Parameter jedes Testfalls bezieht sich auf die PyTest-Fixture, die Sie im vorherigen Abschnitt erstellt haben.Implementieren Sie nun die Testfälle für jeden Transformer. Verwenden Sie das folgende Muster.
Erstellen Sie eine Eingabe-DataFrame.
Erstellen Sie das erwartete Ausgabe-DataFrame.
Übergeben Sie das Eingabe-DataFrame aus Schritt 1 an die Transformer-Methode.
Vergleichen Sie die Ausgabe von Schritt 3 mit der erwarteten Ausgabe von Schritt 2.
# test/test_transformers.py from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts from snowflake.snowpark.types import StructType, StructField, IntegerType, FloatType def test_add_rider_age(session: Session): input = session.create_dataframe( [ [1980], [1995], [2000] ], schema=StructType([StructField("BIRTH_YEAR", IntegerType())]) ) expected = session.create_dataframe( [ [1980, 43], [1995, 28], [2000, 23] ], schema=StructType([StructField("BIRTH_YEAR", IntegerType()), StructField("RIDER_AGE", IntegerType())]) ) actual = add_rider_age(input) assert expected.collect() == actual.collect() def test_calc_bike_facts(session: Session): input = session.create_dataframe([ [1, 10, 20], [1, 5, 30], [2, 20, 50], [2, 10, 60] ], schema=StructType([ StructField("BIKEID", IntegerType()), StructField("TRIPDURATION", IntegerType()), StructField("RIDER_AGE", IntegerType()) ]) ) expected = session.create_dataframe([ [1, 2, 7.5, 25.0], [2, 2, 15.0, 55.0], ], schema=StructType([ StructField("BIKEID", IntegerType()), StructField("COUNT", IntegerType()), StructField("AVG_TRIPDURATION", FloatType()), StructField("AVG_RIDER_AGE", FloatType()) ]) ) actual = calc_bike_facts(input) assert expected.collect() == actual.collect() def test_calc_month_facts(session: Session): from patches import patch_to_timestamp input = session.create_dataframe( data=[ ['2018-03-01 09:47:00.000 +0000', 1, 10, 15], ['2018-03-01 09:47:14.000 +0000', 2, 20, 12], ['2018-04-01 09:47:04.000 +0000', 3, 6, 30] ], schema=['STARTTIME', 'BIKE_ID', 'TRIPDURATION', 'RIDER_AGE'] ) expected = session.create_dataframe( data=[ ['Mar', 2, 15, 13.5], ['Apr', 1, 6, 30.0] ], schema=['MONTH', 'COUNT', 'AVG_TRIPDURATION', 'AVG_RIDER_AGE'] ) actual = calc_month_facts(input) assert expected.collect() == actual.collect()
Sie können nun PyTest ausführen, um alle Unit-Tests zu starten.
pytest test/test_transformers.py
Integrationstests für gespeicherte Prozeduren hinzufügen¶
Da wir nun Unit-Tests für die DataFrame-Transformer-Methoden haben, können wir einen Integrationstest für die gespeicherte Prozedur hinzufügen. Für den Testfall wird folgendes Muster verwendet:
Erstellen Sie eine Tabelle mit den Eingabedaten für die gespeicherte Prozedur.
Erstellen Sie zwei DataFrames mit den erwarteten Inhalten der beiden Ausgabetabellen der gespeicherten Prozedur.
Rufen Sie die gespeicherte Prozedur auf.
Vergleichen Sie die aktuellen Ausgabetabellen mit den DataFrames aus Schritt 2.
Bereinigung: Löschen Sie die Eingabetabelle aus Schritt 1 und die Ausgabetabellen aus Schritt 3.
Erstellen Sie im Verzeichnis test
eine Python-Datei namens test_sproc.py
.
Importieren Sie aus dem Projektverzeichnis den Handler der gespeicherten Prozedur, und erstellen Sie einen Testfall.
# test/test_sproc.py
from project.sproc import create_fact_tables
def test_create_fact_tables(session):
...
Implementieren Sie den Testfall, und beginnen Sie dabei mit dem Erstellen der Eingabetabelle.
# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *
def test_create_fact_tables(session):
DB = 'CITIBIKE'
SCHEMA = 'TEST'
# Set up source table
tbl = session.create_dataframe(
data=[
[1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
[1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
[1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
[1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
[1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
[1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
[1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
[1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
[1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
[1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
[1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
],
schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION', 'BIKEID'],
)
tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
Erstellen Sie dann DataFrames für die erwarteten Ausgabetabellen.
# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *
def test_create_fact_tables(session):
DB = 'CITIBIKE'
SCHEMA = 'TEST'
# Set up source table
tbl = session.create_dataframe(
data=[
[1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
[1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
[1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
[1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
[1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
[1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
[1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
[1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
[1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
[1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
[1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
],
schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION', 'BIKEID'],
)
tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
# Expected values
n_rows_expected = 12
bike_facts_expected = session.create_dataframe(
data=[
[30958, 1, 551.0, 40.0],
[19278, 1, 242.0, 35.0],
[18461, 1, 768.0, 31.0],
[15533, 1, 690.0, 43.0],
[32449, 1, 490.0, 32.0],
[29411, 1, 457.0, 64.0],
[28015, 1, 279.0, 52.0],
[15148, 1, 546.0, 59.0],
[16967, 1, 358.0, 40.0],
[20644, 1, 848.0, 38.0],
[16365, 1, 295.0, 39.0]
],
schema=StructType([
StructField("BIKEID", IntegerType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", FloatType()),
StructField("AVG_RIDER_AGE", FloatType())
])
).collect()
month_facts_expected = session.create_dataframe(
data=[['Mar', 11, 502.18182, 43.00000]],
schema=StructType([
StructField("MONTH", StringType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", DecimalType()),
StructField("AVG_RIDER_AGE", DecimalType())
])
).collect()
Rufen Sie schließlich die gespeicherte Prozedur auf, und lesen Sie die Ausgabetabellen. Vergleichen Sie die aktuellen Tabellen mit den DataFrame-Inhalten.
# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *
def test_create_fact_tables(session):
DB = 'CITIBIKE'
SCHEMA = 'TEST'
# Set up source table
tbl = session.create_dataframe(
data=[
[1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
[1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
[1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
[1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
[1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
[1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
[1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
[1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
[1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
[1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
[1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
],
schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION', 'BIKEID'],
)
tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
# Expected values
n_rows_expected = 12
bike_facts_expected = session.create_dataframe(
data=[
[30958, 1, 551.0, 40.0],
[19278, 1, 242.0, 35.0],
[18461, 1, 768.0, 31.0],
[15533, 1, 690.0, 43.0],
[32449, 1, 490.0, 32.0],
[29411, 1, 457.0, 64.0],
[28015, 1, 279.0, 52.0],
[15148, 1, 546.0, 59.0],
[16967, 1, 358.0, 40.0],
[20644, 1, 848.0, 38.0],
[16365, 1, 295.0, 39.0]
],
schema=StructType([
StructField("BIKEID", IntegerType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", FloatType()),
StructField("AVG_RIDER_AGE", FloatType())
])
).collect()
month_facts_expected = session.create_dataframe(
data=[['Mar', 11, 502.18182, 43.00000]],
schema=StructType([
StructField("MONTH", StringType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", DecimalType()),
StructField("AVG_RIDER_AGE", DecimalType())
])
).collect()
# Call sproc, get actual values
n_rows_actual = create_fact_tables(session, 'TRIPS_TEST')
bike_facts_actual = session.table([DB, SCHEMA, 'bike_facts']).collect()
month_facts_actual = session.table([DB, SCHEMA, 'month_facts']).collect()
# Comparisons
assert n_rows_expected == n_rows_actual
assert bike_facts_expected == bike_facts_actual
assert month_facts_expected == month_facts_actual
Um den Testfall auszuführen, führen Sie pytest
über die Befehlszeile aus.
pytest test/test_sproc.py
Um alle Tests im Projekt auszuführen, führen Sie pytest
ohne weitere Optionen aus.
pytest
Lokales Testen konfigurieren¶
Zu diesem Zeitpunkt haben Sie eine PyTest-Testsuite für Ihre DataFrame-Transformer und die gespeicherte Prozedur. In jedem Testfall wird die Session
-Fixture verwendet, um eine Verbindung zu Ihrem Snowflake-Konto herzustellen, die SQL von der Snowpark Python-API zu senden und die Antwort abzurufen.
Alternativ können Sie auch das lokale Test-Framework verwenden, um die Transformationen lokal ohne Verbindung zu Snowflake auszuführen. Bei großen Testsuiten kann dies zu einer erheblich schnelleren Testausführung führen. In diesem Abschnitt wird gezeigt, wie Sie die Testsuite aktualisieren, um die Funktionen des lokalen Test-Frameworks zu nutzen.
Beginnen Sie mit der Aktualisierung der PyTest-
Session
-Fixture. Wir werden eine Befehlszeilenoption zu PyTest hinzufügen, um zwischen dem lokalen und dem Live-Testmodus zu wechseln.# test/conftest.py import pytest from project.utils import get_env_var_config from snowflake.snowpark.session import Session def pytest_addoption(parser): parser.addoption("--snowflake-session", action="store", default="live") @pytest.fixture(scope='module') def session(request) -> Session: if request.config.getoption('--snowflake-session') == 'local': return Session.builder.configs({'local_testing': True}).create() else: return Session.builder.configs(get_env_var_config()).create()
Wir müssen zuerst diese Methode patchen, da nicht alle integrierten Funktionen vom lokalen Test-Framework unterstützt werden, zum Beispiel die Funktion
monthname()
, die imcalc_month_facts()
-Transformer verwendet wird. Erstellen Sie im Testverzeichnis eine Datei namenspatches.py
. Kopieren Sie in diese Datei den folgenden Code.from snowflake.snowpark.mock.functions import patch from snowflake.snowpark.functions import monthname from snowflake.snowpark.mock.snowflake_data_type import ColumnEmulator, ColumnType from snowflake.snowpark.types import StringType import datetime import calendar @patch(monthname) def patch_monthname(column: ColumnEmulator) -> ColumnEmulator: ret_column = ColumnEmulator(data=[ calendar.month_abbr[datetime.datetime.strptime(row, '%Y-%m-%d %H:%M:%S.%f %z').month] for row in column]) ret_column.sf_type = ColumnType(StringType(), True) return ret_column
Der obige Patch akzeptiert einen einzigen Parameter
column
, der einpandas.Series
-ähnliches Objekt ist, das die Datenzeilen innerhalb der Spalte enthält. Wir verwenden dann eine Kombination von Methoden aus den Python-Modulendatetime
undcalendar
, um die Funktionalität der integrierten Spaltemonthname()
zu emulieren. Schließlich setzen wir den Rückgabetyp aufString
, da die integrierte Methode Zeichenfolgen zurückgibt, die den Monaten entsprechen („Jan, „Feb“, „Mar“ usw.).Anschließend importieren Sie diese Methode in die Tests für den DataFrame-Transformer und die gespeicherte Prozedur.
# test/test_transformers.py # No changes to the other unit test methods def test_calc_month_facts(request, session): # Add conditional to include the patch if local testing is being used if request.config.getoption('--snowflake-session') == 'local': from patches import patch_monthname # No other changes
Führen Sie
pytest
erneut aus, diesmal mit dem lokalen Flag.pytest test/test_transformers.py --snowflake-session local
Wenden Sie nun denselben Patch auf den Test der gespeicherten Prozedur an.
#test/test_sproc.py def test_create_fact_tables(request, session): # Add conditional to include the patch if local testing is being used if request.config.getoption('--snowflake-session') == 'local': from patches import patch_monthname # No other changes required
Führen Sie pytest erneut mit dem lokalen Flag aus.
pytest test/test_sproc.py --snowflake-session local
Zum Abschluss vergleichen wir die Zeit, die zum Ausführen der vollständigen Testsuite lokal und über eine Live-Verbindung benötigt wird. Wir verwenden den Befehl
time
, um die für beide Befehle benötigte Zeit zu messen. Beginnen wir mit der Live-Verbindung.time pytest
In diesem Fall dauerte die Ausführung der Testsuite 7,89 Sekunden. (Die genaue Zeit kann je nach Ihrem Computer, Ihrer Netzwerkverbindung und anderen Faktoren abweichen.)
=================================== test session starts ========================== platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0 rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial configfile: pytest.ini collected 4 items test/test_sproc.py . [ 25%] test/test_transformers.py ... [100%] =================================== 4 passed in 6.86s ================================= pytest 1.63s user 1.86s system 44% cpu 7.893 total
Versuchen wir es nun mit dem lokalen Test-Framework:
time pytest --snowflake-session local
Mit dem lokalen Test-Framework dauerte die Ausführung der Testsuite nur 1 Sekunde.
================================== test session starts ================================ platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0 rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial configfile: pytest.ini collected 4 items test/test_sproc.py . [ 25%] test/test_transformers.py ... [100%] =================================== 4 passed in 0.10s ================================== pytest --snowflake-session local 1.37s user 1.70s system 281% cpu 1.093 total
Mehr erfahren¶
Geschafft! Sehr gut gemacht.
In diesem Tutorial erhielten Sie einen umfassenden Überblick darüber, wie Sie Ihren Snowpark Python-Code testen können. Auf dem Weg dorthin, haben Sie folgende Aufgaben ausgeführt:
Eine PyTest Fixture erstellt und Unit-Tests und Integrationstests hinzugefügt
Weitere Informationen dazu finden Sie unter Schreiben von Tests für Snowpark Python.
Lokale Testen konfiguriert
Weitere Informationen dazu finden Sie unter Lokales Test-Framework.