Lokales Test-Framework

Unter diesem Thema wird erklärt, wie Sie Ihren Code lokal testen können, wenn Sie die Snowpark-Bibliothek verwenden.

Unter diesem Thema:

Das lokale Test-Framework für Snowpark Python bietet Ihnen die Möglichkeit, Snowpark Python-DataFrames lokal zu erstellen und zu nutzen, ohne sich mit einem Snowflake-Konto verbinden zu müssen. Sie können das lokale Test-Framework verwenden, um Ihre DataFrame-Operationen lokal entweder auf Ihrem Entwicklungsrechner oder in einer CI-Pipeline (Pipeline für kontinuierliche Integration) testen, bevor Sie Codeänderungen in Ihrem Konto bereitstellen. Die API ist dieselbe, sodass Sie Ihre Tests entweder lokal oder für ein Snowflake-Konto ausführen können, ohne Codeänderungen vornehmen zu müssen.

Voraussetzungen

Für die Verwendung des lokalen Test-Frameworks:

  • Sie müssen Version 1.11.1 oder höher der Snowpark Python-Bibliothek mit der optionalen Abhängigkeit pandas verwenden. Installation durch Ausführen von pip install "snowflake-snowpark-python[pandas]"

  • Folgende Versionen von Python werden unterstützt:

    • 3.8

    • 3.9

    • 3.10

    • 3.11

Erstellen einer Sitzung und Aktivieren des lokalen Testens

Erstellen Sie zunächst ein Snowpark-Session-Objekt, und setzen Sie die lokale Testkonfiguration auf True.

from snowflake.snowpark import Session

session = Session.builder.config('local_testing', True).create()
Copy

Nachdem die Sitzung erstellt wurde, können Sie diese zum Erstellen und Bearbeiten von DataFrames verwenden.

df = session.create_dataframe([[1,2],[3,4]],['a','b'])
df.with_column('c', df['a']+df['b']).show()
Copy

Laden von Daten

Sie können Snowpark-DataFrames aus Python-Primitiven, Dateien und Pandas-DataFrames erstellen. Dies ist nützlich, um die Eingabe und die erwartete Ausgabe von Testfällen zu spezifizieren. Auf diese Weise befinden sich die Daten in der Versionskontrolle, was es einfacher macht, die Testdaten mit den Testfällen synchron zu halten.

Laden von CSV-Daten

Sie können CSV-Dateien in einen Snowpark-DataFrame laden, indem Sie zuerst die Datei durch Aufrufen von Session.file.put() in den speicherinternen Stagingbereich laden und dann Session.read() verwenden, um den Inhalt zu lesen. Angenommen, es gibt eine Datei data.csv mit folgendem Inhalt:

col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
Copy

Mit folgendem Code können Sie data.csv in einen Snowpark-DataFrame laden. Sie müssen die Datei zunächst in einen Stagingbereich speichern. Andernfalls erhalten Sie die Fehlermeldung, dass die Datei nicht gefunden werden kann.

from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType


# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
    [
        StructField("col1", IntegerType()),
        StructField("col2", StringType()),
        StructField("col3", BooleanType()),
        StructField("col4", DoubleType()),
    ]
)

# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
Copy

Die Ausgabe von dataframe.show() ist wie folgt:

-------------------------------------
|"COL1"  |"COL2"  |"COL3"  |"COL4"  |
-------------------------------------
|1       |a       |True    |1.23    |
|2       |b       |False   |4.56    |
-------------------------------------

Laden von Pandas-Daten

Sie können einen Snowpark Python-DataFrame aus einem Pandas-DataFrame erstellen, indem Sie die Methode create_dataframe aufrufen und die Daten als Pandas-DataFrame übergeben.

import pandas as pd

pandas_df = pd.DataFrame(
    data={
        "col1": pd.Series(["value1", "value2"]),
        "col2": pd.Series([1.23, 4.56]),
        "col3": pd.Series([123, 456]),
        "col4": pd.Series([True, False]),
    }
)

dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
Copy

Die Ausgabe von dataframe.show() ist wie folgt:

-------------------------------------
|"col1"  |"col2"  |"col3"  |"col4"  |
-------------------------------------
|value1  |1.23    |123     |True    |
|value2  |4.56    |456     |False   |
-------------------------------------

Ein Snowpark Python-DataFrame kann auch in einen Pandas-DataFrame umgewandelt werden, indem die to_pandas-Methode auf dem DataFrame aufgerufen wird.

from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType

dataframe = session.create_dataframe(
    data=[
        ["value1", 1.23, 123, True],
        ["value2", 4.56, 456, False],
    ],
    schema=StructType([
        StructField("col1", StringType()),
        StructField("col2", DoubleType()),
        StructField("col3", LongType()),
        StructField("col4", BooleanType()),
    ])
)

pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
Copy

Der Aufruf von print(pandas_dataframe.to_string()) gibt Folgendes aus:

    COL1  COL2  COL3   COL4
0  value1  1.23   123   True
1  value2  4.56   456  False

Erstellen einer PyTest-Fixture für die Sitzung

PyTest-Fixtures sind Funktionen, die vor einem Test (oder einem Modul von Tests) ausgeführt werden, typischerweise um Daten oder Verbindungen für Tests bereitzustellen. In diesem Fall erstellen Sie eine Fixture, das ein Snowpark-Session-Objekt zurückgibt. Erstellen Sie zunächst ein Verzeichnis test, wenn Sie noch keines haben. Erstellen Sie dann im Verzeichnis test eine Datei conftest.py mit folgendem Inhalt, wobei connection_parameters ein Dictionary mit den Anmeldeinformationen Ihres Snowflake-Kontos ist. Weitere Informationen zum Dictionary-Format finden Sie unter Erstellen einer Sitzung.

# test/conftest.py
import pytest
from snowflake.snowpark.session import Session

def pytest_addoption(parser):
    parser.addoption("--snowflake-session", action="store", default="live")

@pytest.fixture(scope='module')
def session(request) -> Session:
    if request.config.getoption('--snowflake-session') == 'local':
        return Session.builder.config('local_testing', True).create()
    else:
        return Session.builder.configs(CONNECTION_PARAMETERS).create()
Copy

Durch den Aufruf von pytest_addoption wird dem Befehl pytest eine Befehlszeilenoption namens snowflake-session hinzugefügt. Das Session-Fixture prüft diese Befehlszeilenoption und erstellt je nach ihrem Wert ein lokales oder ein Live-Session-Objekt. Auf diese Weise können Sie zu Testzwecken problemlos zwischen dem lokalen und dem Live-Modus wechseln.

# Using local mode:
pytest --snowflake-session local

# Using live mode
pytest
Copy

SQL-Operationen

Session.sql(...) wird von dem lokalen Test-Framework nicht unterstützt. Verwenden Sie wann immer möglich die Snowpark-DataFrame-APIs, und in Fällen, in denen Sie Session.sql(...) verwenden müssen, können Sie den tabellarischen Rückgabewert mit unittest.mock.patch von Python nachahmen, um die erwartete Antwort eines bestimmten Session.sql()-Aufrufs zu patchen.

Im folgenden Beispiel ordnet mock_sql() den SQL-Abfragetext der gewünschten DataFrame-Antwort zu. Die folgende bedingte Anweisung prüft, ob in der aktuellen Sitzung lokale Tests verwendet werden, und wendet in diesem Fall den Patch auf die Methode Session.sql() an.

from unittest import mock
from functools import partial

def test_something(pytestconfig, session):

    def mock_sql(session, sql_string):  # patch for SQL operations
        if sql_string == "select 1,2,3":
            return session.create_dataframe([[1,2,3]])
        else:
            raise RuntimeError(f"Unexpected query execution: {sql_string}")

    if pytestconfig.getoption('--snowflake-session') == 'local':
        with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
            assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
    else:
        assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Copy

Wenn das lokale Testen aktiviert ist, werden alle mit DataFrame.save_as_table() erstellten Tabellen als temporäre Tabellen im Speicher abgelegt und können mit Session.table() abgerufen werden. Die unterstützten DataFrame-Operationen können wie gewohnt auf die Tabelle angewendet werden.

Patching der integrierten Funktionen

Nicht alle integrierten Funktionen unter snowflake.snowpark.functions werden vom lokalen Test-Framework unterstützt. Wenn Sie eine Funktion verwenden, die nicht unterstützt wird, müssen Sie das @patch-Decorator-Element von snowflake.snowpark.mock verwenden, um einen Patch zu erstellen.

Um die gepatchte Funktion zu definieren und zu implementieren, muss die Signatur (Parameterliste) mit den Parametern der integrierten Funktion übereinstimmen. Das lokale Test-Framework übergibt Parameter an die gepatchte Funktion nach folgenden Regeln:

  • Bei Parametern des Typs ColumnOrName in der Signatur von integrierten Funktionen wird ColumnEmulator als Parameter der gepatchten Funktionen übergeben. ColumnEmulator ist vergleichbar mit einem pandas.Series-Objekt, das die Spaltendaten enthält.

  • Bei Parametern des Typs LiteralType in der Signatur von integrierten Funktionen wird der Literalwert als Parameter der gepatchten Funktionen übergeben.

  • In allen anderen Fällen wird der Rohwert als Parameter der gepatchten Funktionen übergeben.

Als Rückgabetyp der gepatchten Funktionen wird eine Instanz von ColumnEmulator erwartet, entsprechend dem Rückgabetyp Column der integrierten Funktionen.

Beispielsweise könnte die integrierte Funktion to_timestamp() wie folgt gepatcht werden:

import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType

@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
    ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
    ret_column.sf_type = ColumnType(TimestampType(), True)
    return ret_column
Copy

Überspringen von Testfällen

Wenn Ihre PyTest-Testsuite einen Testfall enthält, der für das lokale Testen nicht gut unterstützt wird, können Sie diese Fälle mit dem mark.skipif-Decorator-Element von PyTest überspringen. Im folgenden Beispiel wird davon ausgegangen, dass Ihre Sitzung und die Parameter wie oben beschrieben konfiguriert sind. Die Bedingung prüft, ob local_testing_mode auf local gesetzt ist. Ist dies der Fall, wird der Testfall übersprungen, wobei eine Meldung den Grund für das Überspringen erklärt.

import pytest

@pytest.mark.skipif(
    condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
    ...
Copy

Einschränkungen

Die folgenden Funktionalitäten werden nicht unterstützt:

  • Raw-SQL-Zeichenfolgen und -Operationen, die das Parsen von SQL-Zeichenfolgen erfordern. Beispielsweise werden session.sql und DataFrame.filter("col1 > 12") nicht unterstützt.

  • UDFs, UDTFs und gespeicherte Prozeduren

  • Tabellenfunktionen

  • AsyncJobs

  • Sitzungsoperationen, wie das Ändern von Warehouses, Schemas und anderen Sitzungseigenschaften

  • Datentypen Geometry und Geography

  • Aggregieren von Fensterfunktionen

    # Selecting window function expressions is supported
    df.select("key", "value", sum_("value").over(), avg("value").over())
    
    # Aggregating window function expressions is NOT supported
    df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
    
    Copy

Weiterhin bestehen folgende Einschränkungen:

  • Für die Datentypen Variant, Array und Object wird nur die Standard-JSON-Kodierung und -Dekodierung unterstützt. Ausdrücke wie [1,2,,3,] werden in Snowflake als gültiges JSON betrachtet, nicht aber beim lokalen Testen, wo die Python-integrierten JSON-Funktionalitäten verwendet werden. Sie können die Variablen auf Modulebene snowflake.snowpark.mock.CUSTOM_JSON_ENCODER und snowflake.snowpark.mock.CUSTOM_JSON_DECODER angeben, um die Standardeinstellungen außer Kraft zu setzen.

  • Nur eine Teilmenge der Funktionen von Snowflake (einschließlich der Fensterfunktionen) ist implementiert. Unter Patching der integrierten Funktionen erfahren Sie, wie Sie Ihre eigene Funktionsdefinition einfügen können.

  • Das Patchen von rangbezogenen Funktionen wird derzeit nicht unterstützt.

  • Wenn Sie Spalten mit demselben Namen auswählen, wird nur eine Spalte zurückgegeben. Als Problemumgehung können Sie die Spalten mit Column.alias so umbenennen, dass sie eindeutige Namen haben.

    df.select(lit(1), lit(1)).show() # col("a"), col("a")
    #---------
    #|"'1'"  |
    #---------
    #|1      |
    #|...    |
    #---------
    
    # Workaround: Column.alias
    DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2"))
    # "col1_1", "col1_2"
    
    Copy
  • Explizite Typumwandlung mit Column.cast hat die Einschränkung, dass Formatzeichenfolgen nicht unterstützt werden. Dies betrifft für Eingaben to_decimal, to_number, to_numeric, to_double, to_date, to_time, to_timestamp und für Ausgaben to_char, to_varchar, to_binary.

  • JSON-Zeichenfolgen, die in VariantType gespeichert sind, können nicht in Datetime-Typen konvertiert werden.

  • Bei Table.merge und Table.update unterstützt die Implementierung nur das Verhalten, wenn die Sitzungsparameter ERROR_ON_NONDETERMINISTIC_UPDATE und ERROR_ON_NONDETERMINISTIC_MERGE auf False gesetzt sind. Das bedeutet, dass bei Mehrfachverknüpfungen (Multi-Joins) eine der übereinstimmenden Zeilen aktualisiert wird.

Liste der unterstützten APIs

Snowpark-Sitzung

Session.createDataFrame

Session.create_dataframe

Session.flatten

Session.range

Session.table

Eingabe/Ausgabe

DataFrameReader.csv

DataFrameReader.table

DataFrameWriter.saveAsTable

DataFrameWriter.save_as_table

DataFrame

DataFrame.agg

DataFrame.cache_result

DataFrame.col

DataFrame.collect

DataFrame.collect_nowait

DataFrame.copy_into_table

DataFrame.count

DataFrame.createOrReplaceTempView

DataFrame.createOrReplaceView

DataFrame.create_or_replace_temp_view

DataFrame.create_or_replace_view

DataFrame.crossJoin

DataFrame.cross_join

DataFrame.distinct

DataFrame.drop

DataFrame.dropDuplicates

DataFrame.drop_duplicates

DataFrame.dropna

DataFrame.except_

DataFrame.explain

DataFrame.fillna

DataFrame.filter

DataFrame.first

DataFrame.groupBy

DataFrame.group_by

DataFrame.intersect

DataFrame.join

DataFrame.limit

DataFrame.minus

DataFrame.natural_join

DataFrame.orderBy

DataFrame.order_by

DataFrame.rename

DataFrame.replace

DataFrame.rollup

DataFrame.sample

DataFrame.select

DataFrame.show

DataFrame.sort

DataFrame.subtract

DataFrame.take

DataFrame.toDF

DataFrame.toLocalIterator

DataFrame.toPandas

DataFrame.to_df

DataFrame.to_local_iterator

DataFrame.to_pandas

DataFrame.to_pandas_batches

DataFrame.union

DataFrame.unionAll

DataFrame.unionAllByName

DataFrame.unionByName

DataFrame.union_all

DataFrame.union_all_by_name

DataFrame.union_by_name

DataFrame.unpivot

DataFrame.where

DataFrame.withColumn

DataFrame.withColumnRenamed

DataFrame.with_column

DataFrame.with_column_renamed

DataFrame.with_columns

DataFrameNaFunctions.drop

DataFrameNaFunctions.fill

DataFrameNaFunctions.replace

Spalte

Column.alias

Column.as_

Column.asc

Column.asc_nulls_first

Column.asc_nulls_last

Column.astype

Column.between

Column.bitand

Column.bitor

Column.bitwiseAnd

Column.bitwiseOR

Column.bitwiseXOR

Column.bitxor

Column.cast

Column.collate

Column.desc

Column.desc_nulls_first

Column.desc_nulls_last

Column.endswith

Column.eqNullSafe

Column.equal_nan

Column.equal_null

Column.getItem

Column.getName

Column.get_name

Column.in_

Column.isNotNull

Column.isNull

Column.is_not_null

Column.is_null

Column.isin

Column.like

Column.name

Column.over

Column.regexp

Column.rlike

Column.startswith

Column.substr

Column.substring

Column.try_cast

Column.within_group

CaseExpr.when

CaseExpr.otherwise

Datentypen

ArrayType

BinaryType

BooleanType

ByteType

ColumnIdentifier

DataType

DateType

DecimalType

DoubleType

FloatType

IntegerType

LongType

MapType

NullType

ShortType

StringType

StructField

StructType

Timestamp

TimestampType

TimeType

Variant

VariantType

Zeile

Row.asDict

Row.as_dict

Row.count

Row.index

Funktion

abs

avg

coalesce

contains

count

count_distinct

covar_pop

endswith

first_value

iff

lag

last_value

lead

list_agg

max

median

min

parse_json

row_number

startswith

substring

sum

to_array

to_binary

to_boolean

to_char

to_date

to_decimal

to_double

to_object

to_time

to_timestamp

to_variant

Window

Window.orderBy

Window.order_by

Window.partitionBy

Window.partition_by

Window.rangeBetween

Window.range_between

Window.rowsBetween

Window.rows_between

WindowSpec.orderBy

WindowSpec.order_by

WindowSpec.partitionBy

WindowSpec.partition_by

WindowSpec.rangeBetween

WindowSpec.range_between

WindowSpec.rowsBetween

WindowSpec.rows_between

Gruppieren

RelationalGroupedDataFrame.agg

RelationalGroupedDataFrame.apply_in_pandas

RelationalGroupedDataFrame.applyInPandas

RelationalGroupedDataFrame.avg

RelationalGroupedDataFrame.builtin

RelationalGroupedDataFrame.count

RelationalGroupedDataFrame.function

RelationalGroupedDataFrame.max

RelationalGroupedDataFrame.mean

RelationalGroupedDataFrame.median

RelationalGroupedDataFrame.min

RelationalGroupedDataFrame.sum

Tabelle

Table.delete

Table.drop_table

Table.merge

Table.sample

Table.update

WhenMatchedClause.delete

WhenMatchedClause.update

WhenNotMatchedClause.insert