ローカルテストフレームワーク

このトピックでは、Snowpark Pythonライブラリを使用する際にローカルでコードをテストする方法について説明します。

このトピックの内容:

Snowpark Pythonローカルテストフレームワークはエミュレーターです。Snowflakeアカウントに接続することなく、ローカルでSnowpark Python DataFrames を作成および操作できます。ローカルテストフレームワークを使用すると、コードの変更をアカウントに展開する前に、開発マシンまたは CI(継続的統合)パイプラインで DataFrame 操作をローカルでテストできます。API は同じであるため、コードを変更せずに、テストをローカルで実行することも、Snowflakeアカウントに対して実行することもできます。

前提条件

ローカルテストフレームワークを使用するには

  • オプションで依存関係 localtest があるバージョン1.18.0以上のSnowpark Pythonライブラリを使用する必要があります。

  • サポートされているPythonのバージョンは次のとおりです。

    • 3.9

    • 3.10

    • 3.11

    • 3.12

Snowpark Pythonライブラリをインストールする

  • ライブラリをオプションの依存関係とともにインストールするには、以下のコマンドを実行します。

    pip install "snowflake-snowpark-python[localtest]"
    
    Copy

セッションを作成し、ローカルテストを有効にする

  1. 開始するには、Snowpark Session を作成し、ローカルテスト構成を True に設定します。

    from snowflake.snowpark import Session
    
    session = Session.builder.config('local_testing', True).create()
    
    Copy
  2. セッションを使用して、 DataFrames を作成し、操作します。

    df = session.create_dataframe([[1,2],[3,4]],['a','b'])
    df.with_column('c', df['a']+df['b']).show()
    
    Copy

データのロード

Pythonのプリミティブ、ファイル、Pandas DataFrames からSnowpark DataFrames を作成できます。これは、テストケースの入力と予想される出力を指定するのに役立ちます。このようにすることで、データがソース管理内に置かれ、テストデータとテストケースの同期の維持が簡単になります。

CSV データをロードする

  • CSV ファイルをSnowpark DataFrame にロードするには、まず Session.file.put() を呼び出してファイルをイン・メモリ・ステージにロードし、次に Session.read() を使って内容を読み込みます。

次のような内容のファイル、 data.csv があるとします。

col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
Copy

以下のコードを使用して、 data.csv を Snowpark DataFrame にロードすることができます。ファイルをステージに置く必要があります。そうしないと、「ファイルが見つかりません」というエラーが出ます。

from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType


# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
    [
        StructField("col1", IntegerType()),
        StructField("col2", StringType()),
        StructField("col3", BooleanType()),
        StructField("col4", DoubleType()),
    ]
)

# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
Copy

期待される出力

-------------------------------------
|"COL1"  |"COL2"  |"COL3"  |"COL4"  |
-------------------------------------
|1       |a       |True    |1.23    |
|2       |b       |False   |4.56    |
-------------------------------------

pandasのデータを読み込む

  • pandas DataFrame から Snowpark Python DataFrame を作成するには、 create_dataframe メソッドを呼び出し、pandas DataFrame としてデータを渡します。

import pandas as pd

pandas_df = pd.DataFrame(
    data={
        "col1": pd.Series(["value1", "value2"]),
        "col2": pd.Series([1.23, 4.56]),
        "col3": pd.Series([123, 456]),
        "col4": pd.Series([True, False]),
    }
)

dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
Copy

期待される出力

-------------------------------------
|"col1"  |"col2"  |"col3"  |"col4"  |
-------------------------------------
|value1  |1.23    |123     |True    |
|value2  |4.56    |456     |False   |
-------------------------------------
  • Snowpark Python DataFrameを pandas DataFrame に変換するには、 DataFrame の to_pandas メソッドを呼び出します。

from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType

dataframe = session.create_dataframe(
    data=[
        ["value1", 1.23, 123, True],
        ["value2", 4.56, 456, False],
    ],
    schema=StructType([
        StructField("col1", StringType()),
        StructField("col2", DoubleType()),
        StructField("col3", LongType()),
        StructField("col4", BooleanType()),
    ])
)

pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
Copy

期待される出力

    COL1  COL2  COL3   COL4
0  value1  1.23   123   True
1  value2  4.56   456  False

セッション用に PyTest Fixtureを作成する

PyTest フィクスチャ は、テスト(またはテストのモジュール)の前に実行される関数で、通常はテストにデータや接続を提供します。このシナリオでは、Snowpark Session オブジェクトを返すフィクスチャを作成します。

  1. まだ test ディレクトリがない場合は、作成してください。

  2. 次に、 test ディレクトリに次のコンテンツのファイル conftest.py を作成します。 connection_parameters は、Snowflakeアカウントの認証情報があるディクショナリです。

    # test/conftest.py
    import pytest
    from snowflake.snowpark.session import Session
    
    def pytest_addoption(parser):
        parser.addoption("--snowflake-session", action="store", default="live")
    
    @pytest.fixture(scope='module')
    def session(request) -> Session:
        if request.config.getoption('--snowflake-session') == 'local':
            return Session.builder.config('local_testing', True).create()
        else:
            return Session.builder.configs(CONNECTION_PARAMETERS).create()
    
    Copy

ディクショナリフォーマットの詳細については、 セッションの作成 をご参照ください。

pytest_addoption の呼び出しにより、 pytest コマンドに snowflake-session というコマンドラインオプションが追加されます。 Session フィクスチャはこのコマンドラインオプションをチェックし、その値に応じてローカルかライブの Session を作成します。これにより、以下のコマンドラインの例に示すように、テストのためにローカルモードとライブモードを簡単に切り替えることができます。

# Using local mode:
pytest --snowflake-session local

# Using live mode
pytest
Copy

SQL 操作 。

Session.sql(...) はローカルテストフレームワークではサポートされていません。可能な限りSnowparkの DataFrame APIs を使用します。 Session.sql(...) を使用しなければならない場合は、Pythonの unittest.mock.patch を使用して表形式の戻り値をモックし、指定された Session.sql() 呼び出しから予想される応答をパッチすることができます。

以下の例では、 mock_sql() は SQL クエリテキストを DataFrame 応答にマッピングします。次の条件ステートメントは、現在のセッションがローカルテストを使っているかどうかをチェックし、そうであれば Session.sql() メソッドにパッチを適用します。

from unittest import mock
from functools import partial

def test_something(pytestconfig, session):

    def mock_sql(session, sql_string):  # patch for SQL operations
        if sql_string == "select 1,2,3":
            return session.create_dataframe([[1,2,3]])
        else:
            raise RuntimeError(f"Unexpected query execution: {sql_string}")

    if pytestconfig.getoption('--snowflake-session') == 'local':
        with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
            assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
    else:
        assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Copy

ローカルテストが有効な場合、 DataFrame.save_as_table() で作成されたすべてのテーブルは仮テーブルとしてメモリに保存され、 Session.table() を使用して取得することができます。サポートされている DataFrame 操作は、通常通りテーブル上で使用できます。

組み込み関数のパッチ

snowflake.snowpark.functions の組み込み関数のすべてが、ローカルテストフレームワークでサポートされているわけではありません。サポートされていない関数を使用する場合は、 snowflake.snowpark.mock@patch デコレーターを使用してパッチを作成する必要があります。

パッチされた関数を定義し実装するには、署名(パラメーターリスト)が組み込み関数のパラメーターと一致している必要があります。ローカルテストフレームワークは、次のルールを使ってパッチされた関数にパラメーターを渡します。

  • 組み込み関数の署名の ColumnOrName 型のパラメーターでは、 ColumnEmulator がパッチされた関数のパラメーターとして渡されます。 ColumnEmulator は列データを含む pandas.Series オブジェクトに似ています。

  • 組み込み関数の署名の LiteralType 型のパラメーターについては、リテラル値がパッチされた関数のパラメーターとして渡されます。

  • そうでない場合は、生の値がパッチされた関数のパラメーターとして渡されます。

パッチされた関数の戻り型に関しては、 ColumnEmulator のインスタンスを返すことが、組み込み関数の Column の戻り型に対して予想されます。

たとえば、組み込み関数 to_timestamp() は次のようにパッチすることができます。

import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType

@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
    ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
    ret_column.sf_type = ColumnType(TimestampType(), True)
    return ret_column
Copy

テストケースのスキップ

PyTest テストスイートにローカルテストでうまくサポートされないテストケースが含まれている場合は、 PyTest の mark.skipif デコレーターを使用してそれらのケースをスキップすることができます。次の例は、前述のようにセッションとパラメーターが構成されていると想定します。この条件は、 local_testing_modelocal に設定されているかどうかをチェックします。設定されている場合、テストケースは説明メッセージとともにスキップされます。

import pytest

@pytest.mark.skipif(
    condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
    ...
Copy

UDFs、およびストアドプロシージャの登録

ローカルテストフレームワークでは、ユーザー定義関数(UDFs)とストアドプロシージャを作成し、呼び出すことができます。オブジェクトを作成するには、以下の構文オプションを使用します。

構文

UDF

ストアドプロシージャ

デコレーター

@udf

@sproc

登録メソッド

udf.register()

sproc.register()

ファイルからの登録メソッド

udf.register_from_file()

sproc.register_from_file()

次のコード例では、デコレーターを使用して UDF とストアドプロシージャを作成し、その両方を名前で呼び出します。

from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import col, DataFrame
from snowflake.snowpark.functions import udf, sproc, call_udf
from snowflake.snowpark.types import IntegerType, StringType

# Create local session
session = Session.builder.config('local_testing', True).create()

# Create local table
table = 'example'
session.create_dataframe([[1,2],[3,4]],['a','b']).write.save_as_table(table)

# Register a UDF, which is called from the stored procedure
@udf(name='example_udf', return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
def example_udf(a, b):
    return a + b

# Register stored procedure
@sproc(name='example_proc', return_type=IntegerType(), input_types=[StringType()])
def example_proc(session, table_name):
    return session.table(table_name)\
        .with_column('c', call_udf('example_udf', col('a'), col('b')))\
        .count()

# Call the stored procedure by name
output = session.call('example_proc', table)

print(output)
Copy

制限事項

以下のリストには、ローカルテストフレームワークにおける既知の制限と動作のギャップが含まれています。 現在のところ、Snowflake ではこれらの項目に対処する予定はありません。

  • 生の SQL 文字列と SQL 文字列の解析を必要とする操作(例: session.sqlDataFrame.filter("col1 > 12"))はサポートされていません。

  • 非同期操作はサポートされていません。

  • テーブル、ストアドプロシージャ、 UDFs などのデータベースオブジェクトは、セッションレベルを超えて永続化されることはなく、すべての操作はメモリ内で実行されます。たとえば、あるモックセッションに登録された永続的なストアドプロシージャは、他のモックセッションからは見ることができません。

  • Column.collate のような 文字列照合順序 関連の機能はサポートされていません。

  • VariantArrayObject のデータ型は、標準の JSON エンコードとデコードでのみサポートされています。{1,2,,3,} のような式は、Snowflakeでは有効な JSON とみなされますが、Pythonの組み込み JSON 関数が使用されるローカルテストでは無効です。モジュールレベルの変数 snowflake.snowpark.mock.CUSTOM_JSON_ENCODERsnowflake.snowpark.mock.CUSTOM_JSON_DECODER を指定して、デフォルト設定を上書きすることができます。

  • Snowflakeの関数のサブセット(ウィンドウ関数を含む)のみが実装されます。独自の関数定義を注入する方法については、 組み込み関数のパッチ をご参照ください。

    • ランク関連機能のパッチは現在サポートされていません。

  • SQL 形式モデル はサポートされていません。たとえば、 to_decimal のモック実装は、オプションのパラメーター format を処理しません。

  • Snowpark Pythonライブラリには、ステージを作成またはドロップするための組み込みPython API がないため、ローカルテストフレームワークは、すべての入力ステージがすでに作成されていると仮定します。

  • UDFs とストアドプロシージャの現在の実装では、パッケージが検証されません。コード内で参照されているパッケージは、プログラムを実行する前にすべてインストールする必要があります。

  • クエリタグはサポートされていません。

  • クエリ履歴はサポートされていません。

  • 系列はサポートされていません。

  • UDF またはストアドプロシージャを登録する場合、 parallelexecute_asstatement_paramssource_code_displayexternal_access_integrationssecretscomment などのオプションのパラメーターは無視されます。

  • Table.sample の場合、 SYSTEM または BLOCK のサンプリングは、 ROW のサンプリングと同じです。

  • Snowflakeは、ストアドプロシージャ内でローカルテストフレームワークを実行することを公式にはサポートしていません。ストアドプロシージャ内のローカルテストモードのセッションで、予期しないエラーが発生したりトリガーされたりすることがあります。

サポートされていない機能

以下は、現在ローカルテストフレームワークに実装されていない機能のリストです。 Snowflakeはこれらのアイテムについては、積極的に対処するように取り組んでいます。

通常、これらの機能を参照すると、 NotImplementedError が発生します。

  • UDTFs (ユーザー定義テーブル関数)

  • UDAFs (ユーザー定義集計関数)

  • ベクトル化された UDFs および UDTFs

  • 組み込みテーブル関数

  • テーブルストアドプロシージャ

  • GeometryGeography、および Vector のデータ型

  • 区間表現

  • JSON および CSV 以外の読み取りファイル形式

    • サポートされているファイル形式の場合、すべての読み取りオプションがサポートされているわけではありません。たとえば、 infer_schema は CSV 形式をサポートしていません。

未サポートまたは既知の制限事項としてここにリストアップされていない機能については、 ローカルテストのための機能リクエスト の最新リストを確認するか、 snowpark-python GitHubリポジトリで 機能リクエストを作成 します。

既知の問題

以下は、ローカルテストフレームワークに存在する既知の問題や動作ギャップのリストです。 Snowflakeはこれらの問題には積極的に対処する予定です。

  • DataFrame.groupby またはその他の集計操作の内部における、ウィンドウ関数の使用はサポートされていません。

    # Selecting window function expressions is supported
    df.select("key", "value", sum_("value").over(), avg("value").over())
    
    # Aggregating window function expressions is NOT supported
    df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
    
    Copy
  • 同じ名前の列を選択すると、1つの列のみが返されます。回避策として、 Column.alias を使用して列の名前を変更して明確な名前にします。

    df.select(lit(1), lit(1)).show() # col("a"), col("a")
    #---------
    #|"'1'"  |
    #---------
    #|1      |
    #|...    |
    #---------
    
    # Workaround: Column.alias
    DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2"))
    # "col1_1", "col1_2"
    
    Copy
  • Table.mergeTable.update の場合、セッションパラメーター ERROR_ON_NONDETERMINISTIC_UPDATEERROR_ON_NONDETERMINISTIC_MERGE は、 False に設定する必要があります。これは、複数結合の場合、マッチした行の1つが更新されることを意味します。

  • GET および PUT ファイル操作での完全修飾ステージ名はサポートされていません。データベース名とスキーマ名はステージ名の一部として扱われます。

  • mock_to_char の実装は、異なる時間部分の間にセパレーターを持つ形式のタイムスタンプのみをサポートしています。

  • DataFrame.pivot には、ピボットを特定の値に制限できる values と呼ばれるパラメーターがあります。現時点では、統計的に定義された値のみを使用することができます。サブクエリを使用して提供された値はエラーとなります。

  • タイムゾーン情報のあるタイムスタンプを含むpandas DataFrame から DataFrame を作成することはサポートされていません。

このリストに記載されていない問題については、 最新のオープンな問題リスト を確認するか、 snowpark-python GitHubリポジトリに バグレポートを作成 してください。