ローカルテストフレームワーク¶
このトピックでは、Snowparkライブラリを使用する際にローカルでコードをテストする方法について説明します。
このトピックの内容:
Snowpark Pythonローカルテストフレームワークを使用すると、Snowflakeアカウントに接続することなく、ローカルでSnowpark Python DataFrames を作成および操作できます。ローカルテストフレームワークを使用すると、コードの変更をアカウントに展開する前に、開発マシンまたは CI (継続的統合)パイプラインで DataFrame 操作をローカルにテストできます。API は同じであるため、コードを変更せずに、テストをローカルで実行することも、Snowflakeアカウントに対して実行することもできます。
前提条件¶
ローカルテストフレームワークを使用するには
オプションで依存関係
pandas
があるバージョン1.11.1以上のSnowpark Pythonライブラリを使用する必要があります。pip install "snowflake-snowpark-python[pandas]"
を実行してインストールします。サポートされているPythonのバージョンは次のとおりです。
3.8
3.9
3.10
3.11
セッションの作成とローカルテストの有効化¶
開始するには、Snowpark Session
を作成し、ローカルテスト構成を True
に設定します。
from snowflake.snowpark import Session
session = Session.builder.config('local_testing', True).create()
セッションが作成されたら、それを使って DataFrames の作成と操作ができます。
df = session.create_dataframe([[1,2],[3,4]],['a','b'])
df.with_column('c', df['a']+df['b']).show()
データのロード¶
Pythonのプリミティブ、ファイル、Pandas DataFrames からSnowpark DataFrames を作成できます。これは、テストケースの入力と予想される出力を指定するのに役立ちます。このようにすることで、データがソース管理内に置かれ、テストデータとテストケースの同期の維持が簡単になります。
CSV データの読み込み¶
まず Session.file.put()
を呼び出してメモリ内ステージにファイルを読み込み、次に Session.read()
を使用してコンテンツを読み取ることで、 CSV ファイルをSnowpark DataFrame に読み込むことができます。ファイル data.csv
があり、そのファイルには次のようなコンテンツがあるとします。
col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
次のコードを使って data.csv
をSnowpark DataFrame に読み込むことができます。まずファイルをステージに配置する必要があります。そうしないと、ファイルが見つからないというエラーが表示されます。
from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType
# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
[
StructField("col1", IntegerType()),
StructField("col2", StringType()),
StructField("col3", BooleanType()),
StructField("col4", DoubleType()),
]
)
# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
dataframe.show()
の出力は次のとおりです。
-------------------------------------
|"COL1" |"COL2" |"COL3" |"COL4" |
-------------------------------------
|1 |a |True |1.23 |
|2 |b |False |4.56 |
-------------------------------------
Pandasデータの読み込み¶
Pandas DataFrame からSnowpark Python DataFrame を作成するには、 create_dataframe
メソッドを呼び出し、データをPandas DataFrame として渡します。
import pandas as pd
pandas_df = pd.DataFrame(
data={
"col1": pd.Series(["value1", "value2"]),
"col2": pd.Series([1.23, 4.56]),
"col3": pd.Series([123, 456]),
"col4": pd.Series([True, False]),
}
)
dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
dataframe.show()
の出力は次のとおりです。
-------------------------------------
|"col1" |"col2" |"col3" |"col4" |
-------------------------------------
|value1 |1.23 |123 |True |
|value2 |4.56 |456 |False |
-------------------------------------
Snowpark Python DataFrame は、 DataFrame の to_pandas
メソッドを呼び出すことで、Pandas DataFrame に変換することもできます。
from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType
dataframe = session.create_dataframe(
data=[
["value1", 1.23, 123, True],
["value2", 4.56, 456, False],
],
schema=StructType([
StructField("col1", StringType()),
StructField("col2", DoubleType()),
StructField("col3", LongType()),
StructField("col4", BooleanType()),
])
)
pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
print(pandas_dataframe.to_string())
の呼び出しの出力は次のとおりです。
COL1 COL2 COL3 COL4
0 value1 1.23 123 True
1 value2 4.56 456 False
セッションの PyTest フィクスチャの作成¶
PyTest フィクスチャ は、テスト(またはテストのモジュール)の前に実行される関数で、通常はテストにデータや接続を提供します。このケースでは、Snowpark Session
オブジェクトを返すフィクスチャを作成します。まず、 test
ディレクトリがない場合は、それを作成します。次に、 test
ディレクトリに次のコンテンツのファイル conftest.py
を作成します。 connection_parameters
は、Snowflakeアカウントの認証情報があるディクショナリです。ディクショナリフォーマットの詳細については、 セッションの作成 をご参照ください。
# test/conftest.py
import pytest
from snowflake.snowpark.session import Session
def pytest_addoption(parser):
parser.addoption("--snowflake-session", action="store", default="live")
@pytest.fixture(scope='module')
def session(request) -> Session:
if request.config.getoption('--snowflake-session') == 'local':
return Session.builder.config('local_testing', True).create()
else:
return Session.builder.configs(CONNECTION_PARAMETERS).create()
pytest_addoption
の呼び出しにより、 pytest
コマンドに snowflake-session
というコマンドラインオプションが追加されます。 Session
フィクスチャはこのコマンドラインオプションをチェックし、その値に応じてローカルかライブの Session
を作成します。これにより、ローカルモードとライブモードを簡単に切り替えてテストすることができます。
# Using local mode:
pytest --snowflake-session local
# Using live mode
pytest
SQL 操作¶
Session.sql(...)
はローカルテストフレームワークではサポートされていません。可能な限りSnowparkの DataFrame APIs を使用します。 Session.sql(...)
を使用しなければならない場合は、Pythonの unittest.mock.patch
を使用して表形式の戻り値をモックし、指定された Session.sql()
呼び出しから予想される応答をパッチすることができます。
以下の例では、 mock_sql()
は SQL クエリテキストを DataFrame 応答にマッピングします。次の条件ステートメントは、現在のセッションがローカルテストを使っているかどうかをチェックし、そうであれば Session.sql()
メソッドにパッチを適用します。
from unittest import mock
from functools import partial
def test_something(pytestconfig, session):
def mock_sql(session, sql_string): # patch for SQL operations
if sql_string == "select 1,2,3":
return session.create_dataframe([[1,2,3]])
else:
raise RuntimeError(f"Unexpected query execution: {sql_string}")
if pytestconfig.getoption('--snowflake-session') == 'local':
with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
else:
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
ローカルテストが有効な場合、 DataFrame.save_as_table()
で作成されたすべてのテーブルは仮テーブルとしてメモリに保存され、 Session.table()
を使用して取得することができます。サポートされている DataFrame 操作は、通常通りテーブル上で使用できます。
組み込み関数のパッチ¶
snowflake.snowpark.functions
の組み込み関数のすべてが、ローカルテストフレームワークでサポートされているわけではありません。サポートされていない関数を使用する場合は、 snowflake.snowpark.mock
の @patch
デコレーターを使用してパッチを作成する必要があります。
パッチされた関数を定義し実装するには、署名(パラメーターリスト)が組み込み関数のパラメーターと一致している必要があります。ローカルテストフレームワークは、次のルールを使ってパッチされた関数にパラメーターを渡します。
組み込み関数の署名の
ColumnOrName
型のパラメーターでは、ColumnEmulator
がパッチされた関数のパラメーターとして渡されます。ColumnEmulator
は列データを含むpandas.Series
オブジェクトに似ています。組み込み関数の署名の
LiteralType
型のパラメーターについては、リテラル値がパッチされた関数のパラメーターとして渡されます。そうでない場合は、生の値がパッチされた関数のパラメーターとして渡されます。
パッチされた関数の戻り型に関しては、 ColumnEmulator
のインスタンスを返すことが、組み込み関数の Column
の戻り型に対して予想されます。
たとえば、組み込み関数 to_timestamp()
は次のようにパッチすることができます。
import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType
@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
ret_column.sf_type = ColumnType(TimestampType(), True)
return ret_column
テストケースのスキップ¶
PyTest テストスイートにローカルテストでうまくサポートされないテストケースが含まれている場合は、 PyTest の mark.skipif
デコレーターを使用してそれらのケースをスキップすることができます。次の例は、前述のようにセッションとパラメーターが構成されていると想定します。条件では local_testing_mode
が local
に設定されているかどうかをチェックし、設定されている場合はテストケースをスキップし、その理由を説明するメッセージを表示します。
import pytest
@pytest.mark.skipif(
condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
...
制限事項¶
次の機能はサポートされていません。
生の SQL 文字列と、 SQL 文字列の解析を必要とする操作。たとえば、
session.sql
とDataFrame.filter("col1 > 12")
はサポートされていません。UDFs、 UDTFs、およびストアドプロシージャ
テーブル関数。
AsyncJobs。
ウェアハウス、スキーマ、その他のセッションプロパティの変更などのセッション操作。
Geometry
とGeography
のデータ型。ウィンドウ関数の集約。
# Selecting window function expressions is supported df.select("key", "value", sum_("value").over(), avg("value").over()) # Aggregating window function expressions is NOT supported df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
その他の制限事項は次のとおりです。
Variant
、Array
、Object
のデータ型は、標準の JSON エンコードとデコードでのみサポートされています。{1,2,,3,} のような式は、Snowflakeでは有効な JSON とみなされますが、Pythonの組み込み JSON 関数が使用されるローカルテストでは無効です。モジュールレベルの変数snowflake.snowpark.mock.CUSTOM_JSON_ENCODER
とsnowflake.snowpark.mock.CUSTOM_JSON_DECODER
を指定して、デフォルト設定を上書きすることができます。Snowflakeの関数のサブセット(ウィンドウ関数を含む)のみが実装されます。独自の関数定義を注入する方法については、 組み込み関数のパッチ をご参照ください。
ランク関連関数のパッチは現在サポートされていません。
同じ名前の列を選択すると、1つの列のみが返されます。回避策として、
Column.alias
を使用して列の名前を変更します。df.select(lit(1), lit(1)).show() # col("a"), col("a") #--------- #|"'1'" | #--------- #|1 | #|... | #--------- # Workaround: Column.alias DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2")) # "col1_1", "col1_2"
Column.cast
を使用した明示的な型キャストには、フォーマット文字列が以下に対してサポートされていないという制限があります。入力:to_decimal
、to_number
、to_numeric
、to_double
、to_date
、to_time
、to_timestamp
および出力:to_char
、to_varchar
、to_binary
。JSON
VariantType
に格納された文字列はDatetime
型に変換できません。Table.merge
とTable.update
については、セッションパラメーターERROR_ON_NONDETERMINISTIC_UPDATE
とERROR_ON_NONDETERMINISTIC_MERGE
がFalse
に設定されているときの動作のみをサポートする実装です。これは、複数結合の場合、マッチした行の1つを更新することを意味します。
サポートされた APIs のリスト¶
Snowparkセッション¶
Session.createDataFrame
Session.create_dataframe
Session.flatten
Session.range
Session.table
入力/出力¶
DataFrameReader.csv
DataFrameReader.table
DataFrameWriter.saveAsTable
DataFrameWriter.save_as_table
DataFrame¶
DataFrame.agg
DataFrame.cache_result
DataFrame.col
DataFrame.collect
DataFrame.collect_nowait
DataFrame.copy_into_table
DataFrame.count
DataFrame.createOrReplaceTempView
DataFrame.createOrReplaceView
DataFrame.create_or_replace_temp_view
DataFrame.create_or_replace_view
DataFrame.crossJoin
DataFrame.cross_join
DataFrame.distinct
DataFrame.drop
DataFrame.dropDuplicates
DataFrame.drop_duplicates
DataFrame.dropna
DataFrame.except_
DataFrame.explain
DataFrame.fillna
DataFrame.filter
DataFrame.first
DataFrame.groupBy
DataFrame.group_by
DataFrame.intersect
DataFrame.join
DataFrame.limit
DataFrame.minus
DataFrame.natural_join
DataFrame.orderBy
DataFrame.order_by
DataFrame.rename
DataFrame.replace
DataFrame.rollup
DataFrame.sample
DataFrame.select
DataFrame.show
DataFrame.sort
DataFrame.subtract
DataFrame.take
DataFrame.toDF
DataFrame.toLocalIterator
DataFrame.toPandas
DataFrame.to_df
DataFrame.to_local_iterator
DataFrame.to_pandas
DataFrame.to_pandas_batches
DataFrame.union
DataFrame.unionAll
DataFrame.unionAllByName
DataFrame.unionByName
DataFrame.union_all
DataFrame.union_all_by_name
DataFrame.union_by_name
DataFrame.unpivot
DataFrame.where
DataFrame.withColumn
DataFrame.withColumnRenamed
DataFrame.with_column
DataFrame.with_column_renamed
DataFrame.with_columns
DataFrameNaFunctions.drop
DataFrameNaFunctions.fill
DataFrameNaFunctions.replace
列¶
Column.alias
Column.as_
Column.asc
Column.asc_nulls_first
Column.asc_nulls_last
Column.astype
Column.between
Column.bitand
Column.bitor
Column.bitwiseAnd
Column.bitwiseOR
Column.bitwiseXOR
Column.bitxor
Column.cast
Column.collate
Column.desc
Column.desc_nulls_first
Column.desc_nulls_last
Column.endswith
Column.eqNullSafe
Column.equal_nan
Column.equal_null
Column.getItem
Column.getName
Column.get_name
Column.in_
Column.isNotNull
Column.isNull
Column.is_not_null
Column.is_null
Column.isin
Column.like
Column.name
Column.over
Column.regexp
Column.rlike
Column.startswith
Column.substr
Column.substring
Column.try_cast
Column.within_group
CaseExpr.when
CaseExpr.otherwise
データ型¶
ArrayType
BinaryType
BooleanType
ByteType
ColumnIdentifier
DataType
DateType
DecimalType
DoubleType
FloatType
IntegerType
LongType
MapType
NullType
ShortType
StringType
StructField
StructType
Timestamp
TimestampType
TimeType
Variant
VariantType
行¶
Row.asDict
Row.as_dict
Row.count
Row.index
関数¶
abs
avg
coalesce
contains
count
count_distinct
covar_pop
endswith
first_value
iff
lag
last_value
lead
list_agg
max
median
min
parse_json
row_number
startswith
substring
sum
to_array
to_binary
to_boolean
to_char
to_date
to_decimal
to_double
to_object
to_time
to_timestamp
to_variant
Window¶
Window.orderBy
Window.order_by
Window.partitionBy
Window.partition_by
Window.rangeBetween
Window.range_between
Window.rowsBetween
Window.rows_between
WindowSpec.orderBy
WindowSpec.order_by
WindowSpec.partitionBy
WindowSpec.partition_by
WindowSpec.rangeBetween
WindowSpec.range_between
WindowSpec.rowsBetween
WindowSpec.rows_between
グループ化¶
RelationalGroupedDataFrame.agg
RelationalGroupedDataFrame.apply_in_pandas
RelationalGroupedDataFrame.applyInPandas
RelationalGroupedDataFrame.avg
RelationalGroupedDataFrame.builtin
RelationalGroupedDataFrame.count
RelationalGroupedDataFrame.function
RelationalGroupedDataFrame.max
RelationalGroupedDataFrame.mean
RelationalGroupedDataFrame.median
RelationalGroupedDataFrame.min
RelationalGroupedDataFrame.sum
テーブル¶
Table.delete
Table.drop_table
Table.merge
Table.sample
Table.update
WhenMatchedClause.delete
WhenMatchedClause.update
WhenNotMatchedClause.insert