Snowpark Pythonでのテストの記述

このトピックでは、Snowflakeに接続した状態でSnowparkコードをテストする方法を説明します。PyTest のような標準的なテストユーティリティを使用して、Snowpark Python UDFs、 DataFrame 変換、ストアドプロシージャをテストすることができます。

このトピックの内容:

徹底的なテストは、意図しない互換性を破る変更を防ぐのに役立ちます。ユニットテストでは、コードの一部が期待通りに動くかどうかを検証します。統合テストは、コンポーネントがエンドツーエンドのユースケースで正しく連動することを確認するのに役立ちます。

このドキュメントの例では、Pythonの最も人気のあるテストフレームワークの1つである PyTest を使用します。追加ガイダンスとベストプラクティスについては、 PyTest ドキュメンテーション をご参照ください。

あるいは、Snowpark Pythonローカルテストフレームワークを使用して、Snowflakeアカウントに接続することなくローカルでSnowpark Python DataFrames を作成して操作することもできます。詳細については、 ローカルテストフレームワーク をご参照ください。

テストの設定

pip install pytest または conda install pytest を実行して、 PyTest をプロジェクトにインストールします。 requirements.txt またはconda環境ファイルに追加することもできます。

ソースコードディレクトリの隣に test ディレクトリを作成し、ユニットテストと統合テストをそこに追加します。例の表示については、 Snowpark Pythonプロジェクトのテンプレート ご参照ください。

Snowparkセッション用の PyTest フィクスチャの作成

PyTest フィクスチャは、テスト(またはテストモジュール)の前に実行され、テストにデータや接続を提供する関数です。このシナリオでは、Snowpark Session オブジェクトを返す PyTest フィクスチャを作成します。

  1. test ディレクトリがない場合は、それを作成します。

  2. conftest.pytest の下に次のコンテンツで作成します。 connection_parameters はSnowflakeアカウントの認証情報があるディクショナリです。ディクショナリフォーマットの詳細については、 セッションの作成 をご参照ください。

  3. ファイルスコープのフィクスチャとしてではなく、モジュールスコープのフィクスチャとして Session フィクスチャを作成します。これは、複数のセッションが作成され、セッションオブジェクトの競合による問題が発生するのを防ぎます。

from snowflake.snowpark.session import Session

@pytest.fixture(scope='module')
def session(request) -> Session:
    connection_parameters = {}
    return Session.builder.configs(...).create()
Copy

UDFs 用のユニットテスト

Pythonの UDF ロジックをテストするには、UDF ハンドラーをPythonの汎用メソッドとしてテストします。

  1. test ディレクトリの下に UDF ユニットテスト用のファイルを作成します。たとえば、ファイル名を test_functions.py とします。

  2. テストするPythonメソッドをインポートします。

  3. 各テストシナリオについて、 test_<scenario_to_test> というPythonメソッドを作成します。

たとえば、次のようなPython UDF ハンドラーになります。

def fahrenheit_to_celsius(temp_f: float) -> float:
    """
    Converts fahrenheit to celsius
    """
    return (float(temp_f) - 32) * (5/9)
Copy

このメソッドをテストファイル(test/test_functions.py)にインポートし、Pythonの汎用メソッドとしてテストすることができます。

import farenheit_to_celsius

def test_farenheit_to_celsius():
    expected = 0.0
    actual = farenheit_to_celsius(32)
    assert expected == actual
Copy

DataFrame 変換用のユニットテスト

DataFrame 変換にユニットテストを追加することで、予期せぬバグやリグレッションを防ぐのに役立ちます。DataFrame ロジックを簡単にテストできるようにするには、変換をPythonメソッドにカプセル化し、変換される DataFrames を入力として受け取り、変換された DataFrames を返します。

次の例では、 mf_df_transformer に変換ロジックが含まれています。Pythonプロジェクトの他のモジュールにインポートして、簡単にテストすることができます。

from snowflake.snowpark.dataframe import DataFrame, col

def my_df_tranformer(df: DataFrame) -> DataFrame:
    return df \
        .with_column('c', df['a']+df['b']) \
        .filter(col('c') > 3)
Copy

この変換をテストするには、次のステップに従います。

  1. test ディレクトリの下に DataFrame テスト用のファイル test_transformers.py を作成します(test/test_transformers.py)。

  2. テストする変換器のテストメソッドを作成します: test_my_df_transformer(session)。ここでの session パラメーターは、前のセクションで作成されたセッションフィクスチャを指します。

  3. セッションフィクスチャを使って、テストメソッド内に入力と予想される出力 DataFrames を作成します。

  4. 入力 DataFrame を変換器に渡し、予想される DataFrame と変換器が返す実際の DataFrame を比較します。

# test/test_transformers.py

import my_df_transformer

def test_my_df_transformer(session):
    input_df = session.create_dataframe([[1,2],[3,4]], ['a', 'b'])
    expected_df = session.create_dataframe([3,4,7], ['a','b','c'])
    actual_df = my_df_transformer(input_df)
    assert input_df.collect() == actual_df.collect()
Copy

ストアドプロシージャ用の統合テスト

ストアドプロシージャハンドラーをテストするには、セッションフィクスチャを使用してストアドプロシージャハンドラーを呼び出します。ストアドプロシージャが ETL パイプラインなどのテーブルから読み取る場合、以下の例に示すように、ストアドプロシージャハンドラーを呼び出す前にそれらのテーブルを作成することができます。このパターンでは、入力データがソース管理内で追跡され、テスト実行中に予期せず変更されることがないようにします。

from project import my_sproc_handler  # import stored proc handler

def test_my_sproc_handler(session: Session):

    # Create input table
    input_tbl = session.create_dataframe(
        data=[...],
        schema=[...],
    )

    input_tbl.write.mode('overwrite').save_as_table(['DB', 'SCHEMA', 'INPUT_TBL'], mode='overwrite')

    # Create expected output dataframe
    expected_df = session.create_dataframe(
        data=[...],
        schema=[...],
    ).collect()

    # Call the stored procedure
    my_sproc_handler()

    # Get actual table
    actual_tbl = session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).collect()

    # Clean up tables
    session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).delete()
    session.table(['DB', 'SCHEMA', 'INPUT_TBL']).delete()

    # Compare the actual and expected tables
    assert expected_df == actual_tbl
Copy