Schreiben von Tests für Snowpark Python

Unter diesem Thema wird erläutert, wie Sie Ihren Snowpark-Code bei bestehender Verbindung zu Snowflake testen. Sie können Standardtestprogramme wie PyTest verwenden, um Ihre Snowpark Python-UDFs, DataFrame-Transformationen und gespeicherten Prozeduren zu testen.

Unter diesem Thema:

Gründliches Testen kann dazu beitragen, unbeabsichtigte Änderungen zu verhindern. Mit Unit-Tests wird überprüft, ob ein Abschnitt des Codes wie erwartet funktioniert. Integrationstests helfen sicherzustellen, dass die Komponenten für einen End-to-End-Anwendungsfall korrekt zusammenarbeiten.

Die Beispiele in diesem Dokument verwenden PyTest, ein häufig eingesetztes Test-Framework für Python. Weitere Hinweise und bewährte Verfahren finden Sie in der PyTest-Dokumentation.

Alternativ können Sie das lokale Test-Framework von Snowpark Python verwenden, um Snowpark Python-DataFrames lokal zu erstellen und zu nutzen, ohne sich mit einem Snowflake-Konto verbinden zu müssen. Weitere Informationen dazu finden Sie unter Lokales Test-Framework.

Einrichten Ihrer Tests

Installieren Sie PyTest in Ihrem Projekt, indem Sie pip install pytest oder conda install pytest ausführen. Sie können es auch zu Ihrer requirements.txt- oder conda-Umgebungsdatei hinzufügen.

Erstellen Sie ein Verzeichnis test neben Ihrem Quellcodeverzeichnis, und fügen Sie Ihre Unit- und Integrationstests zu diesem Verzeichnis hinzu. Ein Beispiel finden Sie in der Snowpark Python-Projektvorlage.

Erstellen einer PyTest-Fixture für die Snowpark-Sitzung

PyTest-Fixtures sind Funktionen, die vor einem Test (oder einem Modul von Tests) ausgeführt werden, um Daten oder Verbindungen für Tests bereitzustellen. In diesem Szenario erstellen Sie eine PyTest-Fixture, die ein Snowpark-Session-Objekt zurückgibt.

  1. Erstellen Sie ein Verzeichnis test, wenn Sie noch keines haben.

  2. Erstellen Sie dann im Verzeichnis test eine Datei conftest.py mit folgendem Inhalt, wobei connection_parameters ein Dictionary mit den Anmeldeinformationen Ihres Snowflake-Kontos ist. Weitere Informationen zum Dictionary-Format finden Sie unter Erstellen einer Sitzung.

  3. Erstellen Sie die Session-Fixture als Modul-Fixture statt als Datei-Fixture, um zu verhindern, dass mehrere Sessions erstellt werden und Probleme aufgrund von kollidierenden Sitzungsobjekten verursachen.

from snowflake.snowpark.session import Session

@pytest.fixture(scope='module')
def session(request) -> Session:
    connection_parameters = {}
    return Session.builder.configs(...).create()
Copy

Unit-Tests für UDFs

Sie können Ihre Python-UDF-Logik testen, indem Sie den UDF-Handler als generische Python-Methode testen.

  1. Erstellen Sie eine Datei in Ihrem test-Verzeichnis für die UDF-Unit-Tests. Geben Sie der Datei zum Beispiel den Namen test_functions.py.

  2. Importieren Sie die zu testenden Python-Methoden.

  3. Erstellen Sie für jedes Testszenario eine Python-Methode namens test_<Szenario_für_Test>.

Hier ist beispielsweise ein Python-UDF-Handler:

def fahrenheit_to_celsius(temp_f: float) -> float:
    """
    Converts fahrenheit to celsius
    """
    return (float(temp_f) - 32) * (5/9)
Copy

Sie können diese Methode in die Testdatei (test/test_functions.py) importieren und sie als generische Python-Methode testen.

import farenheit_to_celsius

def test_farenheit_to_celsius():
    expected = 0.0
    actual = farenheit_to_celsius(32)
    assert expected == actual
Copy

Unit-Tests für DataFrame-Transformationen

Das Hinzufügen von Unit-Tests für Ihre DataFrame-Transformationen hilft, sich vor unerwarteten Fehlern und Regressionen zu schützen. Damit sich Ihre DataFrame-Logik leichter testen lässt, kapseln Sie die Transformationen in eine Python-Methode, die als Eingabe die zu transformierenden DataFrames erhält und die transformierten DataFrames zurückgibt.

Im folgenden Beispiel enthält mf_df_transformer die Transformationslogik. Es kann in andere Module des Python-Projekts importiert und leicht getestet werden.

from snowflake.snowpark.dataframe import DataFrame, col

def my_df_tranformer(df: DataFrame) -> DataFrame:
    return df \
        .with_column('c', df['a']+df['b']) \
        .filter(col('c') > 3)
Copy

Um diese Transformation zu testen, gehen Sie wie folgt vor:

  1. Erstellen Sie für die DataFrame-Tests eine Datei namens test_transformers.py in dem test-Verzeichnis (test/test_transformers.py).

  2. Erstellen Sie eine Testmethode für den zu prüfenden Transformer: test_my_df_transformer(session). Der Parameter session bezieht sich hier auf die im vorherigen Abschnitt erstellte Sitzungs-Fixture.

  3. Erstellen Sie mithilfe der Sitzungs-Fixture die Eingabe- und die erwarteten Ausgabe-DataFrames innerhalb der Testmethode.

  4. Übergeben Sie das Eingabe-DataFrame an den Transformer, und vergleichen Sie den erwarteten DataFrame mit dem tatsächlichen DataFrame, den der Transformer zurückgibt.

# test/test_transformers.py

import my_df_transformer

def test_my_df_transformer(session):
    input_df = session.create_dataframe([[1,2],[3,4]], ['a', 'b'])
    expected_df = session.create_dataframe([3,4,7], ['a','b','c'])
    actual_df = my_df_transformer(input_df)
    assert input_df.collect() == actual_df.collect()
Copy

Integrationstests für gespeicherte Prozeduren

Zum Testen der Handler der gespeicherten Prozeduren verwenden Sie die Session-Fixture, um den Handler der gespeicherten Prozedur aufzurufen. Wenn Ihre gespeicherte Prozedur aus Tabellen liest, wie z. B. in einer ETL-Pipeline, können Sie diese Tabellen vor dem Aufrufe n des Handlers der gespeicherten Prozedur erstellen, wie im folgenden Beispiel gezeigt. Dieses Muster stellt sicher, dass Ihre Eingabedaten in der Versionskontrolle nachverfolgt werden und sich zwischen den Testausführungen nicht unerwartet ändern.

from project import my_sproc_handler  # import stored proc handler

def test_my_sproc_handler(session: Session):

    # Create input table
    input_tbl = session.create_dataframe(
        data=[...],
        schema=[...],
    )

    input_tbl.write.mode('overwrite').save_as_table(['DB', 'SCHEMA', 'INPUT_TBL'], mode='overwrite')

    # Create expected output dataframe
    expected_df = session.create_dataframe(
        data=[...],
        schema=[...],
    ).collect()

    # Call the stored procedure
    my_sproc_handler()

    # Get actual table
    actual_tbl = session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).collect()

    # Clean up tables
    session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).delete()
    session.table(['DB', 'SCHEMA', 'INPUT_TBL']).delete()

    # Compare the actual and expected tables
    assert expected_df == actual_tbl
Copy