Snowparkチェックポイントライブラリ:Validators¶
Snowparkチェックポイントパッケージは、Snowparkコードに適用できる検証セットを提供し、 PySpark コードとの動作の等価性を保証します。
フレームワークが提供する関数¶
:emph:`check_with_spark:`Snowpark DataFrame の引数を関数あるいはサンプルに変換し、 PySpark DataFrames に変換するデコレーター。このチェックでは、提供されたspark関数が実行され、新しいSnowpark関数の機能を反映し、2つの実装間の出力が比較されます。Spark関数とSnowpark関数が意味的に同一であると仮定すると、このデコレータは実際のサンプルデータ上でこれらの関数を検証します。
- パラメーター:
job_context
(SnowparkJobContext):検証の構成と詳細を含むジョブコンテキストspark_function
(fn):Snowparkの実装と比較するための同等の PySpark 関数checkpoint_name
(str):チェックポイントの名前。デフォルトは None:code:`sample_number`(Optional[int], オプション):検証用の行数。デフォルトは100
sampling_strategy`(Optional[SamplingStrategy]、オプション):データのサンプリングに使用される戦略。デフォルトは :code:`SamplingStrategy.RANDOM_SAMPLE
:code:`output_path`(Optional[str], オプション):検証結果が保存されているファイルへのパス。デフォルトは None
例:
def original_spark_code_I_dont_understand(df): from pyspark.sql.functions import col, when ret = df.withColumn( "life_stage", when(col("byte") < 4, "child") .when(col("byte").between(4, 10), "teenager") .otherwise("adult"), ) return ret @check_with_spark( job_context=job_context, spark_function=original_spark_code_I_dont_understand ) def new_snowpark_code_I_do_understand(df): from snowflake.snowpark.functions import col, lit, when ref = df.with_column( "life_stage", when(col("byte") < 4, lit("child")) .when(col("byte").between(4, 10), lit("teenager")) .otherwise(lit("adult")), ) return ref df1 = new_snowpark_code_I_do_understand(df)
:emph:`validate_dataframe_checkpoint:`この関数は、引数モードに従って、特定のチェックポイントスキーマファイルまたはインポートされたデータフレームに対してSnowpark データフレームを検証します。これにより、DataFrameに対して収集された情報と関数に渡された DataFrame が同等であることを確認します。
- パラメーター:
df
(SnowparkDataFrame):検証するDataFramecheckpoint_name
(str):検証するチェックポイントの名前job_context
(SnowparkJobContext, optional) (str):検証のジョブコンテキスト。PARQUET モードで必須。:code:`mode`(CheckpointMode):検証モード(例: SCHEMA, PARQUET)。デフォルトでは SCHEMA
:code:`custom_checks`(Optional[dict[Any, Any]]、オプション):検証中に適用するカスタムチェック
:code:`skip_checks`(Optional[dict[Any, Any]]、オプション):検証中にスキップするチェック
:code:`sample_frac`(Optional[float]、オプション):検証用にサンプルする DataFrame の端数。デフォルトでは 0.1
:code:`sample_number`(Optional[int] オプション):検証用にサンプルする行の数
:code:`sampling_strategy`(Optional[SamplingStrategy], オプション):サンプリングに使用する戦略
:code:`output_path`(Optional[str], オプション):検証結果が保存されているファイルへのパス。
例:
# Check a schema/stats here! validate_dataframe_checkpoint( df1, "demo_add_a_column_dataframe", job_context=job_context, mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema) )
選択されたモードに応じて、検証は収集されたスキーマファイルまたはSnowflakeのParquetロードされたDataframeのいずれかを使用して、 PySpark バージョンとの同等性を確認します。
:emph:`check-output_schema:`このデコレータはSnowpark関数の出力のスキーマを検証し、出力 DataFrame が指定されたPanderaスキーマに準拠していることを保証します。特にSnowparkパイプラインでデータの統合と一貫性を強制するのに便利です。このデコレータは、検証する対象の Pandera スキーマ、チェックポイント名、サンプリングパラメーター、そしてオプションのジョブコンテキストなどの複数のパラメーターを受け取ります。Snowpark関数をラップし、結果を返す前に出力 DataFrame に対してスキーマ検証を行います。
例:
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_output_schema from numpy import int8 # Define the Pandera schema out_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), "COLUMN3": Column(float, Check.less_than(10)), } ) # Define the Snowpark function and apply the decorator @check_output_schema(out_schema, "output_schema_checkpoint") def preprocessor(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the preprocessor function preprocessed_dataframe = preprocessor(sp_dataframe)
:emph:`check_input_schema:`このデコレータはSnowpark関数の入力引数のスキーマを検証します。このデコレータは、関数が実行される前に入力 DataFrame が指定した Pandera スキーマに準拠していることを確認します。特にSnowparkパイプラインでデータの統合と一貫性を強制するのに便利です。このデコレータは、検証する対象の Pandera スキーマ、チェックポイント名、サンプリングパラメーター、そしてオプションのジョブコンテキストなどの複数のパラメーターを受け取ります。Snowpark関数をラップし、関数を実行する前に入力DataFrame に対してスキーマ検証を行います。
例:
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_input_schema from numpy import int8 # Define the Pandera schema input_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), } ) # Define the Snowpark function and apply the decorator @check_input_schema(input_schema, "input_schema_checkpoint") def process_dataframe(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the process_dataframe function processed_dataframe = process_dataframe(sp_dataframe)
統計チェック¶
Schema
モードで検証を実行すると、デフォルトで特定の列タイプに対して統計情報の検証が適用されます。skip_checks
でこれらのチェックをスキップすることができます。
列タイプ |
デフォルトチェック |
---|---|
数値: |
:emph:`間:`最小値と最大値を含み、値が最小値と最大値の間にあるかどうかを検証します。 decimal_precision: 値が10進数の場合、10進数の精度をチェックします。 平均: 列の平均が特定の範囲内にあるかどうかを検証します。 |
ブール値 |
isin: 値が True か False かを検証します。 True_proportion: True 値の割合が特定の範囲内にあるかどうかを検証します。 False_proportion: False 値の割合が特定の範囲内にあるかどうかの検証。 |
日付: |
|
Nullable:サポートされるすべてのタイプ |
Null_proportion: 必要に応じてnull の割合を検証します。 |
スキップ・チェック¶
細かなチェックの制御により、列の検証や列の特定のチェックをスキップすることができます。skip_checks
というパラメーターで、スキップしたい列とバリデーションタイプを指定できます。スキップに使用されるチェックの名前は、そのチェックに関連付けられているものです。
str_contains
str_endswith
str_length
str_matches
str_startswith
in_range
equal_to
greater_than_or_equal_to
greater_than
less_than_or_equal_to
less_than
not_equal_to
notin
isin
例:
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
schema = DataFrameSchema(
{
"COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
"COLUMN2": Column(
float,
[
Check.greater_than(-20.5),
Check.less_than(-1.0),
Check(lambda x: x < -1.2),
],
),
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
sp_df,
schema,
skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
カスタムチェック¶
JSON プロパティを使って、 custom_checks
ファイルから生成されたスキーマに追加のチェックを加えることができます。これでPanderaスキーマにチェックが追加されます。
例:
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
sp_df,
"checkpoint-name",
custom_checks={
"COLUMN1": [
Check(lambda x: x.shape[0] == 5),
Check(lambda x: x.shape[1] == 2),
],
"COLUMN2": [Check(lambda x: x.shape[0] == 5)],
},
)
サンプル戦略¶
プロバイダー・コードのサンプリング・プロセスは、データの代表サンプルを取ることによって、大容量 DataFrames を効率的に検証するように設計されています。このアプローチは、計算コストと時間のかかるデータセット全体を処理することなくスキーマ検証を行うのに役立ちます。
- パラメーター:
sample_frac
:このパラメーターは、サンプルする DataFrame の割合を指定します。例えば、sample_frac
を0.1にセットすると、 DataFrame の行の10%がサンプルされます。これは、計算リソースを節約するためにデータのサブセットを検証したい場合に便利です。sample_number
: このパラメーターは、 DataFrame からサンプルする正確な行数を指定します。例えば、sample_number
を100にセットすると、 DataFrame から100行がサンプルされます。これは、 DataFrame のサイズに関係なく、固定行数を検証したい場合に便利です。
検証結果¶
どのタイプであれ、検証が実行されると、その結果は合格であれ不合格であれ、 checkpoint_validation_results.json
に保存されます。このファイルは主に、 VSCode 拡張子の関数に使用されます。検証のステータス、タイムスタンプ、チェックポイント名、関数の実行が行われた行数、ファイルに関する情報が含まれます。
また、検証結果はデフォルトのSnowflakeアカウントにログとして記録され、* SNOWPARK_CHECKPOINTS_REPORT* というテーブルに検証結果に関する情報が格納されます。
DATE
: 検証の実行タイムスタンプJOB
: SnowparkJobContext の名前STATUS
: 検証のステータスCHECKPOINT
: 検証されたチェックポイントの名前MESSAGE
: エラーメッセージDATA
: 検証実行時のデータEXECUTION_MODE
: 実行された検証モード