Lokales Test-Framework¶
Unter diesem Thema wird erklärt, wie Sie Ihren Code lokal testen können, wenn Sie die Snowpark-Bibliothek verwenden.
Unter diesem Thema:
Das lokale Test-Framework für Snowpark Python bietet Ihnen die Möglichkeit, Snowpark Python-DataFrames lokal zu erstellen und zu nutzen, ohne sich mit einem Snowflake-Konto verbinden zu müssen. Sie können das lokale Test-Framework verwenden, um Ihre DataFrame-Operationen lokal entweder auf Ihrem Entwicklungsrechner oder in einer CI-Pipeline (Pipeline für kontinuierliche Integration) testen, bevor Sie Codeänderungen in Ihrem Konto bereitstellen. Die API ist dieselbe, sodass Sie Ihre Tests entweder lokal oder für ein Snowflake-Konto ausführen können, ohne Codeänderungen vornehmen zu müssen.
Voraussetzungen¶
Für die Verwendung des lokalen Test-Frameworks:
Sie müssen Version 1.11.1 oder höher der Snowpark Python-Bibliothek mit der optionalen Abhängigkeit
pandas
verwenden. Installation durch Ausführen vonpip install "snowflake-snowpark-python[pandas]"
Folgende Versionen von Python werden unterstützt:
3.8
3.9
3.10
3.11
Erstellen einer Sitzung und Aktivieren des lokalen Testens¶
Erstellen Sie zunächst ein Snowpark-Session
-Objekt, und setzen Sie die lokale Testkonfiguration auf True
.
from snowflake.snowpark import Session
session = Session.builder.config('local_testing', True).create()
Nachdem die Sitzung erstellt wurde, können Sie diese zum Erstellen und Bearbeiten von DataFrames verwenden.
df = session.create_dataframe([[1,2],[3,4]],['a','b'])
df.with_column('c', df['a']+df['b']).show()
Laden von Daten¶
Sie können Snowpark-DataFrames aus Python-Primitiven, Dateien und Pandas-DataFrames erstellen. Dies ist nützlich, um die Eingabe und die erwartete Ausgabe von Testfällen zu spezifizieren. Auf diese Weise befinden sich die Daten in der Versionskontrolle, was es einfacher macht, die Testdaten mit den Testfällen synchron zu halten.
Laden von CSV-Daten¶
Sie können CSV-Dateien in einen Snowpark-DataFrame laden, indem Sie zuerst die Datei durch Aufrufen von Session.file.put()
in den speicherinternen Stagingbereich laden und dann Session.read()
verwenden, um den Inhalt zu lesen. Angenommen, es gibt eine Datei data.csv
mit folgendem Inhalt:
col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
Mit folgendem Code können Sie data.csv
in einen Snowpark-DataFrame laden. Sie müssen die Datei zunächst in einen Stagingbereich speichern. Andernfalls erhalten Sie die Fehlermeldung, dass die Datei nicht gefunden werden kann.
from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType
# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
[
StructField("col1", IntegerType()),
StructField("col2", StringType()),
StructField("col3", BooleanType()),
StructField("col4", DoubleType()),
]
)
# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
Die Ausgabe von dataframe.show()
ist wie folgt:
-------------------------------------
|"COL1" |"COL2" |"COL3" |"COL4" |
-------------------------------------
|1 |a |True |1.23 |
|2 |b |False |4.56 |
-------------------------------------
Laden von Pandas-Daten¶
Sie können einen Snowpark Python-DataFrame aus einem Pandas-DataFrame erstellen, indem Sie die Methode create_dataframe
aufrufen und die Daten als Pandas-DataFrame übergeben.
import pandas as pd
pandas_df = pd.DataFrame(
data={
"col1": pd.Series(["value1", "value2"]),
"col2": pd.Series([1.23, 4.56]),
"col3": pd.Series([123, 456]),
"col4": pd.Series([True, False]),
}
)
dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
Die Ausgabe von dataframe.show()
ist wie folgt:
-------------------------------------
|"col1" |"col2" |"col3" |"col4" |
-------------------------------------
|value1 |1.23 |123 |True |
|value2 |4.56 |456 |False |
-------------------------------------
Ein Snowpark Python-DataFrame kann auch in einen Pandas-DataFrame umgewandelt werden, indem die to_pandas
-Methode auf dem DataFrame aufgerufen wird.
from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType
dataframe = session.create_dataframe(
data=[
["value1", 1.23, 123, True],
["value2", 4.56, 456, False],
],
schema=StructType([
StructField("col1", StringType()),
StructField("col2", DoubleType()),
StructField("col3", LongType()),
StructField("col4", BooleanType()),
])
)
pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
Der Aufruf von print(pandas_dataframe.to_string())
gibt Folgendes aus:
COL1 COL2 COL3 COL4
0 value1 1.23 123 True
1 value2 4.56 456 False
Erstellen einer PyTest-Fixture für die Sitzung¶
PyTest-Fixtures sind Funktionen, die vor einem Test (oder einem Modul von Tests) ausgeführt werden, typischerweise um Daten oder Verbindungen für Tests bereitzustellen. In diesem Fall erstellen Sie eine Fixture, das ein Snowpark-Session
-Objekt zurückgibt. Erstellen Sie zunächst ein Verzeichnis test
, wenn Sie noch keines haben. Erstellen Sie dann im Verzeichnis test
eine Datei conftest.py
mit folgendem Inhalt, wobei connection_parameters
ein Dictionary mit den Anmeldeinformationen Ihres Snowflake-Kontos ist. Weitere Informationen zum Dictionary-Format finden Sie unter Erstellen einer Sitzung.
# test/conftest.py
import pytest
from snowflake.snowpark.session import Session
def pytest_addoption(parser):
parser.addoption("--snowflake-session", action="store", default="live")
@pytest.fixture(scope='module')
def session(request) -> Session:
if request.config.getoption('--snowflake-session') == 'local':
return Session.builder.config('local_testing', True).create()
else:
return Session.builder.configs(CONNECTION_PARAMETERS).create()
Durch den Aufruf von pytest_addoption
wird dem Befehl pytest
eine Befehlszeilenoption namens snowflake-session
hinzugefügt. Das Session
-Fixture prüft diese Befehlszeilenoption und erstellt je nach ihrem Wert ein lokales oder ein Live-Session
-Objekt. Auf diese Weise können Sie zu Testzwecken problemlos zwischen dem lokalen und dem Live-Modus wechseln.
# Using local mode:
pytest --snowflake-session local
# Using live mode
pytest
SQL-Operationen¶
Session.sql(...)
wird von dem lokalen Test-Framework nicht unterstützt. Verwenden Sie wann immer möglich die Snowpark-DataFrame-APIs, und in Fällen, in denen Sie Session.sql(...)
verwenden müssen, können Sie den tabellarischen Rückgabewert mit unittest.mock.patch
von Python nachahmen, um die erwartete Antwort eines bestimmten Session.sql()
-Aufrufs zu patchen.
Im folgenden Beispiel ordnet mock_sql()
den SQL-Abfragetext der gewünschten DataFrame-Antwort zu. Die folgende bedingte Anweisung prüft, ob in der aktuellen Sitzung lokale Tests verwendet werden, und wendet in diesem Fall den Patch auf die Methode Session.sql()
an.
from unittest import mock
from functools import partial
def test_something(pytestconfig, session):
def mock_sql(session, sql_string): # patch for SQL operations
if sql_string == "select 1,2,3":
return session.create_dataframe([[1,2,3]])
else:
raise RuntimeError(f"Unexpected query execution: {sql_string}")
if pytestconfig.getoption('--snowflake-session') == 'local':
with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
else:
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Wenn das lokale Testen aktiviert ist, werden alle mit DataFrame.save_as_table()
erstellten Tabellen als temporäre Tabellen im Speicher abgelegt und können mit Session.table()
abgerufen werden. Die unterstützten DataFrame-Operationen können wie gewohnt auf die Tabelle angewendet werden.
Patching der integrierten Funktionen¶
Nicht alle integrierten Funktionen unter snowflake.snowpark.functions
werden vom lokalen Test-Framework unterstützt. Wenn Sie eine Funktion verwenden, die nicht unterstützt wird, müssen Sie das @patch
-Decorator-Element von snowflake.snowpark.mock
verwenden, um einen Patch zu erstellen.
Um die gepatchte Funktion zu definieren und zu implementieren, muss die Signatur (Parameterliste) mit den Parametern der integrierten Funktion übereinstimmen. Das lokale Test-Framework übergibt Parameter an die gepatchte Funktion nach folgenden Regeln:
Bei Parametern des Typs
ColumnOrName
in der Signatur von integrierten Funktionen wirdColumnEmulator
als Parameter der gepatchten Funktionen übergeben.ColumnEmulator
ist vergleichbar mit einempandas.Series
-Objekt, das die Spaltendaten enthält.Bei Parametern des Typs
LiteralType
in der Signatur von integrierten Funktionen wird der Literalwert als Parameter der gepatchten Funktionen übergeben.In allen anderen Fällen wird der Rohwert als Parameter der gepatchten Funktionen übergeben.
Als Rückgabetyp der gepatchten Funktionen wird eine Instanz von ColumnEmulator
erwartet, entsprechend dem Rückgabetyp Column
der integrierten Funktionen.
Beispielsweise könnte die integrierte Funktion to_timestamp()
wie folgt gepatcht werden:
import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType
@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
ret_column.sf_type = ColumnType(TimestampType(), True)
return ret_column
Überspringen von Testfällen¶
Wenn Ihre PyTest-Testsuite einen Testfall enthält, der für das lokale Testen nicht gut unterstützt wird, können Sie diese Fälle mit dem mark.skipif
-Decorator-Element von PyTest überspringen. Im folgenden Beispiel wird davon ausgegangen, dass Ihre Sitzung und die Parameter wie oben beschrieben konfiguriert sind. Die Bedingung prüft, ob local_testing_mode
auf local
gesetzt ist. Ist dies der Fall, wird der Testfall übersprungen, wobei eine Meldung den Grund für das Überspringen erklärt.
import pytest
@pytest.mark.skipif(
condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
...
Einschränkungen¶
Die folgenden Funktionalitäten werden nicht unterstützt:
Raw-SQL-Zeichenfolgen und -Operationen, die das Parsen von SQL-Zeichenfolgen erfordern. Beispielsweise werden
session.sql
undDataFrame.filter("col1 > 12")
nicht unterstützt.UDFs, UDTFs und gespeicherte Prozeduren
Tabellenfunktionen
AsyncJobs
Sitzungsoperationen, wie das Ändern von Warehouses, Schemas und anderen Sitzungseigenschaften
Datentypen
Geometry
undGeography
Aggregieren von Fensterfunktionen
# Selecting window function expressions is supported df.select("key", "value", sum_("value").over(), avg("value").over()) # Aggregating window function expressions is NOT supported df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
Weiterhin bestehen folgende Einschränkungen:
Für die Datentypen
Variant
,Array
undObject
wird nur die Standard-JSON-Kodierung und -Dekodierung unterstützt. Ausdrücke wie [1,2,,3,] werden in Snowflake als gültiges JSON betrachtet, nicht aber beim lokalen Testen, wo die Python-integrierten JSON-Funktionalitäten verwendet werden. Sie können die Variablen auf Modulebenesnowflake.snowpark.mock.CUSTOM_JSON_ENCODER
undsnowflake.snowpark.mock.CUSTOM_JSON_DECODER
angeben, um die Standardeinstellungen außer Kraft zu setzen.Nur eine Teilmenge der Funktionen von Snowflake (einschließlich der Fensterfunktionen) ist implementiert. Unter Patching der integrierten Funktionen erfahren Sie, wie Sie Ihre eigene Funktionsdefinition einfügen können.
Das Patchen von rangbezogenen Funktionen wird derzeit nicht unterstützt.
Wenn Sie Spalten mit demselben Namen auswählen, wird nur eine Spalte zurückgegeben. Als Problemumgehung können Sie die Spalten mit
Column.alias
so umbenennen, dass sie eindeutige Namen haben.df.select(lit(1), lit(1)).show() # col("a"), col("a") #--------- #|"'1'" | #--------- #|1 | #|... | #--------- # Workaround: Column.alias DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2")) # "col1_1", "col1_2"
Explizite Typumwandlung mit
Column.cast
hat die Einschränkung, dass Formatzeichenfolgen nicht unterstützt werden. Dies betrifft für Eingabento_decimal
,to_number
,to_numeric
,to_double
,to_date
,to_time
,to_timestamp
und für Ausgabento_char
,to_varchar
,to_binary
.JSON-Zeichenfolgen, die in
VariantType
gespeichert sind, können nicht inDatetime
-Typen konvertiert werden.Bei
Table.merge
undTable.update
unterstützt die Implementierung nur das Verhalten, wenn die SitzungsparameterERROR_ON_NONDETERMINISTIC_UPDATE
undERROR_ON_NONDETERMINISTIC_MERGE
aufFalse
gesetzt sind. Das bedeutet, dass bei Mehrfachverknüpfungen (Multi-Joins) eine der übereinstimmenden Zeilen aktualisiert wird.
Liste der unterstützten APIs¶
Snowpark-Sitzung¶
Session.createDataFrame
Session.create_dataframe
Session.flatten
Session.range
Session.table
Eingabe/Ausgabe¶
DataFrameReader.csv
DataFrameReader.table
DataFrameWriter.saveAsTable
DataFrameWriter.save_as_table
DataFrame¶
DataFrame.agg
DataFrame.cache_result
DataFrame.col
DataFrame.collect
DataFrame.collect_nowait
DataFrame.copy_into_table
DataFrame.count
DataFrame.createOrReplaceTempView
DataFrame.createOrReplaceView
DataFrame.create_or_replace_temp_view
DataFrame.create_or_replace_view
DataFrame.crossJoin
DataFrame.cross_join
DataFrame.distinct
DataFrame.drop
DataFrame.dropDuplicates
DataFrame.drop_duplicates
DataFrame.dropna
DataFrame.except_
DataFrame.explain
DataFrame.fillna
DataFrame.filter
DataFrame.first
DataFrame.groupBy
DataFrame.group_by
DataFrame.intersect
DataFrame.join
DataFrame.limit
DataFrame.minus
DataFrame.natural_join
DataFrame.orderBy
DataFrame.order_by
DataFrame.rename
DataFrame.replace
DataFrame.rollup
DataFrame.sample
DataFrame.select
DataFrame.show
DataFrame.sort
DataFrame.subtract
DataFrame.take
DataFrame.toDF
DataFrame.toLocalIterator
DataFrame.toPandas
DataFrame.to_df
DataFrame.to_local_iterator
DataFrame.to_pandas
DataFrame.to_pandas_batches
DataFrame.union
DataFrame.unionAll
DataFrame.unionAllByName
DataFrame.unionByName
DataFrame.union_all
DataFrame.union_all_by_name
DataFrame.union_by_name
DataFrame.unpivot
DataFrame.where
DataFrame.withColumn
DataFrame.withColumnRenamed
DataFrame.with_column
DataFrame.with_column_renamed
DataFrame.with_columns
DataFrameNaFunctions.drop
DataFrameNaFunctions.fill
DataFrameNaFunctions.replace
Spalte¶
Column.alias
Column.as_
Column.asc
Column.asc_nulls_first
Column.asc_nulls_last
Column.astype
Column.between
Column.bitand
Column.bitor
Column.bitwiseAnd
Column.bitwiseOR
Column.bitwiseXOR
Column.bitxor
Column.cast
Column.collate
Column.desc
Column.desc_nulls_first
Column.desc_nulls_last
Column.endswith
Column.eqNullSafe
Column.equal_nan
Column.equal_null
Column.getItem
Column.getName
Column.get_name
Column.in_
Column.isNotNull
Column.isNull
Column.is_not_null
Column.is_null
Column.isin
Column.like
Column.name
Column.over
Column.regexp
Column.rlike
Column.startswith
Column.substr
Column.substring
Column.try_cast
Column.within_group
CaseExpr.when
CaseExpr.otherwise
Datentypen¶
ArrayType
BinaryType
BooleanType
ByteType
ColumnIdentifier
DataType
DateType
DecimalType
DoubleType
FloatType
IntegerType
LongType
MapType
NullType
ShortType
StringType
StructField
StructType
Timestamp
TimestampType
TimeType
Variant
VariantType
Zeile¶
Row.asDict
Row.as_dict
Row.count
Row.index
Funktion¶
abs
avg
coalesce
contains
count
count_distinct
covar_pop
endswith
first_value
iff
lag
last_value
lead
list_agg
max
median
min
parse_json
row_number
startswith
substring
sum
to_array
to_binary
to_boolean
to_char
to_date
to_decimal
to_double
to_object
to_time
to_timestamp
to_variant
Window¶
Window.orderBy
Window.order_by
Window.partitionBy
Window.partition_by
Window.rangeBetween
Window.range_between
Window.rowsBetween
Window.rows_between
WindowSpec.orderBy
WindowSpec.order_by
WindowSpec.partitionBy
WindowSpec.partition_by
WindowSpec.rangeBetween
WindowSpec.range_between
WindowSpec.rowsBetween
WindowSpec.rows_between
Gruppieren¶
RelationalGroupedDataFrame.agg
RelationalGroupedDataFrame.apply_in_pandas
RelationalGroupedDataFrame.applyInPandas
RelationalGroupedDataFrame.avg
RelationalGroupedDataFrame.builtin
RelationalGroupedDataFrame.count
RelationalGroupedDataFrame.function
RelationalGroupedDataFrame.max
RelationalGroupedDataFrame.mean
RelationalGroupedDataFrame.median
RelationalGroupedDataFrame.min
RelationalGroupedDataFrame.sum
Tabelle¶
Table.delete
Table.drop_table
Table.merge
Table.sample
Table.update
WhenMatchedClause.delete
WhenMatchedClause.update
WhenNotMatchedClause.insert