ローカルテストフレームワーク¶
このトピックでは、Snowpark Pythonライブラリを使用する際にローカルでコードをテストする方法について説明します。
このトピックの内容:
Snowpark Pythonローカルテストフレームワークはエミュレーターです。Snowflakeアカウントに接続することなく、ローカルでSnowpark Python DataFrames を作成および操作できます。ローカルテストフレームワークを使用すると、コードの変更をアカウントに展開する前に、開発マシンまたは CI(継続的統合)パイプラインで DataFrame 操作をローカルでテストできます。API は同じであるため、コードを変更せずに、テストをローカルで実行することも、Snowflakeアカウントに対して実行することもできます。
前提条件¶
ローカルテストフレームワークを使用するには
オプションで依存関係
localtest
があるバージョン1.18.0以上のSnowpark Pythonライブラリを使用する必要があります。サポートされているPythonのバージョンは次のとおりです。
3.9
3.10
3.11
3.12
Snowpark Pythonライブラリをインストールする¶
ライブラリをオプションの依存関係とともにインストールするには、以下のコマンドを実行します。
pip install "snowflake-snowpark-python[localtest]"
セッションを作成し、ローカルテストを有効にする¶
開始するには、Snowpark
Session
を作成し、ローカルテスト構成をTrue
に設定します。from snowflake.snowpark import Session session = Session.builder.config('local_testing', True).create()
セッションを使用して、 DataFrames を作成し、操作します。
df = session.create_dataframe([[1,2],[3,4]],['a','b']) df.with_column('c', df['a']+df['b']).show()
データのロード¶
Pythonのプリミティブ、ファイル、Pandas DataFrames からSnowpark DataFrames を作成できます。これは、テストケースの入力と予想される出力を指定するのに役立ちます。このようにすることで、データがソース管理内に置かれ、テストデータとテストケースの同期の維持が簡単になります。
CSV データをロードする¶
CSV ファイルをSnowpark DataFrame にロードするには、まず
Session.file.put()
を呼び出してファイルをイン・メモリ・ステージにロードし、次にSession.read()
を使って内容を読み込みます。
例
次のような内容のファイル、 data.csv
があるとします。
col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
以下のコードを使用して、 data.csv
を Snowpark DataFrame にロードすることができます。ファイルをステージに置く必要があります。そうしないと、「ファイルが見つかりません」というエラーが出ます。
from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType
# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
[
StructField("col1", IntegerType()),
StructField("col2", StringType()),
StructField("col3", BooleanType()),
StructField("col4", DoubleType()),
]
)
# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
期待される出力
-------------------------------------
|"COL1" |"COL2" |"COL3" |"COL4" |
-------------------------------------
|1 |a |True |1.23 |
|2 |b |False |4.56 |
-------------------------------------
pandasのデータを読み込む¶
pandas DataFrame から Snowpark Python DataFrame を作成するには、
create_dataframe
メソッドを呼び出し、pandas DataFrame としてデータを渡します。
例
import pandas as pd
pandas_df = pd.DataFrame(
data={
"col1": pd.Series(["value1", "value2"]),
"col2": pd.Series([1.23, 4.56]),
"col3": pd.Series([123, 456]),
"col4": pd.Series([True, False]),
}
)
dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
期待される出力
-------------------------------------
|"col1" |"col2" |"col3" |"col4" |
-------------------------------------
|value1 |1.23 |123 |True |
|value2 |4.56 |456 |False |
-------------------------------------
Snowpark Python DataFrameを pandas DataFrame に変換するには、 DataFrame の
to_pandas
メソッドを呼び出します。
例
from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType
dataframe = session.create_dataframe(
data=[
["value1", 1.23, 123, True],
["value2", 4.56, 456, False],
],
schema=StructType([
StructField("col1", StringType()),
StructField("col2", DoubleType()),
StructField("col3", LongType()),
StructField("col4", BooleanType()),
])
)
pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
期待される出力
COL1 COL2 COL3 COL4
0 value1 1.23 123 True
1 value2 4.56 456 False
セッション用に PyTest Fixtureを作成する¶
PyTest フィクスチャ は、テスト(またはテストのモジュール)の前に実行される関数で、通常はテストにデータや接続を提供します。このシナリオでは、Snowpark Session
オブジェクトを返すフィクスチャを作成します。
まだ
test
ディレクトリがない場合は、作成してください。次に、
test
ディレクトリに次のコンテンツのファイルconftest.py
を作成します。connection_parameters
は、Snowflakeアカウントの認証情報があるディクショナリです。# test/conftest.py import pytest from snowflake.snowpark.session import Session def pytest_addoption(parser): parser.addoption("--snowflake-session", action="store", default="live") @pytest.fixture(scope='module') def session(request) -> Session: if request.config.getoption('--snowflake-session') == 'local': return Session.builder.config('local_testing', True).create() else: return Session.builder.configs(CONNECTION_PARAMETERS).create()
ディクショナリフォーマットの詳細については、 セッションの作成 をご参照ください。
pytest_addoption
の呼び出しにより、 pytest
コマンドに snowflake-session
というコマンドラインオプションが追加されます。 Session
フィクスチャはこのコマンドラインオプションをチェックし、その値に応じてローカルかライブの Session
を作成します。これにより、以下のコマンドラインの例に示すように、テストのためにローカルモードとライブモードを簡単に切り替えることができます。
# Using local mode:
pytest --snowflake-session local
# Using live mode
pytest
SQL 操作 。¶
Session.sql(...)
はローカルテストフレームワークではサポートされていません。可能な限りSnowparkの DataFrame APIs を使用します。 Session.sql(...)
を使用しなければならない場合は、Pythonの unittest.mock.patch
を使用して表形式の戻り値をモックし、指定された Session.sql()
呼び出しから予想される応答をパッチすることができます。
以下の例では、 mock_sql()
は SQL クエリテキストを DataFrame 応答にマッピングします。次の条件ステートメントは、現在のセッションがローカルテストを使っているかどうかをチェックし、そうであれば Session.sql()
メソッドにパッチを適用します。
from unittest import mock
from functools import partial
def test_something(pytestconfig, session):
def mock_sql(session, sql_string): # patch for SQL operations
if sql_string == "select 1,2,3":
return session.create_dataframe([[1,2,3]])
else:
raise RuntimeError(f"Unexpected query execution: {sql_string}")
if pytestconfig.getoption('--snowflake-session') == 'local':
with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
else:
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
ローカルテストが有効な場合、 DataFrame.save_as_table()
で作成されたすべてのテーブルは仮テーブルとしてメモリに保存され、 Session.table()
を使用して取得することができます。サポートされている DataFrame 操作は、通常通りテーブル上で使用できます。
組み込み関数のパッチ¶
snowflake.snowpark.functions
の組み込み関数のすべてが、ローカルテストフレームワークでサポートされているわけではありません。サポートされていない関数を使用する場合は、 snowflake.snowpark.mock
の @patch
デコレーターを使用してパッチを作成する必要があります。
パッチされた関数を定義し実装するには、署名(パラメーターリスト)が組み込み関数のパラメーターと一致している必要があります。ローカルテストフレームワークは、次のルールを使ってパッチされた関数にパラメーターを渡します。
組み込み関数の署名の
ColumnOrName
型のパラメーターでは、ColumnEmulator
がパッチされた関数のパラメーターとして渡されます。ColumnEmulator
は列データを含むpandas.Series
オブジェクトに似ています。組み込み関数の署名の
LiteralType
型のパラメーターについては、リテラル値がパッチされた関数のパラメーターとして渡されます。そうでない場合は、生の値がパッチされた関数のパラメーターとして渡されます。
パッチされた関数の戻り型に関しては、 ColumnEmulator
のインスタンスを返すことが、組み込み関数の Column
の戻り型に対して予想されます。
たとえば、組み込み関数 to_timestamp()
は次のようにパッチすることができます。
import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType
@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
ret_column.sf_type = ColumnType(TimestampType(), True)
return ret_column
テストケースのスキップ¶
PyTest テストスイートにローカルテストでうまくサポートされないテストケースが含まれている場合は、 PyTest の mark.skipif
デコレーターを使用してそれらのケースをスキップすることができます。次の例は、前述のようにセッションとパラメーターが構成されていると想定します。この条件は、 local_testing_mode
が local
に設定されているかどうかをチェックします。設定されている場合、テストケースは説明メッセージとともにスキップされます。
import pytest
@pytest.mark.skipif(
condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
...
UDFs、およびストアドプロシージャの登録¶
ローカルテストフレームワークでは、ユーザー定義関数(UDFs)とストアドプロシージャを作成し、呼び出すことができます。オブジェクトを作成するには、以下の構文オプションを使用します。
構文 |
UDF |
ストアドプロシージャ |
---|---|---|
デコレーター |
|
|
登録メソッド |
|
|
ファイルからの登録メソッド |
|
|
例
次のコード例では、デコレーターを使用して UDF とストアドプロシージャを作成し、その両方を名前で呼び出します。
from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import col, DataFrame
from snowflake.snowpark.functions import udf, sproc, call_udf
from snowflake.snowpark.types import IntegerType, StringType
# Create local session
session = Session.builder.config('local_testing', True).create()
# Create local table
table = 'example'
session.create_dataframe([[1,2],[3,4]],['a','b']).write.save_as_table(table)
# Register a UDF, which is called from the stored procedure
@udf(name='example_udf', return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
def example_udf(a, b):
return a + b
# Register stored procedure
@sproc(name='example_proc', return_type=IntegerType(), input_types=[StringType()])
def example_proc(session, table_name):
return session.table(table_name)\
.with_column('c', call_udf('example_udf', col('a'), col('b')))\
.count()
# Call the stored procedure by name
output = session.call('example_proc', table)
print(output)
制限事項¶
以下のリストには、ローカルテストフレームワークにおける既知の制限と動作のギャップが含まれています。 現在のところ、Snowflake ではこれらの項目に対処する予定はありません。
生の SQL 文字列と SQL 文字列の解析を必要とする操作(例:
session.sql
とDataFrame.filter("col1 > 12")
)はサポートされていません。非同期操作はサポートされていません。
テーブル、ストアドプロシージャ、 UDFs などのデータベースオブジェクトは、セッションレベルを超えて永続化されることはなく、すべての操作はメモリ内で実行されます。たとえば、あるモックセッションに登録された永続的なストアドプロシージャは、他のモックセッションからは見ることができません。
Column.collate
のような 文字列照合順序 関連の機能はサポートされていません。Variant
、Array
、Object
のデータ型は、標準の JSON エンコードとデコードでのみサポートされています。{1,2,,3,} のような式は、Snowflakeでは有効な JSON とみなされますが、Pythonの組み込み JSON 関数が使用されるローカルテストでは無効です。モジュールレベルの変数snowflake.snowpark.mock.CUSTOM_JSON_ENCODER
とsnowflake.snowpark.mock.CUSTOM_JSON_DECODER
を指定して、デフォルト設定を上書きすることができます。Snowflakeの関数のサブセット(ウィンドウ関数を含む)のみが実装されます。独自の関数定義を注入する方法については、 組み込み関数のパッチ をご参照ください。
ランク関連機能のパッチは現在サポートされていません。
SQL 形式モデル はサポートされていません。たとえば、
to_decimal
のモック実装は、オプションのパラメーターformat
を処理しません。Snowpark Pythonライブラリには、ステージを作成またはドロップするための組み込みPython API がないため、ローカルテストフレームワークは、すべての入力ステージがすでに作成されていると仮定します。
UDFs とストアドプロシージャの現在の実装では、パッケージが検証されません。コード内で参照されているパッケージは、プログラムを実行する前にすべてインストールする必要があります。
クエリタグはサポートされていません。
クエリ履歴はサポートされていません。
系列はサポートされていません。
UDF またはストアドプロシージャを登録する場合、
parallel
、execute_as
、statement_params
、source_code_display
、external_access_integrations
、secrets
、comment
などのオプションのパラメーターは無視されます。Table.sample
の場合、 SYSTEM または BLOCK のサンプリングは、 ROW のサンプリングと同じです。Snowflakeは、ストアドプロシージャ内でローカルテストフレームワークを実行することを公式にはサポートしていません。ストアドプロシージャ内のローカルテストモードのセッションで、予期しないエラーが発生したりトリガーされたりすることがあります。
サポートされていない機能¶
以下は、現在ローカルテストフレームワークに実装されていない機能のリストです。 Snowflakeはこれらのアイテムについては、積極的に対処するように取り組んでいます。
通常、これらの機能を参照すると、 NotImplementedError
が発生します。
UDTFs (ユーザー定義テーブル関数)
UDAFs (ユーザー定義集計関数)
ベクトル化された UDFs および UDTFs
組み込みテーブル関数
テーブルストアドプロシージャ
Geometry
、Geography
、およびVector
のデータ型区間表現
JSON および CSV 以外の読み取りファイル形式
サポートされているファイル形式の場合、すべての読み取りオプションがサポートされているわけではありません。たとえば、
infer_schema
は CSV 形式をサポートしていません。
未サポートまたは既知の制限事項としてここにリストアップされていない機能については、 ローカルテストのための機能リクエスト の最新リストを確認するか、 snowpark-python
GitHubリポジトリで 機能リクエストを作成 します。
既知の問題¶
以下は、ローカルテストフレームワークに存在する既知の問題や動作ギャップのリストです。 Snowflakeはこれらの問題には積極的に対処する予定です。
DataFrame.groupby
またはその他の集計操作の内部における、ウィンドウ関数の使用はサポートされていません。# Selecting window function expressions is supported df.select("key", "value", sum_("value").over(), avg("value").over()) # Aggregating window function expressions is NOT supported df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
同じ名前の列を選択すると、1つの列のみが返されます。回避策として、
Column.alias
を使用して列の名前を変更して明確な名前にします。df.select(lit(1), lit(1)).show() # col("a"), col("a") #--------- #|"'1'" | #--------- #|1 | #|... | #--------- # Workaround: Column.alias DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2")) # "col1_1", "col1_2"
Table.merge
とTable.update
の場合、セッションパラメーターERROR_ON_NONDETERMINISTIC_UPDATE
とERROR_ON_NONDETERMINISTIC_MERGE
は、False
に設定する必要があります。これは、複数結合の場合、マッチした行の1つが更新されることを意味します。GET および PUT ファイル操作での完全修飾ステージ名はサポートされていません。データベース名とスキーマ名はステージ名の一部として扱われます。
mock_to_char
の実装は、異なる時間部分の間にセパレーターを持つ形式のタイムスタンプのみをサポートしています。DataFrame.pivot
には、ピボットを特定の値に制限できるvalues
と呼ばれるパラメーターがあります。現時点では、統計的に定義された値のみを使用することができます。サブクエリを使用して提供された値はエラーとなります。タイムゾーン情報のあるタイムスタンプを含むpandas
DataFrame
からDataFrame
を作成することはサポートされていません。
このリストに記載されていない問題については、 最新のオープンな問題リスト を確認するか、 snowpark-python
GitHubリポジトリに バグレポートを作成 してください。