チュートリアル: Python Snowparkのテスト

紹介

このチュートリアルでは、Snowpark Pythonコードのテストの基本を紹介します。

学習内容

このチュートリアルでは、次の方法を学習します。

  • Snowflakeに接続した状態でSnowparkコードをテストします。

    PyTest のような標準的なテストユーティリティを使用して、Snowpark Python UDFs、 DataFrame 変換、ストアドプロシージャをテストすることができます。

  • ローカルテストフレームワークを使用して、Snowflakeアカウントに接続せずにローカルでSnowpark Python DataFrames をテストします。

    コードの変更を展開する前に、ローカルテストフレームワークを使って、開発マシン上でローカルテストを行うことができます。

前提条件

ローカルテストフレームワークを使用するには

  • バージョン1.11.1以上のSnowpark Pythonライブラリを使用する必要があります。

  • サポートされているPythonのバージョンは次のとおりです。

    • 3.8

    • 3.9

    • 3.10

    • 3.11

プロジェクトを設定する

このセクションでは、プロジェクトのリポジトリをクローニングし、チュートリアルに必要な環境を設定します。

  1. プロジェクトリポジトリをクローニングする。

    git clone https://github.com/Snowflake-Labs/sftutorial-snowpark-testing
    
    Copy

    gitがインストールされていない場合は、リポジトリページに行き、 Code » Download Contents をクリックしてコンテンツをダウンロードしてください。

  2. アカウント認証情報を使って環境変数を設定します。Snowpark API は、これらを使用してSnowflakeアカウントを認証します。

    # Linux/MacOS
    export SNOWSQL_ACCOUNT=<replace with your account identifer>
    export SNOWSQL_USER=<replace with your username>
    export SNOWSQL_ROLE=<replace with your role>
    export SNOWSQL_PWD=<replace with your password>
    export SNOWSQL_DATABASE=<replace with your database>
    export SNOWSQL_SCHEMA=<replace with your schema>
    export SNOWSQL_WAREHOUSE=<replace with your warehouse>
    
    Copy
    # Windows/PowerShell
    $env:SNOWSQL_ACCOUNT = "<replace with your account identifer>"
    $env:SNOWSQL_USER = "<replace with your username>"
    $env:SNOWSQL_ROLE = "<replace with your role>"
    $env:SNOWSQL_PWD = "<replace with your password>"
    $env:SNOWSQL_DATABASE = "<replace with your database>"
    $env:SNOWSQL_SCHEMA = "<replace with your schema>"
    $env:SNOWSQL_WAREHOUSE = "<replace with your warehouse>"
    
    Copy

    オプション: このenv varを永久に設定するには、bashプロファイルを編集するか(Linux/MacOS の場合)、 System Properties メニューを使います(Windowsの場合)。

  3. Anacondaを使用してconda環境を作成し、アクティブ化します。

    conda env create --file environment.yml
    conda activate snowpark-testing
    
    Copy
  4. setup/create_table.py を実行してサンプルテーブルを作成します。このPythonスクリプトは CITIBIKE というデータベース、PUBLIC というスキーマ、TRIPS という小さなテーブルを作成します。

    python setup/create_table.py
    
    Copy

これで次のセクションに進む準備が整いました。このセクションでは、以下を実行しました。

  • チュートリアルリポジトリをクローニングした。

  • アカウント情報で環境変数を作成した。

  • プロジェクト用にconda環境を作成した。

  • Snowpark API を使用してSnowflakeに接続し、サンプルデータベース、スキーマ、およびテーブルを作成した。

ストアドプロシージャを試す

サンプルプロジェクトには、ストアドプロシージャハンドラー(sproc.py)と3つの DataFrames 変換メソッド(transformers.py)が含まれています。ストアドプロシージャハンドラーは UDF および DataFrame 変換器を使用して、ソーステーブル CITIBIKE.PUBLIC.TRIPS から読み取り、2つのファクトテーブル: MONTH_FACTSBIKE_FACTS を作成します。

このコマンドを実行することで、コマンドラインからストアドプロシージャを実行できます。

python project/sproc.py
Copy

プロジェクトに慣れたところで、次のセクションではtestディレクトリを設定し、Snowflakeセッション用の PyTest フィクスチャを作成します。

Snowflakeセッション用の PyTest フィクスチャを作成する

PyTest フィクスチャ は、テスト(またはテストのモジュール)の前に実行される関数で、通常はテストにデータや接続を提供するためのものです。このプロジェクトでは、Snowpark Session オブジェクトを返す PyTest フィクスチャを作成します。テストケースでは、このセッションを使用してSnowflakeに接続します。

  1. プロジェクトルートディレクトリの下に test ディレクトリを作成します。

    mkdir test
    
    Copy
  2. test ディレクトリの下に、 conftest.py という新しいPythonファイルを作成します。 conftest.py 内で、 Session オブジェクト用の PyTest フィクスチャを作成します。

    import pytest
    from project.utils import get_env_var_config
    from snowflake.snowpark.session import Session
    
    @pytest.fixture
    def session() -> Session:
        return Session.builder.configs(get_env_var_config()).create()
    
    Copy

DataFrame 変換器用のユニットテストを追加する

  1. test ディレクトリで、 test_transformers.py という新しいPythonファイルを作成します。

  2. test_transformers.py ファイルで、変換器メソッドをインポートします。

    # test/test_transformers.py
    
    from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
    
    Copy
  3. 次に、これらの変換器用のユニットテストを作成します。通常の規則では、 test_<というメソッド名 > を持つメソッドをテスト毎に作成することになっています。このケースでは、テストは次のようになります。

    # test/test_transformers.py
    from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
    def test_add_rider_age(session):
        ...
    
    def test_calc_bike_facts(session):
        ...
    
    
    def test_calc_month_facts(session):
        ...
    
    Copy

    各テストケースの session パラメーターは、前のセクションで作成した PyTest フィクスチャを参照しています。

  4. 次に、各変圧器のテストケースを実装します。次のパターンを使用します。

    1. 入力 DataFrame を作成します。

    2. 予想される出力 DataFrame を作成します。

    3. ステップ1の入力 DataFrame を変換器メソッドに渡します。

    4. ステップ3の出力を、ステップ2からの予想出力と比較します。

    # test/test_transformers.py
    from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
    from snowflake.snowpark.types import StructType, StructField, IntegerType, FloatType
    
    def test_add_rider_age(session: Session):
        input = session.create_dataframe(
            [
                [1980],
                [1995],
                [2000]
            ],
            schema=StructType([StructField("BIRTH_YEAR", IntegerType())])
        )
    
        expected = session.create_dataframe(
            [
                [1980, 43],
                [1995, 28],
                [2000, 23]
            ],
            schema=StructType([StructField("BIRTH_YEAR", IntegerType()), StructField("RIDER_AGE", IntegerType())])
        )
    
        actual = add_rider_age(input)
        assert expected.collect() == actual.collect()
    
    
    def test_calc_bike_facts(session: Session):
        input = session.create_dataframe([
                [1, 10, 20],
                [1, 5, 30],
                [2, 20, 50],
                [2, 10, 60]
            ],
            schema=StructType([
                StructField("BIKEID", IntegerType()),
                StructField("TRIPDURATION", IntegerType()),
                StructField("RIDER_AGE", IntegerType())
            ])
        )
    
        expected = session.create_dataframe([
                [1, 2, 7.5, 25.0],
                [2, 2, 15.0, 55.0],
            ],
            schema=StructType([
                StructField("BIKEID", IntegerType()),
                StructField("COUNT", IntegerType()),
                StructField("AVG_TRIPDURATION", FloatType()),
                StructField("AVG_RIDER_AGE", FloatType())
            ])
        )
    
        actual = calc_bike_facts(input)
        assert expected.collect() == actual.collect()
    
    
    def test_calc_month_facts(session: Session):
        from patches import patch_to_timestamp
    
        input = session.create_dataframe(
            data=[
                ['2018-03-01 09:47:00.000 +0000', 1, 10,  15],
                ['2018-03-01 09:47:14.000 +0000', 2, 20, 12],
                ['2018-04-01 09:47:04.000 +0000', 3, 6,  30]
            ],
            schema=['STARTTIME', 'BIKE_ID', 'TRIPDURATION', 'RIDER_AGE']
        )
    
        expected = session.create_dataframe(
            data=[
                ['Mar', 2, 15, 13.5],
                ['Apr', 1, 6, 30.0]
            ],
            schema=['MONTH', 'COUNT', 'AVG_TRIPDURATION', 'AVG_RIDER_AGE']
        )
    
        actual = calc_month_facts(input)
    
        assert expected.collect() == actual.collect()
    
    Copy
  5. これで PyTest を実行して、すべてのユニットテストを実行できるようになりました。

    pytest test/test_transformers.py
    
    Copy

ストアドプロシージャ用の統合テストを追加する

DataFrame 変換器メソッド用のユニットテストができたので、ストアドプロシージャ用の統合テストを追加しましょう。テストケースは、以下のパターンに従います。

  1. ストアドプロシージャへの入力データを表すテーブルを作成します。

  2. ストアドプロシージャの2つの出力テーブルの予想されるコンテンツで2つの DataFrames を作成します。

  3. ストアドプロシージャを呼び出します。

  4. 実際の出力テーブルをステップ2の DataFrames と比較します。

  5. クリーンアップ: ステップ1の入力テーブルとステップ3の出力テーブルを削除します。

test ディレクトリの下に test_sproc.py というPythonファイルを作成します。

プロジェクトディレクトリからストアドプロシージャハンドラーをインポートし、テストケースを作成します。

# test/test_sproc.py
from project.sproc import create_fact_tables

def test_create_fact_tables(session):
    ...
Copy

入力テーブルの作成から始めて、テストケースを実装します。

# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *

def test_create_fact_tables(session):
    DB = 'CITIBIKE'
    SCHEMA = 'TEST'

    # Set up source table
    tbl = session.create_dataframe(
        data=[
            [1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
            [1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
            [1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
            [1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
            [1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
            [1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
            [1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
            [1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
            [1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
            [1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
            [1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
        ],
        schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION',    'BIKEID'],
    )

    tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
Copy

次に、 DataFrames を予想される出力テーブル用に作成します。

# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *

def test_create_fact_tables(session):
    DB = 'CITIBIKE'
    SCHEMA = 'TEST'

    # Set up source table
    tbl = session.create_dataframe(
        data=[
            [1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
            [1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
            [1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
            [1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
            [1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
            [1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
            [1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
            [1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
            [1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
            [1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
            [1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
        ],
        schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION',    'BIKEID'],
    )

    tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')

    # Expected values
    n_rows_expected = 12
    bike_facts_expected = session.create_dataframe(
        data=[
            [30958, 1, 551.0, 40.0],
            [19278, 1, 242.0, 35.0],
            [18461, 1, 768.0, 31.0],
            [15533, 1, 690.0, 43.0],
            [32449, 1, 490.0, 32.0],
            [29411, 1, 457.0, 64.0],
            [28015, 1, 279.0, 52.0],
            [15148, 1, 546.0, 59.0],
            [16967, 1, 358.0, 40.0],
            [20644, 1, 848.0, 38.0],
            [16365, 1, 295.0, 39.0]
        ],
        schema=StructType([
            StructField("BIKEID", IntegerType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", FloatType()),
            StructField("AVG_RIDER_AGE", FloatType())
        ])
    ).collect()

    month_facts_expected = session.create_dataframe(
        data=[['Mar', 11, 502.18182, 43.00000]],
        schema=StructType([
            StructField("MONTH", StringType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", DecimalType()),
            StructField("AVG_RIDER_AGE", DecimalType())
        ])
    ).collect()
Copy

最後に、ストアドプロシージャを呼び出し、出力テーブルを読み込みます。実際のテーブルと DataFrame のコンテンツを比較します。

# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *

def test_create_fact_tables(session):
    DB = 'CITIBIKE'
    SCHEMA = 'TEST'

    # Set up source table
    tbl = session.create_dataframe(
        data=[
            [1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
            [1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
            [1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
            [1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
            [1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
            [1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
            [1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
            [1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
            [1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
            [1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
            [1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
        ],
        schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION',    'BIKEID'],
    )

    tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')

    # Expected values
    n_rows_expected = 12
    bike_facts_expected = session.create_dataframe(
        data=[
            [30958, 1, 551.0, 40.0],
            [19278, 1, 242.0, 35.0],
            [18461, 1, 768.0, 31.0],
            [15533, 1, 690.0, 43.0],
            [32449, 1, 490.0, 32.0],
            [29411, 1, 457.0, 64.0],
            [28015, 1, 279.0, 52.0],
            [15148, 1, 546.0, 59.0],
            [16967, 1, 358.0, 40.0],
            [20644, 1, 848.0, 38.0],
            [16365, 1, 295.0, 39.0]
        ],
        schema=StructType([
            StructField("BIKEID", IntegerType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", FloatType()),
            StructField("AVG_RIDER_AGE", FloatType())
        ])
    ).collect()

    month_facts_expected = session.create_dataframe(
        data=[['Mar', 11, 502.18182, 43.00000]],
        schema=StructType([
            StructField("MONTH", StringType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", DecimalType()),
            StructField("AVG_RIDER_AGE", DecimalType())
        ])
    ).collect()

    # Call sproc, get actual values
    n_rows_actual = create_fact_tables(session, 'TRIPS_TEST')
    bike_facts_actual = session.table([DB, SCHEMA, 'bike_facts']).collect()
    month_facts_actual = session.table([DB, SCHEMA, 'month_facts']).collect()

    # Comparisons
    assert n_rows_expected == n_rows_actual
    assert bike_facts_expected == bike_facts_actual
    assert month_facts_expected ==  month_facts_actual
Copy

テストケースを実行するには、端末から pytest を実行します。

pytest test/test_sproc.py
Copy

プロジェクト内のすべてのテストを実行するには、他のオプションなしで pytest を実行します。

pytest
Copy

ローカルテストを構成する

この時点で、 PyTest 個の DataFrame 変換器とストアドプロシージャ用のテストスイートができました。各テストケースでは、 Session フィクスチャを使用してSnowflakeアカウントに接続し、Snowpark Python API から SQL を送信し、応答を取得します。

あるいは、ローカルテストフレームワークを使用して、Snowflakeに接続せずにローカルで変換を実行することもできます。大規模なテストスイートでは、これによりテスト実行の大幅な高速化が期待できます。このセクションでは、ローカルテストフレームワークの機能を使用するようにテストスイートを更新する方法を示します。

  1. PyTest Session フィクスチャを更新することから始めます。PyTest にコマンドラインオプションを追加して、ローカルとライブのテストモードを切り替えます。

    # test/conftest.py
    
    import pytest
    from project.utils import get_env_var_config
    from snowflake.snowpark.session import Session
    
    def pytest_addoption(parser):
        parser.addoption("--snowflake-session", action="store", default="live")
    
    @pytest.fixture(scope='module')
    def session(request) -> Session:
        if request.config.getoption('--snowflake-session') == 'local':
            return Session.builder.configs('local_testing', True).create()
        else:
            return Session.builder.configs(get_env_var_config()).create()
    
    Copy
  2. たとえば、 calc_month_facts() 変換器で使用される monthname() 関数のように、すべての組み込み関数がローカルテストフレームワークでサポートされているわけではないため、まずこのメソッドにパッチを適用する必要があります。testsディレクトリの下に patches.py という名前のファイルを作成します。このファイルに次のコードを貼り付けます。

    from snowflake.snowpark.mock.functions import patch
    from snowflake.snowpark.functions import monthname
    from snowflake.snowpark.mock.snowflake_data_type import ColumnEmulator, ColumnType
    from snowflake.snowpark.types import StringType
    import datetime
    import calendar
    
    @patch(monthname)
    def patch_monthname(column: ColumnEmulator) -> ColumnEmulator:
        ret_column = ColumnEmulator(data=[
            calendar.month_abbr[datetime.datetime.strptime(row, '%Y-%m-%d %H:%M:%S.%f %z').month]
            for row in column])
        ret_column.sf_type = ColumnType(StringType(), True)
        return ret_column
    
    Copy

    上のパッチは、 column という1つのパラメーターを受け取りますが、これは列内にデータ行を含む pandas.Series のようなオブジェクトです。次に、Pythonモジュール datetimecalendar のメソッドの組み合わせを使って、組み込みの monthname() 列の機能をエミュレートします。最後に、組み込みメソッドは月に対応する文字列(「Jan」、「Feb」、「Mar」など)を返すので、戻り値の型を String に設定します。

  3. 次に、このメソッドを DataFrame 変換器とストアドプロシージャ用のテストにインポートします。

    # test/test_transformers.py
    
    # No changes to the other unit test methods
    
    def test_calc_month_facts(request, session):
        # Add conditional to include the patch if local testing is being used
        if request.config.getoption('--snowflake-session') == 'local':
            from patches import patch_monthname
    
        # No other changes
    
    Copy
  4. ローカルフラグで pytest を再実行します。

    pytest test/test_transformers.py --snowflake-session local
    
    Copy
  5. 次に、同じパッチをストアドプロシージャのテストに適用します。

    #test/test_sproc.py
    
    def test_create_fact_tables(request, session):
        # Add conditional to include the patch if local testing is being used
        if request.config.getoption('--snowflake-session') == 'local':
            from patches import patch_monthname
    
        # No other changes required
    
    Copy
  6. ローカルフラグでpytestを再実行します。

    pytest test/test_sproc.py --snowflake-session local
    
    Copy
  7. 最後に、完全なテストスイートをローカルで実行した場合と、ライブ接続で実行した場合の時間を比較してみましょう。 time コマンドを使って、両方のコマンドにかかった時間を計測します。まずはライブ接続から始めましょう。

    time pytest
    
    Copy

    このケースでは、テストスイートの実行には7.89秒かかりました。(正確な時間は、お使いのコンピューター、ネットワーク接続、その他の要因によって異なる場合があります。)

    =================================== test session starts ==========================
    platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0
    rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial
    configfile: pytest.ini
    collected 4 items
    
    test/test_sproc.py .                                                             [ 25%]
    test/test_transformers.py ...                                                    [100%]
    
    =================================== 4 passed in 6.86s =================================
    pytest  1.63s user 1.86s system 44% cpu 7.893 total
    

    次は、ローカルテストフレームワークで試してみましょう。

    time pytest --snowflake-session local
    
    Copy

    ローカルテストフレームワークのテストスイートでは、実行にかかった時間はわずか1秒でした。

    ================================== test session starts ================================
    platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0
    rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial
    configfile: pytest.ini
    collected 4 items
    
    test/test_sproc.py .                                                             [ 25%]
    test/test_transformers.py ...                                                    [100%]
    
    =================================== 4 passed in 0.10s ==================================
    pytest --snowflake-session local  1.37s user 1.70s system 281% cpu 1.093 total
    

詳細

これで完了しました。上出来です。

このチュートリアルでは、Python Snowparkのコードをテストする方法についてエンドツーエンドの視点が得られました。その際、次も学びました。