Lokales Test-Framework¶
Unter diesem Thema wird erklärt, wie Sie Ihren Code lokal testen können, wenn Sie die Snowpark Python-Bibliothek verwenden.
Unter diesem Thema:
Das lokale Test-Framework für Snowpark Python ist ein Emulator, der Ihnen die Möglichkeit bietet, Snowpark Python-DataFrames lokal zu erstellen und zu nutzen, ohne sich mit einem Snowflake-Konto verbinden zu müssen. Sie können das lokale Test-Framework verwenden, um Ihre DataFrame-Operationen lokal entweder auf Ihrem Entwicklungsrechner oder in einer CI-Pipeline (Pipeline für kontinuierliche Integration) testen, bevor Sie Codeänderungen in Ihrem Konto bereitstellen. Die API ist dieselbe, so dass Sie Ihre Tests entweder lokal oder gegen ein Snowflake-Konto ausführen können, ohne Codeänderungen vornehmen zu müssen.
Voraussetzungen¶
Für die Verwendung des lokalen Test-Frameworks:
Sie müssen die Version 1.18.0 oder höher der Snowpark Python-Bibliothek mit der optionalen Abhängigkeit
localtest
verwenden.Folgende Versionen von Python werden unterstützt:
3.9
3.10
3.11
3,12
Installieren Sie die Snowpark Python-Bibliothek¶
Um die Bibliothek mit der optionalen Abhängigkeit zu installieren, führen Sie den folgenden Befehl aus:
pip install "snowflake-snowpark-python[localtest]"
Erstellen Sie eine Sitzung und aktivieren Sie lokales Testen¶
Erstellen Sie einen Snowpark
Session
, und setzen Sie die lokale Konfiguration für das Testen aufTrue
:from snowflake.snowpark import Session session = Session.builder.config('local_testing', True).create()
Verwenden Sie die Sitzung, um DataFrames zu erstellen und zu bedienen:
df = session.create_dataframe([[1,2],[3,4]],['a','b']) df.with_column('c', df['a']+df['b']).show()
Laden von Daten¶
Sie können Snowpark-DataFrames aus Python-Primitiven, Dateien und pandas-DataFrames erstellen. Dies ist nützlich, um die Eingabe und die erwartete Ausgabe von Testfällen zu spezifizieren. Bei dieser Methode befinden sich die Daten in der Quellcodekontrolle, was es einfacher macht, die Testdaten mit den Testfällen synchron zu halten.
CSV-Daten laden¶
Um CSV-Dateien in einen Snowpark-DataFrame zu laden, rufen Sie zunächst
Session.file.put()
auf, um die Datei in den Stagingbereich zu laden, und verwenden dannSession.read()
, um den Inhalt zu lesen.
Beispiel
Angenommen, es gibt eine Datei data.csv
mit folgendem Inhalt:
col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
Sie können den folgenden Code verwenden, um data.csv
in einen Snowpark-DataFrame zu laden. Sie müssen die Datei zuerst in einen Stagingbereich speichern. Andernfalls erhalten Sie den Fehler „Datei kann nicht gefunden werden“.
from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType
# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
[
StructField("col1", IntegerType()),
StructField("col2", StringType()),
StructField("col3", BooleanType()),
StructField("col4", DoubleType()),
]
)
# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
Erwartete Ausgabe:
-------------------------------------
|"COL1" |"COL2" |"COL3" |"COL4" |
-------------------------------------
|1 |a |True |1.23 |
|2 |b |False |4.56 |
-------------------------------------
Laden von Daten in Pandas¶
Um ein Snowpark Python-DataFrame aus einem pandas-DataFrame zu erstellen, rufen Sie die Methode
create_dataframe
auf und übergeben die Daten als pandas-DataFrame.
Beispiel
import pandas as pd
pandas_df = pd.DataFrame(
data={
"col1": pd.Series(["value1", "value2"]),
"col2": pd.Series([1.23, 4.56]),
"col3": pd.Series([123, 456]),
"col4": pd.Series([True, False]),
}
)
dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
Erwartete Ausgabe:
-------------------------------------
|"col1" |"col2" |"col3" |"col4" |
-------------------------------------
|value1 |1.23 |123 |True |
|value2 |4.56 |456 |False |
-------------------------------------
Um eine Snowpark Python DataFrame in eine pandas-DataFrame zu konvertieren, rufen Sie die Methode
to_pandas
auf dem DataFrame auf.
Beispiel
from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType
dataframe = session.create_dataframe(
data=[
["value1", 1.23, 123, True],
["value2", 4.56, 456, False],
],
schema=StructType([
StructField("col1", StringType()),
StructField("col2", DoubleType()),
StructField("col3", LongType()),
StructField("col4", BooleanType()),
])
)
pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
Erwartete Ausgabe:
COL1 COL2 COL3 COL4
0 value1 1.23 123 True
1 value2 4.56 456 False
Erstellen Sie eine PyTest-Fixture für eine Sitzung¶
PyTest-Fixtures sind Funktionen, die vor einem Test (oder einem Modul von Tests) ausgeführt werden, typischerweise um Daten oder Verbindungen für Tests bereitzustellen. In dieser Prozedur erstellen Sie ein Fixture, das ein Snowpark-Session
-Objekt zurückgibt.
Wenn Sie noch kein
test
-Verzeichnis haben, erstellen Sie eines.Erstellen Sie im
test
-Verzeichnis eine Datei namensconftest.py
mit folgendem Inhalt, wobeiconnection_parameters
ein Dictionary mit Ihren Anmeldeinformationen für das Snowflake-Konto ist:# test/conftest.py import pytest from snowflake.snowpark.session import Session def pytest_addoption(parser): parser.addoption("--snowflake-session", action="store", default="live") @pytest.fixture(scope='module') def session(request) -> Session: if request.config.getoption('--snowflake-session') == 'local': return Session.builder.config('local_testing', True).create() else: return Session.builder.configs(CONNECTION_PARAMETERS).create()
Weitere Informationen zum Dictionary-Format finden Sie unter Erstellen einer Sitzung.
Durch den Aufruf von pytest_addoption
wird dem Befehl pytest
eine Befehlszeilenoption namens snowflake-session
hinzugefügt. Das Session
-Fixture prüft diese Befehlszeilenoption und erstellt je nach ihrem Wert ein lokales oder ein Live-Session
-Objekt. So können Sie zum Testen ganz einfach zwischen dem lokalen und dem Live-Modus wechseln, wie in den folgenden Beispielen aus der Befehlszeile gezeigt:
# Using local mode:
pytest --snowflake-session local
# Using live mode
pytest
SQL-Operationen¶
Session.sql(...)
wird von dem lokalen Test-Framework nicht unterstützt. Verwenden Sie Snowparks-DataFrame APIs, wann immer dies möglich ist, und in Fällen, in denen Sie Session.sql(...)
verwenden müssen, können Sie den tabellarischen Rückgabewert nachahmen, indem Sie unittest.mock.patch
von Python verwenden, um die erwartete Antwort von einem bestimmten Session.sql()
-Aufruf zu patchen.
Im folgenden Beispiel ordnet mock_sql()
den SQL-Abfragetext der gewünschten DataFrame-Antwort zu. Die bedingte Anweisung prüft, ob in der aktuellen Sitzung lokale Tests verwendet werden, und wendet in diesem Fall den Patch auf die Session.sql()
-Methode an.
from unittest import mock
from functools import partial
def test_something(pytestconfig, session):
def mock_sql(session, sql_string): # patch for SQL operations
if sql_string == "select 1,2,3":
return session.create_dataframe([[1,2,3]])
else:
raise RuntimeError(f"Unexpected query execution: {sql_string}")
if pytestconfig.getoption('--snowflake-session') == 'local':
with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
else:
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Wenn das lokale Testen aktiviert ist, werden alle mit DataFrame.save_as_table()
erstellten Tabellen als temporäre Tabellen im Speicher abgelegt und können mit Session.table()
abgerufen werden. Die unterstützten DataFrame-Operationen können wie gewohnt auf die Tabelle angewendet werden.
Patching der integrierten Funktionen¶
Einige der eingebauten Funktionen unter snowflake.snowpark.functions
werden vom lokalen Test-Framework nicht unterstützt. Wenn Sie eine Funktion verwenden, die nicht unterstützt wird, können Sie den @patch
-Decorator von snowflake.snowpark.mock
verwenden, um einen Patch zu erstellen.
Damit die gepatchte Funktion definiert und implementiert werden kann, muss die Signatur (Parameterliste) mit den Parametern der integrierten Funktion übereinstimmen. Das lokale Test-Framework übergibt Parameter an die gepatchte Funktion nach folgenden Regeln:
Bei Parametern des Typs
ColumnOrName
in der Signatur von integrierten Funktionen wirdColumnEmulator
als Parameter der gepatchten Funktionen übergeben.ColumnEmulator
ist vergleichbar mit einempandas.Series
-Objekt, das die Spaltendaten enthält.Bei Parametern des Typs
LiteralType
in der Signatur von integrierten Funktionen wird der Literalwert als Parameter der gepatchten Funktionen übergeben.In allen anderen Fällen wird der Rohwert als Parameter der gepatchten Funktionen übergeben.
Als Rückgabetyp der gepatchten Funktionen wird eine Instanz von ColumnEmulator
erwartet, entsprechend dem Rückgabetyp Column
der integrierten Funktionen.
Beispielsweise könnte die integrierte Funktion to_timestamp()
wie folgt gepatcht werden:
import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType
@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
ret_column.sf_type = ColumnType(TimestampType(), True)
return ret_column
Überspringen von Testfällen¶
Wenn Ihre PyTest-Testsuite einen Testfall enthält, der von lokalen Tests nicht gut unterstützt wird, können Sie diese Fälle überspringen, indem Sie den mark.skipif
-Decorator von PyTest verwenden. Im folgenden Beispiel wird davon ausgegangen, dass Ihre Sitzung und die Parameter wie oben beschrieben konfiguriert sind. Die Bedingung prüft, ob local_testing_mode
auf local
gesetzt ist. Ist dies der Fall, wird der Testfall mit einer erklärenden Meldung übersprungen.
import pytest
@pytest.mark.skipif(
condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
...
Registrieren von UDFs und gespeicherten Prozeduren¶
Sie können benutzerdefinierte Funktionen (UDFs) und gespeicherte Prozeduren im lokalen Test-Framework erstellen und aufrufen. Zum Erstellen der Objekte können Sie folgende Syntaxoptionen verwenden:
Syntax |
UDF |
Gespeicherte Prozeduren |
---|---|---|
Decorator-Elemente |
|
|
Register-Methoden |
|
|
Register-from-file-Methoden |
|
|
Beispiel
Das folgende Codebeispiel erstellt eine UDF und eine gespeicherte Prozedur unter Verwendung der Decorator-Elemente und ruft dann beide über den Namen auf:
from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import col, DataFrame
from snowflake.snowpark.functions import udf, sproc, call_udf
from snowflake.snowpark.types import IntegerType, StringType
# Create local session
session = Session.builder.config('local_testing', True).create()
# Create local table
table = 'example'
session.create_dataframe([[1,2],[3,4]],['a','b']).write.save_as_table(table)
# Register a UDF, which is called from the stored procedure
@udf(name='example_udf', return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
def example_udf(a, b):
return a + b
# Register stored procedure
@sproc(name='example_proc', return_type=IntegerType(), input_types=[StringType()])
def example_proc(session, table_name):
return session.table(table_name)\
.with_column('c', call_udf('example_udf', col('a'), col('b')))\
.count()
# Call the stored procedure by name
output = session.call('example_proc', table)
print(output)
Einschränkungen¶
Die folgende Liste enthält die bekannten Beschränkungen und Verhaltenslücken im lokalen Test-Framework. Snowflake hat derzeit keine Pläne, diese Punkte zu beheben.
Raw-SQL-Zeichenfolgen und -Operationen, die das Parsen von SQL-Zeichenfolgen erfordern, wie
session.sql
undDataFrame.filter("col1 > 12")
, werden nicht unterstützt.Asynchrone Operationen werden nicht unterstützt.
Datenbankobjekte wie Tabellen, gespeicherte Prozeduren und UDFs werden nicht über die Sitzungsebene hinaus persistiert, und alle Operationen werden im Arbeitsspeicher ausgeführt. So sind beispielsweise permanente gespeicherte Prozeduren, die in einer Mock-Sitzung registriert sind, für andere Mock-Sitzungen nicht sichtbar.
Verwandet Features zur Zeichenfolgensortierung wie
Column.collate
werden nicht unterstützt.Für die Datentypen
Variant
,Array
undObject
wird nur die Standard-JSON-Kodierung und -Dekodierung unterstützt. Ausdrücke wie [1,2,,3,] werden in Snowflake als gültiges JSON betrachtet, nicht aber beim lokalen Testen, wo die Python-integrierten JSON-Funktionalitäten verwendet werden. Sie können die Variablen auf Modulebenesnowflake.snowpark.mock.CUSTOM_JSON_ENCODER
undsnowflake.snowpark.mock.CUSTOM_JSON_DECODER
angeben, um die Standardeinstellungen außer Kraft zu setzen.Nur eine Teilmenge der Funktionen von Snowflake (einschließlich der Fensterfunktionen) ist implementiert. Unter Patching der integrierten Funktionen erfahren Sie, wie Sie Ihre eigene Funktionsdefinition einfügen können.
Das Patchen von rangbezogenen Funktionen wird derzeit nicht unterstützt.
SQL-Formatmodelle werden nicht unterstützt. Die Mock-Implementierung von
to_decimal
verarbeitet zum Beispiel nicht den optionalen Parameterformat
.Die Snowpark Python-Bibliothek verfügt nicht über ein integrierte Python-API, um Stagingbereiche zu erstellen oder zu löschen. Daher geht das lokale Test-Framework davon aus, dass jeder eingehende Stagingbereich bereits erstellt wurde.
Die aktuelle Implementierung von UDFs und gespeicherten Prozeduren führt keine Paketvalidierung durch. Alle Pakete, die in Ihrem Code referenziert werden, müssen installiert sein, bevor das Programm ausgeführt wird.
Abfrage-Tags werden nicht unterstützt.
Der Abfrageverlauf wird nicht unterstützt.
Die Herkunft wird nicht unterstützt.
Wenn eine UDF oder gespeicherte Prozedur registriert wird, werden optionale Parameter wie
parallel
,execute_as
,statement_params
,source_code_display
,external_access_integrations
,secrets
undcomment
ignoriert.Die Beispiele für
Table.sample
, SYSTEM oder BLOCK sind die gleichen wie die Beispiele für ROW.Snowflake unterstützt offiziell nicht die Ausführung des lokalen Test-Frameworks innerhalb von gespeicherten Prozeduren. Bei Sitzungen im lokalen Testmodus innerhalb von gespeicherten Prozeduren können unerwartete Fehler auftreten oder ausgelöst werden.
Nicht unterstützte Features¶
Im Folgenden finden Sie eine Liste von Features, die derzeit nicht im lokalen Test-Framework implementiert sind. Snowflake arbeitet aktiv daran, diese Punkte anzugehen.
Im Allgemeinen sollte jede Referenz auf diese Funktionalitäten einen NotImplementedError
-Fehler auslösen:
UDTFs (benutzerdefinierte Tabellenfunktionen)
UDAFs (benutzerdefinierte Aggregatfunktionen)
Vektorisierte UDFs und UDTFs
Integrierte Tabellenfunktionen
Gespeicherte Tabellen-Prozeduren
Datentypen
Geometry
,Geography
undVector
Intervall-Ausdrücke
Lesen von Dateiformaten, die nicht JSON oder CSV sind
Für ein unterstütztes Dateiformat werden nicht alle Leseoptionen unterstützt. Beispielsweise wird
infer_schema
für das CSV-Format nicht unterstützt.
Für alle Funktionen, die hier nicht als nicht unterstützt oder als bekannte Einschränkung aufgeführt sind, prüfen Sie die aktuelle Liste der Feature-Anfragen für lokale Tests, oder erstellen Sie eine Feature-Anfrage im snowpark-python
-GitHub-Repository.
Bekannte Probleme¶
Im Folgenden finden Sie eine Liste bekannter Probleme oder Verhaltenslücken, die im lokalen Test-Framework bestehen. Snowflake plant aktiv, diese Probleme zu beheben.
Die Verwendung von Fensterfunktionen innerhalb von
DataFrame.groupby
oder anderen Aggregationsoperationen wird nicht unterstützt.# Selecting window function expressions is supported df.select("key", "value", sum_("value").over(), avg("value").over()) # Aggregating window function expressions is NOT supported df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
Wenn Sie Spalten mit demselben Namen auswählen, wird nur eine Spalte zurückgegeben. Als Problemumgehung können Sie
Column.alias
verwenden, um die Spalten so umzubenennen, dass sie eindeutige Namen haben.df.select(lit(1), lit(1)).show() # col("a"), col("a") #--------- #|"'1'" | #--------- #|1 | #|... | #--------- # Workaround: Column.alias DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2")) # "col1_1", "col1_2"
Für
Table.merge
undTable.update
müssen die SitzungsparameterERROR_ON_NONDETERMINISTIC_UPDATE
undERROR_ON_NONDETERMINISTIC_MERGE
aufFalse
gesetzt werden. Das bedeutet, dass bei Multi-Joins eine der übereinstimmenden Zeilen aktualisiert wird.Vollqualifizierte Stagingbereichsnamen in GET- und PUT-Dateioperationen werden nicht unterstützt. Datenbank- und Schemanamen werden als Teil des Stagingbereichsnamens behandelt.
Die Implementierung von
mock_to_char
unterstützt nur Zeitstempel in einem Format, das Trennzeichen zwischen den verschiedenen Zeitwerten enthält.DataFrame.pivot
hat einen Parameter namensvalues
, mit dem ein Pivot auf bestimmte Werte beschränkt werden kann. Derzeit können nur statistisch definierte Werte verwendet werden. Werte, die über eine Unterabfrage bereitgestellt werden, lösen einen Fehler aus.Das Erstellen eines
DataFrame
aus einem pandas-DataFrame
, das einen Zeitstempel mit Zeitzoneninformationen enthält, wird nicht unterstützt.
Bei Problemen, die nicht in dieser Liste aufgeführt sind, prüfen Sie die aktuellen Liste der offenen Probleme oder erstellen Sie einen Fehlerbericht im snowpark-python
-GitHub-Repository.