Snowparkチェックポイントライブラリ

Snowpark Checkpoints は、 Apache PySpark から Snowpark Python に移行したコードを検証するテストライブラリです。

前提条件

Snowpark Checkpointsを使用するには、Python開発環境をセットアップします。サポートされているPythonのバージョンは次のとおりです。

  • 3.9

  • 3.10

  • 3.11

注釈

Python 3.9はSnowparkクライアントバージョン1.5.0に依存しています。Python 3.10はSnowparkクライアントバージョン1.5.1に依存しています。Python 3.11はSnowparkクライアントバージョン1.9.0に依存しています。

AnacondaMinicondavirtualenv などのツールを使用して、特定のPythonバージョン用のPython仮想環境を作成できます。

Snowparkチェックポイントの設置

Snowpark Checkpoints パッケージを Python 仮想環境にインストールするには、 conda または pip を使用します。

  • condaの使用:

    conda install snowpark-checkpoints
    
    Copy
  • pipの使用:

    pip install snowpark-checkpoints
    
    Copy

お望みであれば、パッケージを個別にインストールすることもできます:

  • snowpark-checkpoints-collectors - このパッケージを使用して、 PySpark DataFrames についての情報を収集します。

    • condaの使用:

    conda install snowpark-checkpoints-collectors
    
    Copy
    • pipの使用:

    pip install snowpark-checkpoints-collectors
    
    Copy
  • snowpark-checkpoints-hypothesis - このパッケージを使用すると、元の PySpark コードから収集された DataFrame スキーマに従って自動的に生成された合成データに基づいて、Snowpark コードの単体テストを作成することができます。

    • condaの使用:

      conda install snowpark-checkpoints-hypothesis
      
      Copy
    • pipの使用:

      pip install snowpark-checkpoints-hypothesis
      
      Copy
  • snowpark-checkpoints-validators - このパッケージを使用して、変換された Snowpark DataFrames を、収集されたスキーマまたはコレクタ機能によって生成されたエクスポート DataFrames に対して検証します。

    • condaの使用:

      conda install snowpark-checkpoints-validators
      
      Copy
    • pipの使用:

      pip install snowpark-checkpoints-validators
      
      Copy
  • snowpark-checkpoints-configuration - snowpark-checkpoints-collectorssnowpark-checkpoints-validators がチェックポイントの構成を自動的にロードするためにこのパッケージを使います。

    • condaの使用:

    conda install snowpark-checkpoints-configuration
    
    Copy
    • pipの使用:

    pip install snowpark-checkpoints-configuration
    
    Copy

フレームワークの使用

PySpark コードに関する情報を収集します。

snowpark-checkpoints-collectors パッケージは、 PySpark DataFrames から情報を抽出する関数を提供します。そのデータを使って、変換されたSnowpark DataFrames で、動作の同等性を確認します。

新しいチェックポイント収集ポイントを挿入するには、次の関数を使用します。

関数の署名:

def collect_dataframe_checkpoint(df: SparkDataFrame,
  checkpoint_name: str,
  sample: Optional[float],
  mode: Optional[CheckpointMode],
  output_path: Optional[str]) -> None:
Copy

関数のパラメーター:

  • df: PySpark DataFrame.

  • checkpoint_name:チェックポイントの名前。文字(A-Z、a-z)またはアンダースコア(_)で始まり、文字、アンダースコア、10進数(0-9)のみを含むコンテナー。

  • sample: (オプション) サンプルサイズ。デフォルト値は1.0(PySpark DataFrame)で、範囲は0~1.0。

  • mode: (オプション) 実行モード。オプションは SCHEMADATAFRAME。デフォルト値は SCHEMA です。

  • output_path: (オプション)チェックポイントを保存する出力パス。デフォルト値は現在の作業ディレクトリです。

収集プロセスは、各収集ポイントの結果に関する情報を含む、 checkpoint_collection_result.json と呼ばれる出力ファイルを生成します。JSON ファイルで、以下の情報が含まれています。

  • コレクションポイントが開始したタイムスタンプ。

  • コレクションポイントがあるファイルの相対パス。

  • コレクション・ポイントがあるファイルのコード行。

  • コレクションポイントチェックポイントの名前。

  • コレクションポイントの結果(不合格または合格)。

スキーマ推論収集データモード(スキーマ)

これはデフォルトのモードで、Panderaのスキーマ推論を活用して、指定された DataFrame に対して評価されるメタデータとチェックを取得します。このモードでは、 PySpark のタイプに基づいて、 DataFrame の列からカスタムデータも収集します。

列のデータとチェックは、列の PySpark タイプに基づいて収集されます(以下の表を参照)。どの列についても、そのタイプにかかわらず、収集されるカスタムデータには、列の名前、列のタイプ、nullable、行のカウント、nullでない行のカウント、null行のカウントが含まれます。

カスタムデータは、列の PySpark タイプに基づいて収集されます。

列タイプ

収集したカスタムデータ

数値 (byte, short, integer, long, floatdouble)

最小値。最大値。平均値。10進数の精度(大文字と小文字のタイプでは値は0)。標準偏差。

Date

最小値。最大値。日付の形式: %Y-%m-%d

DayTimeIntervalType および YearMonthIntervalType

最小値。最大値。

タイムスタンプ

最小値。最大値。日付の形式: %Y-%m-%dH:%M:%S

タイムスタンプ ntz

最小値。最大値。日付の形式: %Y-%m-%dT%H:%M:%S%z

String

長さの最小値。長さの最大値。

Char

PySpark リテラルは文字列型として扱われるため、 char は有効な型ではありません。

Varchar

PySpark はあらゆるリテラルを文字列型として扱うため、 Varchar は有効な型ではありません。

10進数

最小値。最大値。平均値。10進数の精度。

配列

値のタイプ。許可されている場合は、要素としてnull。null値の割合。配列の最大サイズ。配列の最小サイズ。配列の平均サイズ。すべての配列が同じサイズである場合。

バイナリ

最大サイズ。最小サイズ。平均サイズ。すべての要素が同じサイズである場合。

マップ

キーのタイプ。値のタイプ。許可されている場合は、値としてnullを指定します。null値の割合。マッピングの最大サイズ。マッピングの最小サイズ。マッピングの平均サイズ。すべてのマッピングが同じサイズである場合。

Null

NullType はNoneを表し、タイプデータが決定できないため、このタイプから情報を取得することはできません。

構造

構造体のメタデータで、各 structField に対応します: nametypenullablerows countrows not null countrows null count。それは1つの配列です。

また、以下の表に示すように、各データタイプに対して定義済みの検証チェックのセットを定義しています。

チェックは列のタイプに基づいて収集されます。

Panderaチェック

追加チェック

ブール値

各値は「True」または「False」です。

True と False の値のカウント。

数値 (byte, short, integer, long, floatdouble)

各値は最小値と最大値の範囲にあります。

10進数の精度。平均値。標準偏差。

Date

N/A

最小値と最大値。

タイムスタンプ

各値は最小値と最大値の範囲にあります。

値の形式。

タイムスタンプ ntz

各値は最小値と最大値の範囲にあります。

値の形式。

String

各値の長さは最小値から最大値の範囲です。

なし

Char

PySpark はどんなリテラルも文字列型として扱うので、 char は有効な型ではありません。

Varchar

PySpark はどんなリテラルも文字列型として扱うので、 Varchar は有効な型ではありません。

10進数

N/A

N/A

配列

N/A

なし

バイナリ

N/A

なし

マップ

N/A

なし

Null

N/A

N/A

構造

N/A

なし

このモードでは、ユーザーが収集する DataFrame のサンプルを定義できますが、これはオプションです。デフォルトでは、コレクションは DataFrame 全体で動作します。サンプルのサイズは、統計的に母集団を代表していなければなりません。

Pandera は Pandas DataFrame のスキーマしか推論できないため、 PySpark DataFrame を Pandas DataFrame に変換する必要があり、列のタイプ解決に影響を与える可能性があります。特に、Panderaは以下の PySpark タイプをオブジェクトタイプとして推論します: string arraymapnullstructbinary

このモードの出力は、収集された各 DataFrame の JSON ファイルで、ファイル名はチェックポイントと同じです。このファイルにはスキーマに関する情報が含まれており、2つのセクションがあります。

  1. Pandera スキーマセクションには、名前、タイプ (Pandas)、カラムが NULL 値を許すかどうか、各カラムのその他の情報、 PySpark タイプに基づく列のチェックなど、Pandera が推論するデータが含まれます。Panderaの DataFrameSchema オブジェクトです。

  2. カスタムデータセクションは、 PySpark のタイプに基づいて各列が収集したカスタムデータの配列です。

注釈

コレクション・パッケージは、大容量の PySpark DataFrames を処理する際にメモリ問題が発生する可能性があります。この問題に対処するために、 PySpark DataFrame 全体ではなくデータのサブセットを扱うために、コレクション関数のサンプルパラメーターを 0.0 から 1.0 の間の値にセットすることができます。

DataFrame 収集データモード (DataFrame)

このモードでは、 PySpark DataFrame のデータを収集します。この場合、機構は与えられた DataFrame のすべてのデータを Parquet 形式で保存します。デフォルトユーザーのSnowflake接続を使用して、ParquetファイルをSnowflake temporalステージにアップロードし、ステージ内の情報に基づいてテーブルを作成しようとします。ファイル名とテーブル名はチェックポイントと同じです。

このモードの出力は、保存された DataFrame の Parquet ファイルの結果と、デフォルトの Snowflake 構成接続の DataFrame データを含むテーブルです。

Snowpark変換コードの検証

Snowparkチェックポイントパッケージは、Snowparkコードに適用できる検証セットを提供し、 PySpark コードとの動作の等価性を保証します。

フレームワークが提供する関数

  • check_with_spark:Snowpark DataFrame の引数を関数あるいはサンプルに変換し、 PySpark DataFrames に変換するデコレーター。このチェックでは、新しいSnowpark関数の機能を反映したspark関数がプロバイダーから提供されて実行され、2つの実装間の出力が比較されます。Spark関数とSnowpark関数が意味的に同一であると仮定すると、実際のサンプルデータ上でこれらの関数を検証することができます。

    パラメーター:
    • job_context (SnowparkJobContext):検証の構成と詳細を含むジョブコンテキスト。

    • spark_function (fn):Snowparkの実装と比較するための同等の PySpark 関数。

    • checkpoint_name (str):チェックポイントの名前。デフォルトは None。

    • sample_number (Optional[int], オプション):検証の行数。デフォルトは100。

    • sampling_strategy (Optional[SamplingStrategy], オプション):データのサンプリングに使用するストラテジー。デフォルトは SamplingStrategy.RANDOM_SAMPLE です。

    • output_path (Optional[str], オプション):検証結果を保存するパス。デフォルトは None。

    以下はその例です:

     def original_spark_code_I_dont_understand(df):
     from pyspark.sql.functions import col, when
    
     ret = df.withColumn(
         "life_stage",
         when(col("byte") < 4, "child")
         .when(col("byte").between(4, 10), "teenager")
         .otherwise("adult"),
     )
     return ret
    
    
    @check_with_spark(
     job_context=job_context, spark_function=original_spark_code_I_dont_understand
    )
    def new_snowpark_code_I_do_understand(df):
      from snowflake.snowpark.functions import col, lit, when
    
      ref = df.with_column(
          "life_stage",
          when(col("byte") < 4, lit("child"))
          .when(col("byte").between(4, 10), lit("teenager"))
          .otherwise(lit("adult")),
     )
     return ref
    
    
     df1 = new_snowpark_code_I_do_understand(df)
    
    Copy
  • validate_dataframe_checkpoint:この関数はSnowparkデータフレームを引数モードに従って、特定のチェックポイントスキーマファイルまたはインポートされたデータフレームに対して検証します。これは、その DataFrame、関数に渡される DataFrame、収集された情報が同等であることを保証します。

    パラメーター:
    • df (SnowparkDataFrame): DataFrame を検証します。

    • checkpoint_name (str):検証するチェックポイントの名前。

    • job_context (SnowparkJobContext, オプション) (str):検証のジョブコンテキスト。PARQUET モードに必要です。

    • mode (CheckpointMode):検証モード(例: SCHEMA、 PARQUET)デフォルトは SCHEMA です。

    • custom_checks (オプション[dict[Any, Any]]、オプション):検証中に適用するカスタムチェック。

    • skip_checks (オプション[dict[Any, Any]]、オプション):検証中にスキップするチェック。

    • sample_frac (Optional[float], オプション):検証用にサンプルする DataFrame の端数。デフォルトは0.1。

    • sample_number (Optional[int] オプション):検証用にサンプルする行の数。

    • sampling_strategy (Optional[SamplingStrategy], オプション):サンプリングに使用する戦略。

    • output_path (Optional[str], オプション):検証結果の出力パス。

    以下はその例です:

    # Check a schema/stats here!
    validate_dataframe_checkpoint(
       df1,
    "demo_add_a_column_dataframe",
    job_context=job_context,
    mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema)
    )
    
    Copy

    選択されたモードに応じて、検証は収集されたスキーマファイルまたはSnowflakeのParquetロードされたDataframeのいずれかを使用して、 PySpark バージョンとの依存関係を検証します。

  • check-ouput_schema:このデコレータはSnowpark関数の出力のスキーマを検証し、出力 DataFrame が指定されたPanderaスキーマに準拠していることを保証します。特にSnowparkパイプラインでデータの統合と一貫性を強制するのに便利です。このデコレータは、検証対象の Pandera スキーマ、チェックポイント名、 サンプリングパラメーター、そしてオプションのジョブコンテキストなどの パラメーターを受け取ります。Snowpark関数をラップし、結果を返す前に出力 DataFrame に対してスキーマ検証を行います。

    以下はその例です:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_output_schema
    from numpy import int8
    
    # Define the Pandera schema
    out_schema = DataFrameSchema(
    {
         "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
         "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
         "COLUMN3": Column(float, Check.less_than(10)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_output_schema(out_schema, "output_schema_checkpoint")
    def preprocessor(dataframe: SnowparkDataFrame):
      return dataframe.with_column(
         "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
         "COLUMN1": [1, 4, 0, 10, 9],
         "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the preprocessor function
    preprocessed_dataframe = preprocessor(sp_dataframe)
    
    Copy
  • check_input_schema:このデコレータはSnowpark関数の入力引数のスキーマを検証します。このデコレータは、関数が実行される前に入力 DataFrame が指定した Pandera スキーマに準拠していることを確認します。特にSnowparkパイプラインでデータの統合と一貫性を強制するのに便利です。このデコレータは、検証対象の Pandera スキーマ、チェックポイント名、 サンプリングパラメーター、そしてオプションのジョブコンテキストなどの パラメーターを受け取ります。Snowpark関数をラップし、関数を実行する前に入力 DataFrame に対してスキーマ検証を行います。

    以下はその例です:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_input_schema
    from numpy import int8
    
    # Define the Pandera schema
    input_schema = DataFrameSchema(
    {
         "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
         "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_input_schema(input_schema, "input_schema_checkpoint")
    def process_dataframe(dataframe: SnowparkDataFrame):
     return dataframe.with_column(
         "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
         "COLUMN1": [1, 4, 0, 10, 9],
         "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the process_dataframe function
    processed_dataframe = process_dataframe(sp_dataframe)
    
    Copy

統計チェック

Schema モードで検証を実行すると、デフォルトで特定の列タイプに対して統計情報の検証が適用されます。 skip_checks でこれらのチェックをスキップすることができます。

列タイプ

デフォルトチェック

数値: byte, short, integer, long, float, および double

:値が最小値と最大値の間にある場合、最小値と最大値を含みます。

decimal_precision:値が10進数の場合、10進数の精度をチェックします。

平均:列の平均が特定の範囲内にあるかどうかを検証します。

ブール値

isin:値が True か False かを検証します。

True_proportion:True 値の割合が特定の範囲内にあるかどうかを検証します。

False_proportion:False 値の割合が特定の範囲内にあるかどうかの検証。

日付: date, timestamp, そして timestamp_ntz

:値が最小値と最大値の間にある場合、最小値と最大値を含みます。

Nullable:サポートされるすべてのタイプ

Null_proportion:nullプロポーションを検証します。

スキップ・チェック

チェックのための細粒度の制御では、列の検証や特定の列のチェックをスキップすることができます。 skip_checks というパラメーターで、スキップしたい列とバリデーションタイプを指定できます。スキップに使用されるチェックの名前は、そのチェックに関連付けられているものです。

  • str_contains

  • str_endswith

  • str_length

  • str_matches

  • str_startswith

  • in_range

  • ​​equal_to

  • greater_than_or_equal_to

  • greater_than

  • less_than_or_equal_to

  • less_than

  • not_equal_to

  • notin

  • isin

df = pd.DataFrame(
{
       "COLUMN1": [1, 4, 0, 10, 9],
       "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)

schema = DataFrameSchema(
{
       "COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
       "COLUMN2": Column(
           float,
           [
               Check.greater_than(-20.5),
               Check.less_than(-1.0),
               Check(lambda x: x < -1.2),
           ],
       ),
}
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
   sp_df,
   schema,
   skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
Copy

カスタムチェック

custom_checks プロパティを使って、 JSON ファイルから生成されたスキーマに追加のチェックを加えることができます。これでチェックがPandera・スキーマに追加されます。

df = pd.DataFrame(
 {
       "COLUMN1": [1, 4, 0, 10, 9],
       "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
 }
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)

# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
  sp_df,
  "checkpoint-name",
  custom_checks={
       "COLUMN1": [
           Check(lambda x: x.shape[0] == 5),
           Check(lambda x: x.shape[1] == 2),
  ],
  "COLUMN2": [Check(lambda x: x.shape[0] == 5)],
 },
)
Copy

サンプル戦略

プロバイダー・コードのサンプリング・プロセスは、データの代表サンプルを取ることによって、大容量 DataFrames を効率的に検証するように設計されています。このアプローチは、計算コストと時間のかかるデータセット全体を処理することなくスキーマ検証を行うのに役立ちます。

パラメーター
  • sample_frac:このパラメーターは、サンプルする DataFrame の割合を指定します。例えば、 sample_frac を0.1にセットすると、 DataFrame の行の10%がサンプルされます。これは、計算リソースを節約するためにデータのサブセットを検証したい場合に便利です。

  • sample_number: このパラメーターは、 DataFrame からサンプルする正確な行数を指定します。例えば、 sample_number を100にセットすると、 DataFrame から100行がサンプルされます。これは、 DataFrame のサイズに関係なく、固定行数を検証したい場合に便利です。

検証結果

どのタイプであれ、検証が実行されると、その結果は合格であれ不合格であれ、 checkpoint_validation_results.json に保存されます。このファイルは主に、 VSCode 拡張子の関数に使用されます。検証のステータス、タイムスタンプ、チェックポイント名、関数の実行が行われた行数、ファイルに関する情報が含まれます。

また、検証結果はデフォルトのSnowflakeアカウントにログとして記録され、 SNOWPARK_CHECKPOINTS_REPORT というテーブルに検証結果に関する情報が格納されます。

  • DATE: 検証の実行タイムスタンプ。

  • JOB: SnowparkJobContext の名前。

  • STATUS: 検証のステータス。

  • CHECKPOINT: 検証されたチェックポイントの名前。

  • MESSAGE: エラーメッセージ

  • DATA: 検証実行時のデータ。

  • EXECUTION_MODE: 検証モード実行。

チェックポイント環境変数

checkpoints.json ファイルを見つけるためのフレームワークのデフォルトの動作は、 SNOWFLAKE_CHECKPOINT_CONTRACT_FILE_PATH_ENV_VAR という環境変数を探すことです。この変数には checkpoint.json の相対パスが入ります。これは、 VSCode 拡張機能によって、コード内のレンズでチェックポイントを実行したときに割り当てられます。環境変数が割り当てられていない場合、フレームワークは現在の作業ディレクトリでファイルを探そうとします。

仮説単体テスト

仮説 は Python 用の強力なテストライブラリで、さまざまな入力データを自動的に生成することで、従来のユニットテストを強化するように設計されています。Hypothesisはプロパティベースのテストを使用し、個々のテストケースを指定する代わりに、コードの期待される動作をプロパティまたは条件で記述し、Hypothesisがそれらのプロパティを徹底的にテストする例を生成します。このアプローチは、エッジケースや予期せぬ動作を発見するのに役立ち、複雑な関数に特に効果的です。詳しくは 仮説 を参照してください。

snowpark-checkpoints-hypothesis パッケージは Hypothesis ライブラリを拡張し、テスト用に合成 Snowpark DataFrames を生成します。多様でランダムなテストデータを生成するHypothesisの機能を活用することで、さまざまなスキーマと値を持つSnowpark DataFrames を作成し、実世界のシナリオをシミュレートしてエッジケースを発見し、ロバストなコードを保証して複雑な変換の正しさを検証することができます。

Snowparkの仮説戦略は、合成データの生成にPanderaを利用しています。 dataframe_strategy 関数は、指定されたスキーマを使用して、それに準拠した Pandas DataFrame を生成し、それを Snowpark DataFrame に変換します。

関数の署名:

def dataframe_strategy(
  schema: Union[str, DataFrameSchema],
  session: Session,
  size: Optional[int] = None
) -> SearchStrategy[DataFrame]
Copy

関数のパラメーター

  • schema: 生成されるSnowparkデータフレームが一致すべき列、データタイプ、チェックを定義するスキーマ。スキーマは次のようになります:

  • session: snowflake.snowpark.Session のインスタンスで、Snowpark DataFrames の作成に使用されます。

  • size: 各Snowpark の生成行数 DataFrame。このパラメーターが提供されない場合、ストラテジーは異なるサイズの DataFrames を生成します。

関数出力

Snowpark DataFrames を生成する Hypothesis SearchStrategy を返します。

サポートされているデータ型とサポートされていないデータ型

dataframe_strategy 関数は、Snowpark DataFrames のさまざまなデータ タイプの生成をサポートします。関数に渡されるスキーマ引数のタイプによって、ストラテジーがサポートするデータ型は異なります。ストラテジーがサポートされていないデータ型を見つけた場合は例外が発生することに注意してください。

次の表は、 JSON ファイルを schema 引数として渡した場合に、 dataframe_strategy 関数でサポートされる PySpark データファイルとサポートされない データファイルのタイプを示しています。

PySpark データ型

サポート対象

配列

有り

ブール値

有り

Char

無し

日付

有り

DayTimeIntervalType

無し

10進数

無し

マップ

無し

Null

無し

Byte, Short, Integer, Long, Float, Double

有り

文字列

有り

Struct

無し

タイムスタンプ

有り

TimestampNTZ

有り

Varchar

無し

YearMonthIntervalType

無し

schema の引数として DataFrameSchema オブジェクトを渡した場合に dataframe_strategy 関数でサポートされる Pandera データ型と、それらがマッピングされる Snowpark データ型を次の表に示します。

Panderaデータタイプ

Snowparkデータタイプ

int8

ByteType

int16

ShortType

int32

IntegerType

int64

LongType

float32

FloatType

float64

DoubleType

文字列

StringType

bool

BooleanType

datetime64[ns, tz]。

TimestampType(TZ)

datetime64[ns]

TimestampType(NTZ)

date

DateType

Hypothesisライブラリを使用してSnowpark DataFrames を生成する典型的なワークフローは以下のとおりです。

  1. Python の標準的なテスト関数を作成し、あなたのコードがすべての入力に対して満たすべきさまざまなアサーションや条件を指定します。

  2. Hypothesis @given デコレーターをテスト関数に追加し、 dataframe_strategy 関数を引数として渡します。 @given デコレーターの情報については、 仮説.given を参照してください。

  3. テスト関数を実行します。テストが実行されると、Hypothesisは生成された入力をテストの引数として自動的に提供します。

例1: JSON ファイルから Snowpark DataFrames を生成します。

以下は、 snowpark-checkpoints-collectors パッケージの collect_dataframe_checkpoint 関数によって生成された JSON スキーマファイルから Snowpark DataFrames を生成する方法の例です。

from hypothesis import given

from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session


@given(
    df=dataframe_strategy(
        schema="path/to/file.json",
        session=Session.builder.getOrCreate(),
        size=10,
    )
)
def test_my_function_from_json_file(df: DataFrame):
    # Test a particular function using the generated Snowpark DataFrame
    ...
Copy

例 2: Pandera DataFrameSchema オブジェクトから Snowpark DataFrame を生成します。

以下は、Pandera DataFrameSchema のインスタンスから Snowpark DataFrames を生成する方法の例です。詳しくは Pandera DataFrameSchema を参照してください。

import pandera as pa

from hypothesis import given

from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session


@given(
    df=dataframe_strategy(
        schema=pa.DataFrameSchema(
            {
                "boolean_column": pa.Column(bool),
                "integer_column": pa.Column("int64", pa.Check.in_range(0, 9)),
                "float_column": pa.Column(pa.Float32, pa.Check.in_range(10.5, 20.5)),
            }
        ),
        session=Session.builder.getOrCreate(),
        size=10,
    )
)
def test_my_function_from_dataframeschema_object(df: DataFrame):
    # Test a particular function using the generated Snowpark DataFrame
    ...
Copy

例3: 仮説動作のカスタマイズ

仮説 @settings デコレーターを使って、テストの動作をカスタマイズすることもできます。このデコレーターを使うと、さまざまな構成パラメーターをカスタマイズして、テストの動作をニーズに合わせて調整することができます。 @settings デコレーターを使うことで、テストケースの最大数や各テストの実行期限、冗長性のレベルなどを制御できます。詳しくは 仮説の設定 を参照してください。

from datetime import timedelta

from hypothesis import given, settings
from snowflake.snowpark import DataFrame, Session

from snowflake.hypothesis_snowpark import dataframe_strategy


@given(
    df=dataframe_strategy(
        schema="path/to/file.json",
        session=Session.builder.getOrCreate(),
    )
)
@settings(
    deadline=timedelta(milliseconds=800),
    max_examples=25,
)
def test_my_function(df: DataFrame):
    # Test a particular function using the generated Snowpark DataFrame
    ...
Copy

IDE Snowparkチェックポイントの設定

Snowflake Extension for Visual Studio Code は、Snowpark Checkpoints ライブラリをサポートし、フレームワークの使用感を向上させます。コードに挿入された collectvalidate ステートメントをきめ細かく制御し、変換されたコードの動作同等性アサーションのステータスをレビューします。

Snowpark チェックポイントの有効化

Snowparkチェックポイントを有効にするには、Snowflakeの拡張機能設定で Snowpark Checkpoints: Enabled をチェックします。

有効なチェックポイント

ビュー

先に説明したように、Snowpark Checkpointsプロパティを Enabled にセットすると、拡張機能で SNOWPARK CHECKPOINTS という新しいタブが開きます。ワークスペース内のすべてのチェックポイントが表示され、すべてのチェックポイントを有効/無効にしたり、個別にチェックポイントを有効/無効にしたり、ファイルからすべてのチェックポイントを消去したり、各チェックポイントをダブルクリックすると、そのチェックポイントが定義されているファイルやコード行に移動するなど、複数のアクションを実行できます。

すべてのチェックポイントの切り替え

Snowpark Checkpoints タブの右上にあるこのオプションは、すべてのチェックポイントで有効プロパティを切り替えます。

チェックポイントの切り替え

有効なチェックポイント:

チェックポイントの切り替え

チェックポイントを無効にすると、ランタイム時にチェックポイントがスキップされます。

チェックポイントの無効化

すべてのチェックポイントのクリーンアップ

Snowpark Checkpoints タブの右上にあります。これは、ワークスペース内の Jupyter Notebooks を含むすべての Python ファイルからチェックポイントを削除しますが、コントラクトとパネルからは削除しません。つまり、コマンド Snowflake: Restore All Checkpoints を使って復元できるのです。

チェックポイントの削除

ファイルにチェックポイントを挿入

ファイル内を右クリックすると、 Snowpark Checkpoints オプションを含むコンテキストメニューが表示され、 CollectionValidation チェックポイントを追加することができます。

コンテキストメニューのSnowparkチェックポイントオプション:

チェックポイントの追加

コレクター/バリデーターを追加しました:

コレクターとバリデーターのチェックポイント

単一のチェックポイントの実行

各チェックポイントの上に表示されているコードレンズオプションをクリックすると、1つのチェックポイントを実行できます。実行すると、進行状況を示す出力コンソールが表示され、終了すると結果表示が表示されます。コントラクトファイルでチェックポイントが無効になっていても、実行の時だけ有効になります。

単一のチェックポイントの実行

コントラクトファイルでエントリーポイントが宣言されていない場合、エラーメッセージ チェックポイントでエントリーポイントが見つかりません が表示されます。

エントリーポイントが見つかりません

有効化されたSnowparkチェックポイントをすべてファイルで実行

各ファイルの右上には、 Run all checkpoints from the current file ボタンがあります。

すべてのチェックポイントの実行

クリックすると、実行の進捗状況を表示する出力チャンネルが表示されます。

チェックポイントの進捗状況

タイムライン表示

チェックポイント実行結果のタイムラインを表示します。

タイムライン表示

コマンド

Snowparkチェックポイントでは以下のコマンドが利用可能です。使用するには、コマンドパレットに Snowflake: [command name] と入力してください。

Snowpark チェックポイントコマンド

コマンド

説明

Snowflakeチェックポイントの切り替え

すべてのチェックポイントの有効プロパティを切り替えます。

Snowflake: Snowpark チェックポイント プロジェクトの初期化

プロジェクトの初期化をトリガーし、コントラクトファイルが存在しない場合は作成します。それが存在する場合、チェックポイントをコントラクトファイルにロードするかどうかを尋ねるポップアップが表示されます。

Snowflake: すべてのチェックポイントをクリア

ワークスペース内のすべてのファイルからすべてのチェックポイントを削除します。

Snowflake: すべてのチェックポイントを復元

コントラクトファイルに残っているファイルから、以前に削除されたチェックポイントを復元します。

Snowflake:検証/収集チェックポイントの追加

バリデーターあるいはコレクターを、必須パラメーターとともにカーソル位置に追加します。

Snowflake Snowparkのチェックポイントを中心に表示

パネル SNOWPARK CHECKPOINTS にフォーカスを移します。

Snowflake:オープンチェックポイント タイムライン

チェックポイント実行のタイムラインを表示します。

Snowflake:現在のファイルからすべてのチェックポイントを実行

現在のファイルで有効になっているすべてのチェックポイントを実行します。

Snowflake:ワークスペース内のすべてのチェックポイントの実行

有効になっているすべてのチェックポイントをワークスペースから実行します。

Snowflake:すべてのSnowparkチェックポイントの結果を表示

すべてのチェックポイントの結果をタブに表示します。

警告

  • 重複:コレクション プロジェクトで、2 つのチェックポイントに同じ名前が割り当てられている場合、警告: 「同じ名前を持つ別のチェックポイントが検出されたため、上書きされます。」 が表示されます。 検証プロジェクトでは、同じ名前を持つ複数のチェックポイントがあっても、警告は表示されません。

  • タイプが違う:プロジェクトのタイプと異なるタイプのチェックポイントを追加すると、以下のエラーメッセージとともに下線が表示されます: 「正しいSnowpark-Checkpointsステートメントを使用しているか確認してください。この特定のチェックポイント・ステートメントは、このプロジェクトで使用されている他のステートメントとは異なります。「プロジェクトのタイプと一致しないステートメントは、実行時に無視されます。」

  • 無効なチェックポイント名: チェックポイント名のパラメーターを追加する方法が無効です。この場合、警告メッセージが表示されます: 「無効なチェックポイント名です。チェックポイント名は文字で始まり、文字、数字、ハイフン、アンダースコアのみを含むことができます」