Snowparkチェックポイントライブラリ: Collectors

SnowparkチェックポイントPythonパッケージは、移行されたワークロードの検証をサポートするさまざまな機能を提供します。このセクションでは、パッケージに含まれる主要な機能の概要と、それらを効果的に使用する方法について説明します。

PySpark コードに関する情報を収集します。

snowpark-checkpoints-collectors パッケージは、 PySpark DataFrames から情報を抽出する関数を提供します。そのデータを使って、変換されたSnowpark DataFrames で、動作の同等性を確認します。

新しいチェックポイント収集ポイントを挿入するには、次の関数を使用します。

関数の署名:

def collect_dataframe_checkpoint(df: SparkDataFrame,
  checkpoint_name: str,
  sample: Optional[float],
  mode: Optional[CheckpointMode],
  output_path: Optional[str]) -> None:
Copy

関数のパラメーター:

  • df: PySpark DataFrame.

  • checkpoint_name:チェックポイントの名前。文字(A-Z、a-z)またはアンダースコア(_)で始まり、文字、アンダースコア、10進数(0-9)のみを含むコンテナー。

  • sample: (オプション) サンプルサイズ。デフォルト値は1.0(PySpark DataFrame)で、範囲は0~1.0。

  • mode: (オプション) 実行モード。オプションは SCHEMADATAFRAME。デフォルト値は SCHEMA です。

  • output_path: (オプション)チェックポイントを保存する出力パス。デフォルト値は現在の作業ディレクトリです。

収集プロセスは、各収集ポイントの結果に関する情報を含む、 checkpoint_collection_result.json と呼ばれる出力ファイルを生成します。JSON ファイルで、以下の情報が含まれています。

  • コレクションポイントが開始したタイムスタンプ。

  • コレクションポイントがあるファイルの相対パス。

  • コレクション・ポイントがあるファイルのコード行。

  • コレクションポイントチェックポイントの名前。

  • コレクションポイントの結果(不合格または合格)。

スキーマ推論収集データモード(スキーマ)

これはデフォルトのモードで、Panderaのスキーマ推論を活用して、指定された DataFrame に対して評価されるメタデータとチェックを取得します。このモードでは、 PySpark のタイプに基づいて、 DataFrame の列からカスタムデータも収集します。

列のデータとチェックは、列の PySpark タイプに基づいて収集されます(以下の表を参照)。どの列についても、そのタイプにかかわらず、収集されるカスタムデータには、列の名前、列のタイプ、nullable、行のカウント、nullでない行のカウント、null行のカウントが含まれます。

カスタムデータは、列の PySpark タイプに基づいて収集されます。

列タイプ

収集したカスタムデータ

数値 (byte, short, integer, long, floatdouble)

最小値。最大値。平均値。10進数の精度(大文字と小文字のタイプでは値は0)。標準偏差。

Date

最小値。最大値。日付の形式: %Y-%m-%d

DayTimeIntervalType および YearMonthIntervalType

最小値。最大値。

タイムスタンプ

最小値。最大値。日付の形式: %Y-%m-%dH:%M:%S

タイムスタンプ ntz

最小値。最大値。日付の形式: %Y-%m-%dT%H:%M:%S%z

String

長さの最小値。長さの最大値。

Char

PySpark リテラルは文字列型として扱われるため、 char は有効な型ではありません。

Varchar

PySpark はあらゆるリテラルを文字列型として扱うため、 Varchar は有効な型ではありません。

10進数

最小値。最大値。平均値。10進数の精度。

配列

値のタイプ。許可されている場合は、要素としてnull。null値の割合。配列の最大サイズ。配列の最小サイズ。配列の平均サイズ。すべての配列が同じサイズである場合。

バイナリ

最大サイズ。最小サイズ。平均サイズ。すべての要素が同じサイズである場合。

マップ

キーのタイプ。値のタイプ。許可されている場合は、値としてnullを指定します。null値の割合。マッピングの最大サイズ。マッピングの最小サイズ。マッピングの平均サイズ。すべてのマッピングが同じサイズである場合。

Null

NullType はNoneを表し、タイプデータが決定できないため、このタイプから情報を取得することはできません。

構造

構造体のメタデータで、各 structField に対応します: nametypenullablerows countrows not null countrows null count。それは1つの配列です。

また、以下の表に示すように、各データタイプに対して定義済みの検証チェックのセットを定義しています。

チェックは列のタイプに基づいて収集されます。

Panderaチェック

追加チェック

ブール値

各値は「True」または「False」です。

True と False の値のカウント。

数値 (byte, short, integer, long, floatdouble)

各値は最小値と最大値の範囲にあります。

10進数の精度。平均値。標準偏差。

Date

N/A

最小値と最大値。

タイムスタンプ

各値は最小値と最大値の範囲にあります。

値の形式。

タイムスタンプ ntz

各値は最小値と最大値の範囲にあります。

値の形式。

String

各値の長さは最小値から最大値の範囲です。

なし

Char

PySpark はどんなリテラルも文字列型として扱うので、 char は有効な型ではありません。

Varchar

PySpark はどんなリテラルも文字列型として扱うので、 Varchar は有効な型ではありません。

10進数

N/A

N/A

配列

N/A

なし

バイナリ

N/A

なし

マップ

N/A

なし

Null

N/A

N/A

構造

N/A

なし

このモードでは、ユーザーが収集する DataFrame のサンプルを定義できますが、これはオプションです。デフォルトでは、コレクションは DataFrame 全体で動作します。サンプルのサイズは、統計的に母集団を代表していなければなりません。

Pandera は Pandas DataFrame のスキーマしか推論できないため、 PySpark DataFrame を Pandas DataFrame に変換する必要があり、列のタイプ解決に影響を与える可能性があります。特に、Panderaは以下の PySpark タイプをオブジェクトタイプとして推論します: string arraymapnullstructbinary

このモードの出力は、収集された各 DataFrame の JSON ファイルで、ファイル名はチェックポイントと同じです。このファイルにはスキーマに関する情報が含まれており、2つのセクションがあります。

  1. Panderaスキーマセクションには、名前、タイプ(Pandas)、列がNULL値を許すかどうか、各列のその他の情報など、Panderaによって推測されるデータが含まれ、列が PySpark タイプに基づいているかどうかをチェックします。Panderaの DataFrameSchema オブジェクトです。

  2. カスタムデータセクションは、 PySpark のタイプに基づいて各列が収集したカスタムデータの配列です。

注釈

コレクション・パッケージは、大容量の PySpark DataFrames を処理する際にメモリ問題が発生する可能性があります。この問題に対処するために、 PySpark DataFrame 全体ではなくデータのサブセットを扱うために、コレクション関数のサンプルパラメーターを 0.0 から 1.0 の間の値にセットすることができます。

DataFrame 収集データモード (DataFrame)

このモードでは、 PySpark DataFrame のデータを収集します。この場合、機構は与えられた DataFrame のすべてのデータを Parquet 形式で保存します。デフォルトユーザーのSnowflake接続を使用して、ParquetファイルをSnowflake temporalステージにアップロードし、ステージ内の情報に基づいてテーブルを作成しようとします。ファイル名とテーブル名はチェックポイントと同じです。

このモードの出力は、保存された DataFrame の Parquet ファイルの結果と、デフォルトの Snowflake 構成接続の DataFrame データを含むテーブルです。