Snowparkチェックポイントライブラリ:Collectors

snowpark-checkpoints-collectors パッケージは、 PySpark DataFrames から情報を抽出する関数を提供します。そのデータを使って、変換されたSnowpark DataFrames に対して検証を行い、動作の同等性を確認します。

  • 新しいチェックポイントのコレクションポイントを挿入するには、次の関数を使用します。

    関数の署名:

    def collect_dataframe_checkpoint(df: SparkDataFrame,
      checkpoint_name: str,
      sample: Optional[float],
      mode: Optional[CheckpointMode],
      output_path: Optional[str]) -> None:
    
    Copy

    関数のパラメーター:

    • df: PySpark DataFrame

    • checkpoint_name: チェックポイントの名前。

      文字(A-Z、a-z)またはアンダースコア(_)で始まり、文字、アンダースコア、数字(0-9)のみを含みます。

    • sample: (オプション) サンプルサイズ

      デフォルト値は1.0(PySpark DataFrame)で、範囲は0~1.0。

    • mode: (オプション) 実行モード

      オプションは、 SCHEMA (デフォルト)および:code:`DATAFRAME`です。

    • **output_path:**(オプション)チェックポイントが保存されるファイルへのパス

      デフォルト値は現在の作業ディレクトリです。

コレクションプロセスでは、:file:`checkpoint_collection_result.json`と呼ばれるJSON出力ファイルを生成します。これは、各コレクションポイントの結果について次の情報を含んでいます。

  • コレクションポイントが開始した時間のタイムスタンプ

  • コレクションポイントがあるファイルの相対パス

  • コレクションポイントがあるファイルのコード行

  • コレクションポイントのチェックポイントの名前

  • コレクションポイントの結果(不合格または合格)

スキーマ推論収集データモード(スキーマ)

これはデフォルトのモードで、Panderaのスキーマ推論を活用して、指定された DataFrame に対して評価されるメタデータとチェックを取得します。このモードでは、 PySpark のタイプに基づいて、 DataFrame の列からカスタムデータも収集します。

列のデータとチェックは、列の PySpark タイプに基づいて収集されます(次の表を参照)。どの列についても、そのタイプにかかわらず、収集されるカスタムデータには、列の名前、列のタイプ、nullable、行のカウント、nullでない行のカウント、null行のカウントが含まれます。

カスタムデータは、列の PySpark タイプに基づいて収集されます。

列タイプ

収集したカスタムデータ

数値 (byte, short, integer, long, floatdouble)

最小値、最大値、平均値、10進数の精度(整数型の場合は値が0)、標準偏差

Date

最小値、最大値、日付形式(%Y-%m-%d

DayTimeIntervalType および YearMonthIntervalType

最小値、最大値

タイムスタンプ

最小値、最大値、日付形式(%Y-%m-%dH:%M:%S

タイムスタンプ ntz

最小値、最大値、日付形式(%Y-%m-%dT%H:%M:%S%z

String

長さの最小値、長さの最大値

Char

PySpark はあらゆるリテラルを文字列型として扱うため、char は有効な型ではありません。

Varchar

PySpark はあらゆるリテラルを文字列型として扱うため、Varchar は有効な型ではありません。

10進数

最小値、最大値、平均値、10進数の精度

配列

値のタイプ、許可されている場合は要素としてのnul、null値の割合、配列の最大サイズ、配列の最小サイズ、配列の平均サイズ、すべての配列が同じサイズであるかどうか

バイナリ

最大サイズ、最小サイズ、平均サイズ、すべての要素が同じサイズを持つかどうか

マップ

キーのタイプ、値のタイプ、許可されている場合は値としてのnul、null値の割合、マップの最大サイズ、マップの最小サイズ、マップの平均サイズ、すべてのマップが同じサイズであるかどうか

Null

NullType はNoneを表し、タイプデータが決定できないため、このタイプから情報を取得することはできません。

構造

各 structField に対する構造体のメタデータ: nametypenullablerows countrows not null countrows null count。それは1つの配列です。

また、以下の表に示すように、各データタイプに対して事前定義済みの検証チェックのセットを定義しています。

チェックは列のタイプに基づいて収集されます。

Panderaチェック

追加チェック

ブール値

各値は「True」または「False」です。

True と False の値のカウント

数値 (byte, short, integer, long, floatdouble)

各値は最小値と最大値の範囲にあります。

10進数の精度、平均値、標準偏差

Date

N/A

最小値と最大値。

タイムスタンプ

各値は最小値と最大値の範囲にあります。

値の形式

タイムスタンプ ntz

各値は最小値と最大値の範囲にあります。

値の形式

String

各値の長さは最小値から最大値の範囲です。

なし

Char

PySpark はあらゆるリテラルを文字列型として扱うため、char は有効な型ではありません。

Varchar

PySpark はあらゆるリテラルを文字列型として扱うため、Varchar は有効な型ではありません。

10進数

N/A

N/A

配列

N/A

なし

バイナリ

N/A

なし

マップ

N/A

なし

Null

N/A

N/A

構造

N/A

なし

このモードでは、ユーザーが収集する DataFrame のサンプルを定義できますが、これはオプションです。デフォルトでは、コレクションは DataFrame 全体で動作します。サンプルのサイズは、統計的に母集団を代表していなければなりません。

Pandera は Pandas DataFrame のスキーマしか推論できないため、 PySpark DataFrame を Pandas DataFrame に変換する必要があり、列のタイプ解決に影響を与える可能性があります。特に、Panderaは以下のPySparkタイプをオブジェクトタイプとして推論します。stringarraymapnullstructbinary

このモードの出力は、収集された各DataFrame の JSONファイルで、ファイル名はチェックポイントと同じです。このファイルにはスキーマに関する情報が含まれており、2つのセクションがあります。

  • Pandera スキーマ セクションには、名前、タイプ (Pandas)、列が null 値を許可するかどうか、各列のその他の情報、 列がPySpark タイプに基づいているかどうかのチェックなど、Pandera が推論するデータが含まれます。Panderaの DataFrameSchema オブジェクトです。

  • カスタムデータ セクションは、 PySpark のタイプに基づいて各列が収集したカスタムデータの配列です。

注釈

コレクション・パッケージは、大容量の PySpark DataFrames を処理する際にメモリ問題が発生する可能性があります。PySparkDataFrame 全体ではなくデータのサブセットを扱うために、コレクション関数のサンプルパラメーターを 0.0 から 1.0 の間の値にセットすることができます。

DataFrame 収集データモード (DataFrame)

このモードでは、 PySpark DataFrame のデータを収集します。この場合、機構は与えられた DataFrame のすべてのデータを Parquet 形式で保存します。デフォルトユーザーのSnowflake接続を使用して、ParquetファイルをSnowflake temporalステージにアップロードし、ステージ内の情報に基づいてテーブルを作成しようとします。ファイル名とテーブル名はチェックポイントと同じです。

このモードの出力は、保存された DataFrame の Parquet ファイルの結果と、デフォルトの Snowflake 構成接続の DataFrame データを含むテーブルです。