チュートリアル: Python Snowparkのテスト¶
紹介¶
このチュートリアルでは、Snowpark Pythonコードのテストの基本を紹介します。
学習内容¶
このチュートリアルでは、次の方法を学習します。
Snowflakeに接続した状態でSnowparkコードをテストします。
PyTest のような標準的なテストユーティリティを使用して、Snowpark Python UDFs、 DataFrame 変換、ストアドプロシージャをテストすることができます。
ローカルテストフレームワークを使用して、Snowflakeアカウントに接続せずにローカルでSnowpark Python DataFrames をテストします。
コードの変更を展開する前に、ローカルテストフレームワークを使って、開発マシン上でローカルテストを行うことができます。
前提条件¶
ローカルテストフレームワークを使用するには
バージョン1.11.1以上のSnowpark Pythonライブラリを使用する必要があります。
サポートされているPythonのバージョンは次のとおりです。
3.8
3.9
3.10
3.11
プロジェクトを設定する¶
このセクションでは、プロジェクトのリポジトリをクローニングし、チュートリアルに必要な環境を設定します。
プロジェクトリポジトリをクローニングする。
git clone https://github.com/Snowflake-Labs/sftutorial-snowpark-testing
gitがインストールされていない場合は、リポジトリページに行き、 Code » Download Contents をクリックしてコンテンツをダウンロードしてください。
アカウント認証情報を使って環境変数を設定します。Snowpark API は、これらを使用してSnowflakeアカウントを認証します。
# Linux/MacOS export SNOWSQL_ACCOUNT=<replace with your account identifer> export SNOWSQL_USER=<replace with your username> export SNOWSQL_ROLE=<replace with your role> export SNOWSQL_PWD=<replace with your password> export SNOWSQL_DATABASE=<replace with your database> export SNOWSQL_SCHEMA=<replace with your schema> export SNOWSQL_WAREHOUSE=<replace with your warehouse>
# Windows/PowerShell $env:SNOWSQL_ACCOUNT = "<replace with your account identifer>" $env:SNOWSQL_USER = "<replace with your username>" $env:SNOWSQL_ROLE = "<replace with your role>" $env:SNOWSQL_PWD = "<replace with your password>" $env:SNOWSQL_DATABASE = "<replace with your database>" $env:SNOWSQL_SCHEMA = "<replace with your schema>" $env:SNOWSQL_WAREHOUSE = "<replace with your warehouse>"
オプション: このenv varを永久に設定するには、bashプロファイルを編集するか(Linux/MacOS の場合)、 System Properties メニューを使います(Windowsの場合)。
Anacondaを使用してconda環境を作成し、アクティブ化します。
conda env create --file environment.yml conda activate snowpark-testing
setup/create_table.py
を実行してサンプルテーブルを作成します。このPythonスクリプトは CITIBIKE というデータベース、PUBLIC というスキーマ、TRIPS という小さなテーブルを作成します。python setup/create_table.py
これで次のセクションに進む準備が整いました。このセクションでは、以下を実行しました。
チュートリアルリポジトリをクローニングした。
アカウント情報で環境変数を作成した。
プロジェクト用にconda環境を作成した。
Snowpark API を使用してSnowflakeに接続し、サンプルデータベース、スキーマ、およびテーブルを作成した。
ストアドプロシージャを試す¶
サンプルプロジェクトには、ストアドプロシージャハンドラー(sproc.py
)と3つの DataFrames 変換メソッド(transformers.py
)が含まれています。ストアドプロシージャハンドラーは UDF および DataFrame 変換器を使用して、ソーステーブル CITIBIKE.PUBLIC.TRIPS
から読み取り、2つのファクトテーブル: MONTH_FACTS
と BIKE_FACTS
を作成します。
このコマンドを実行することで、コマンドラインからストアドプロシージャを実行できます。
python project/sproc.py
プロジェクトに慣れたところで、次のセクションではtestディレクトリを設定し、Snowflakeセッション用の PyTest フィクスチャを作成します。
Snowflakeセッション用の PyTest フィクスチャを作成する¶
PyTest フィクスチャ は、テスト(またはテストのモジュール)の前に実行される関数で、通常はテストにデータや接続を提供するためのものです。このプロジェクトでは、Snowpark Session
オブジェクトを返す PyTest フィクスチャを作成します。テストケースでは、このセッションを使用してSnowflakeに接続します。
プロジェクトルートディレクトリの下に
test
ディレクトリを作成します。mkdir test
test
ディレクトリの下に、conftest.py
という新しいPythonファイルを作成します。conftest.py
内で、Session
オブジェクト用の PyTest フィクスチャを作成します。import pytest from project.utils import get_env_var_config from snowflake.snowpark.session import Session @pytest.fixture def session() -> Session: return Session.builder.configs(get_env_var_config()).create()
DataFrame 変換器用のユニットテストを追加する¶
test
ディレクトリで、test_transformers.py
という新しいPythonファイルを作成します。test_transformers.py
ファイルで、変換器メソッドをインポートします。# test/test_transformers.py from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
次に、これらの変換器用のユニットテストを作成します。通常の規則では、
test_<というメソッド名 >
を持つメソッドをテスト毎に作成することになっています。このケースでは、テストは次のようになります。# test/test_transformers.py from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts def test_add_rider_age(session): ... def test_calc_bike_facts(session): ... def test_calc_month_facts(session): ...
各テストケースの
session
パラメーターは、前のセクションで作成した PyTest フィクスチャを参照しています。次に、各変圧器のテストケースを実装します。次のパターンを使用します。
入力 DataFrame を作成します。
予想される出力 DataFrame を作成します。
ステップ1の入力 DataFrame を変換器メソッドに渡します。
ステップ3の出力を、ステップ2からの予想出力と比較します。
# test/test_transformers.py from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts from snowflake.snowpark.types import StructType, StructField, IntegerType, FloatType def test_add_rider_age(session: Session): input = session.create_dataframe( [ [1980], [1995], [2000] ], schema=StructType([StructField("BIRTH_YEAR", IntegerType())]) ) expected = session.create_dataframe( [ [1980, 43], [1995, 28], [2000, 23] ], schema=StructType([StructField("BIRTH_YEAR", IntegerType()), StructField("RIDER_AGE", IntegerType())]) ) actual = add_rider_age(input) assert expected.collect() == actual.collect() def test_calc_bike_facts(session: Session): input = session.create_dataframe([ [1, 10, 20], [1, 5, 30], [2, 20, 50], [2, 10, 60] ], schema=StructType([ StructField("BIKEID", IntegerType()), StructField("TRIPDURATION", IntegerType()), StructField("RIDER_AGE", IntegerType()) ]) ) expected = session.create_dataframe([ [1, 2, 7.5, 25.0], [2, 2, 15.0, 55.0], ], schema=StructType([ StructField("BIKEID", IntegerType()), StructField("COUNT", IntegerType()), StructField("AVG_TRIPDURATION", FloatType()), StructField("AVG_RIDER_AGE", FloatType()) ]) ) actual = calc_bike_facts(input) assert expected.collect() == actual.collect() def test_calc_month_facts(session: Session): from patches import patch_to_timestamp input = session.create_dataframe( data=[ ['2018-03-01 09:47:00.000 +0000', 1, 10, 15], ['2018-03-01 09:47:14.000 +0000', 2, 20, 12], ['2018-04-01 09:47:04.000 +0000', 3, 6, 30] ], schema=['STARTTIME', 'BIKE_ID', 'TRIPDURATION', 'RIDER_AGE'] ) expected = session.create_dataframe( data=[ ['Mar', 2, 15, 13.5], ['Apr', 1, 6, 30.0] ], schema=['MONTH', 'COUNT', 'AVG_TRIPDURATION', 'AVG_RIDER_AGE'] ) actual = calc_month_facts(input) assert expected.collect() == actual.collect()
これで PyTest を実行して、すべてのユニットテストを実行できるようになりました。
pytest test/test_transformers.py
ストアドプロシージャ用の統合テストを追加する¶
DataFrame 変換器メソッド用のユニットテストができたので、ストアドプロシージャ用の統合テストを追加しましょう。テストケースは、以下のパターンに従います。
ストアドプロシージャへの入力データを表すテーブルを作成します。
ストアドプロシージャの2つの出力テーブルの予想されるコンテンツで2つの DataFrames を作成します。
ストアドプロシージャを呼び出します。
実際の出力テーブルをステップ2の DataFrames と比較します。
クリーンアップ: ステップ1の入力テーブルとステップ3の出力テーブルを削除します。
test
ディレクトリの下に test_sproc.py
というPythonファイルを作成します。
プロジェクトディレクトリからストアドプロシージャハンドラーをインポートし、テストケースを作成します。
# test/test_sproc.py
from project.sproc import create_fact_tables
def test_create_fact_tables(session):
...
入力テーブルの作成から始めて、テストケースを実装します。
# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *
def test_create_fact_tables(session):
DB = 'CITIBIKE'
SCHEMA = 'TEST'
# Set up source table
tbl = session.create_dataframe(
data=[
[1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
[1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
[1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
[1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
[1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
[1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
[1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
[1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
[1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
[1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
[1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
],
schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION', 'BIKEID'],
)
tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
次に、 DataFrames を予想される出力テーブル用に作成します。
# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *
def test_create_fact_tables(session):
DB = 'CITIBIKE'
SCHEMA = 'TEST'
# Set up source table
tbl = session.create_dataframe(
data=[
[1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
[1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
[1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
[1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
[1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
[1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
[1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
[1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
[1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
[1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
[1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
],
schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION', 'BIKEID'],
)
tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
# Expected values
n_rows_expected = 12
bike_facts_expected = session.create_dataframe(
data=[
[30958, 1, 551.0, 40.0],
[19278, 1, 242.0, 35.0],
[18461, 1, 768.0, 31.0],
[15533, 1, 690.0, 43.0],
[32449, 1, 490.0, 32.0],
[29411, 1, 457.0, 64.0],
[28015, 1, 279.0, 52.0],
[15148, 1, 546.0, 59.0],
[16967, 1, 358.0, 40.0],
[20644, 1, 848.0, 38.0],
[16365, 1, 295.0, 39.0]
],
schema=StructType([
StructField("BIKEID", IntegerType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", FloatType()),
StructField("AVG_RIDER_AGE", FloatType())
])
).collect()
month_facts_expected = session.create_dataframe(
data=[['Mar', 11, 502.18182, 43.00000]],
schema=StructType([
StructField("MONTH", StringType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", DecimalType()),
StructField("AVG_RIDER_AGE", DecimalType())
])
).collect()
最後に、ストアドプロシージャを呼び出し、出力テーブルを読み込みます。実際のテーブルと DataFrame のコンテンツを比較します。
# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *
def test_create_fact_tables(session):
DB = 'CITIBIKE'
SCHEMA = 'TEST'
# Set up source table
tbl = session.create_dataframe(
data=[
[1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
[1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
[1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
[1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
[1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
[1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
[1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
[1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
[1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
[1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
[1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
],
schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION', 'BIKEID'],
)
tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
# Expected values
n_rows_expected = 12
bike_facts_expected = session.create_dataframe(
data=[
[30958, 1, 551.0, 40.0],
[19278, 1, 242.0, 35.0],
[18461, 1, 768.0, 31.0],
[15533, 1, 690.0, 43.0],
[32449, 1, 490.0, 32.0],
[29411, 1, 457.0, 64.0],
[28015, 1, 279.0, 52.0],
[15148, 1, 546.0, 59.0],
[16967, 1, 358.0, 40.0],
[20644, 1, 848.0, 38.0],
[16365, 1, 295.0, 39.0]
],
schema=StructType([
StructField("BIKEID", IntegerType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", FloatType()),
StructField("AVG_RIDER_AGE", FloatType())
])
).collect()
month_facts_expected = session.create_dataframe(
data=[['Mar', 11, 502.18182, 43.00000]],
schema=StructType([
StructField("MONTH", StringType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", DecimalType()),
StructField("AVG_RIDER_AGE", DecimalType())
])
).collect()
# Call sproc, get actual values
n_rows_actual = create_fact_tables(session, 'TRIPS_TEST')
bike_facts_actual = session.table([DB, SCHEMA, 'bike_facts']).collect()
month_facts_actual = session.table([DB, SCHEMA, 'month_facts']).collect()
# Comparisons
assert n_rows_expected == n_rows_actual
assert bike_facts_expected == bike_facts_actual
assert month_facts_expected == month_facts_actual
テストケースを実行するには、端末から pytest
を実行します。
pytest test/test_sproc.py
プロジェクト内のすべてのテストを実行するには、他のオプションなしで pytest
を実行します。
pytest
ローカルテストを構成する¶
この時点で、 PyTest 個の DataFrame 変換器とストアドプロシージャ用のテストスイートができました。各テストケースでは、 Session
フィクスチャを使用してSnowflakeアカウントに接続し、Snowpark Python API から SQL を送信し、応答を取得します。
あるいは、ローカルテストフレームワークを使用して、Snowflakeに接続せずにローカルで変換を実行することもできます。大規模なテストスイートでは、これによりテスト実行の大幅な高速化が期待できます。このセクションでは、ローカルテストフレームワークの機能を使用するようにテストスイートを更新する方法を示します。
PyTest
Session
フィクスチャを更新することから始めます。PyTest にコマンドラインオプションを追加して、ローカルとライブのテストモードを切り替えます。# test/conftest.py import pytest from project.utils import get_env_var_config from snowflake.snowpark.session import Session def pytest_addoption(parser): parser.addoption("--snowflake-session", action="store", default="live") @pytest.fixture(scope='module') def session(request) -> Session: if request.config.getoption('--snowflake-session') == 'local': return Session.builder.configs({'local_testing': True}).create() else: return Session.builder.configs(get_env_var_config()).create()
たとえば、
calc_month_facts()
変換器で使用されるmonthname()
関数のように、すべての組み込み関数がローカルテストフレームワークでサポートされているわけではないため、まずこのメソッドにパッチを適用する必要があります。testsディレクトリの下にpatches.py
という名前のファイルを作成します。このファイルに次のコードを貼り付けます。from snowflake.snowpark.mock.functions import patch from snowflake.snowpark.functions import monthname from snowflake.snowpark.mock.snowflake_data_type import ColumnEmulator, ColumnType from snowflake.snowpark.types import StringType import datetime import calendar @patch(monthname) def patch_monthname(column: ColumnEmulator) -> ColumnEmulator: ret_column = ColumnEmulator(data=[ calendar.month_abbr[datetime.datetime.strptime(row, '%Y-%m-%d %H:%M:%S.%f %z').month] for row in column]) ret_column.sf_type = ColumnType(StringType(), True) return ret_column
上のパッチは、
column
という1つのパラメーターを受け取りますが、これは列内にデータ行を含むpandas.Series
のようなオブジェクトです。次に、Pythonモジュールdatetime
とcalendar
のメソッドの組み合わせを使って、組み込みのmonthname()
列の機能をエミュレートします。最後に、組み込みメソッドは月に対応する文字列(「Jan」、「Feb」、「Mar」など)を返すので、戻り値の型をString
に設定します。次に、このメソッドを DataFrame 変換器とストアドプロシージャ用のテストにインポートします。
# test/test_transformers.py # No changes to the other unit test methods def test_calc_month_facts(request, session): # Add conditional to include the patch if local testing is being used if request.config.getoption('--snowflake-session') == 'local': from patches import patch_monthname # No other changes
ローカルフラグで
pytest
を再実行します。pytest test/test_transformers.py --snowflake-session local
次に、同じパッチをストアドプロシージャのテストに適用します。
#test/test_sproc.py def test_create_fact_tables(request, session): # Add conditional to include the patch if local testing is being used if request.config.getoption('--snowflake-session') == 'local': from patches import patch_monthname # No other changes required
ローカルフラグでpytestを再実行します。
pytest test/test_sproc.py --snowflake-session local
最後に、完全なテストスイートをローカルで実行した場合と、ライブ接続で実行した場合の時間を比較してみましょう。
time
コマンドを使って、両方のコマンドにかかった時間を計測します。まずはライブ接続から始めましょう。time pytest
このケースでは、テストスイートの実行には7.89秒かかりました。(正確な時間は、お使いのコンピューター、ネットワーク接続、その他の要因によって異なる場合があります。)
=================================== test session starts ========================== platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0 rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial configfile: pytest.ini collected 4 items test/test_sproc.py . [ 25%] test/test_transformers.py ... [100%] =================================== 4 passed in 6.86s ================================= pytest 1.63s user 1.86s system 44% cpu 7.893 total
次は、ローカルテストフレームワークで試してみましょう。
time pytest --snowflake-session local
ローカルテストフレームワークのテストスイートでは、実行にかかった時間はわずか1秒でした。
================================== test session starts ================================ platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0 rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial configfile: pytest.ini collected 4 items test/test_sproc.py . [ 25%] test/test_transformers.py ... [100%] =================================== 4 passed in 0.10s ================================== pytest --snowflake-session local 1.37s user 1.70s system 281% cpu 1.093 total
詳細¶
これで完了しました。上出来です。
このチュートリアルでは、Python Snowparkのコードをテストする方法についてエンドツーエンドの視点が得られました。その際、次も学びました。
PyTest フィクスチャを作成し、ユニットテストと統合テストを追加しました。
詳細については、 Snowpark Pythonでのテストの記述 をご参照ください。
ローカルテストを構成しました
詳細については、 ローカルテストフレームワーク をご参照ください。