Biblioteca Snowpark Checkpoints: validadores¶
Validação do código convertido do Snowpark¶
O pacote Snowpark Checkpoints oferece um conjunto de validações que podem ser aplicadas ao código Snowpark para garantir a equivalência comportamental com o código PySpark.
Funções fornecidas pelo framework¶
check_with_spark: um decorador que converterá qualquer argumento do Snowpark DataFrame em uma função ou amostra e, em seguida, em PySpark DataFrames. Em seguida, a verificação executará uma função spark fornecida que espelha a funcionalidade da nova função Snowpark e compara as saídas entre as duas implementações. Supondo que a função spark e as funções do Snowpark sejam semanticamente idênticas, isso verifica essas funções em dados reais e amostrados.
- Parâmetros:
job_context
(SnowparkJobContext): o contexto do trabalho que contém a configuração e os detalhes da validação.spark_function
(fn): a função PySpark equivalente para comparar com a implementação do Snowpark.checkpoint_name
(str): um nome para o ponto de verificação. O padrão é Nenhum.sample_number
(Optional[int], opcional): o número de linhas para validação. O padrão é 100.sampling_strategy
(Optional[SamplingStrategy], opcional): a estratégia usada para a amostragem de dados. O padrão éSamplingStrategy.RANDOM_SAMPLE
.output_path
(Optional[str], opcional): o caminho para armazenar os resultados da validação. O padrão é Nenhum.
Veja a seguir um exemplo:
def original_spark_code_I_dont_understand(df): from pyspark.sql.functions import col, when ret = df.withColumn( "life_stage", when(col("byte") < 4, "child") .when(col("byte").between(4, 10), "teenager") .otherwise("adult"), ) return ret @check_with_spark( job_context=job_context, spark_function=original_spark_code_I_dont_understand ) def new_snowpark_code_I_do_understand(df): from snowflake.snowpark.functions import col, lit, when ref = df.with_column( "life_stage", when(col("byte") < 4, lit("child")) .when(col("byte").between(4, 10), lit("teenager")) .otherwise(lit("adult")), ) return ref df1 = new_snowpark_code_I_do_understand(df)
validate_dataframe_checkpoint: Essa função valida um Snowpark Dataframe em relação a um arquivo de esquema de ponto de verificação específico ou a um Dataframe importado de acordo com o modo de argumento. Isso garante que as informações coletadas para esse DataFrame e o DataFrame que é passado para a função sejam equivalentes.
- Parâmetros:
df
(SnowparkDataFrame): o DataFrame a ser validado.checkpoint_name
(str): o nome do ponto de verificação para validação.job_context
(SnowparkJobContext, opcional) (str): o contexto do trabalho para a validação. Necessário para o modo PARQUET.mode
(CheckpointMode): o modo de validação (por exemplo, SCHEMA, PARQUET). O padrão é SCHEMA.custom_checks
(Optional[dict[Any, Any]], opcional): verificações personalizadas a serem aplicadas durante a validação.skip_checks
(Optional[dict[Any, Any]], opcional): verificações a serem ignoradas durante a validação.sample_frac
(Optional[float], opcional): fração do DataFrame a ser amostrada para validação. O padrão é 0,1.sample_number
(Optional[int], opcional): número de linhas a serem amostradas para validação.sampling_strategy
(Optional[SamplingStrategy], opcional): estratégia a ser usada para amostragem.output_path
(Optional[str], opcional): o caminho de saída para os resultados da validação.
Veja a seguir um exemplo:
# Check a schema/stats here! validate_dataframe_checkpoint( df1, "demo_add_a_column_dataframe", job_context=job_context, mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema) )
Dependendo do modo selecionado, a validação usará o arquivo de esquema coletado ou um Dataframe carregado com Parquet no Snowflake para verificar a equivalência com a versão PySpark.
check-output_schema: esse decorador valida o esquema de saída de uma função do Snowpark e garante que o DataFrame de saída esteja em conformidade com um esquema Pandera especificado. Ele é especialmente útil para reforçar a integridade e a consistência dos dados nos pipelines do Snowpark. Esse decorador recebe vários parâmetros, inclusive o esquema Pandera para validação, o nome do ponto de verificação, parâmetros de amostragem e um contexto de trabalho opcional. Ele envolve a função Snowpark e executa a validação do esquema na saída DataFrame antes de retornar o resultado.
Veja a seguir um exemplo:
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_output_schema from numpy import int8 # Define the Pandera schema out_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), "COLUMN3": Column(float, Check.less_than(10)), } ) # Define the Snowpark function and apply the decorator @check_output_schema(out_schema, "output_schema_checkpoint") def preprocessor(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the preprocessor function preprocessed_dataframe = preprocessor(sp_dataframe)
check_input_schema: esse decorador valida o esquema dos argumentos de entrada de uma função do Snowpark. Esse decorador garante que a entrada DataFrame esteja em conformidade com um esquema Pandera especificado antes de a função ser executada. Ele é especialmente útil para reforçar a integridade e a consistência dos dados nos pipelines do Snowpark. Esse decorador recebe vários parâmetros, inclusive o esquema Pandera para validação, o nome do ponto de verificação, parâmetros de amostragem e um contexto de trabalho opcional. Ele envolve a função Snowpark e realiza a validação do esquema na entrada DataFrame antes de executar a função.
Veja a seguir um exemplo:
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_input_schema from numpy import int8 # Define the Pandera schema input_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), } ) # Define the Snowpark function and apply the decorator @check_input_schema(input_schema, "input_schema_checkpoint") def process_dataframe(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the process_dataframe function processed_dataframe = process_dataframe(sp_dataframe)
Verificações de estatísticas¶
As validações de estatísticas são aplicadas ao tipo de coluna específico por padrão quando a validação é executada no modo Schema
; essas verificações podem ser ignoradas com skip_checks
.
Tipo de coluna |
Verificação padrão |
---|---|
Numérico: |
between: se o valor estiver entre o mínimo ou o máximo, incluindo o mínimo e o máximo. decimal_precision: se o valor for decimal, isso verificará a precisão decimal. mean: validar se a média das colunas está dentro de um intervalo específico. |
Booliano |
isin: validar se o valor é True (verdadeiro) ou False (falso). True_proportion: validar se a proporção dos valores True está dentro de um intervalo específico. False_proportion: validar se a proporção dos valores False está dentro de um intervalo específico. |
Data: |
between: se o valor estiver entre o mínimo ou o máximo, incluindo o mínimo e o máximo. |
Anulável: todos os tipos suportados |
Null_proportion: validar a proporção nula adequadamente. |
Ignorar verificações¶
Há um controle granular para verificações, que permite ignorar a validação da coluna ou verificações específicas para uma coluna. Com o parâmetro skip_checks
, você pode especificar a coluna específica e o tipo de validação que deseja ignorar. O nome da verificação usada para ignorar é aquele associado à verificação.
str_contains
str_endswith
str_length
str_matches
str_startswith
in_range
equal_to
greater_than_or_equal_to
greater_than
less_than_or_equal_to
less_than
not_equal_to
notin
isin
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
schema = DataFrameSchema(
{
"COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
"COLUMN2": Column(
float,
[
Check.greater_than(-20.5),
Check.less_than(-1.0),
Check(lambda x: x < -1.2),
],
),
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
sp_df,
schema,
skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
Verificações personalizadas¶
Você pode adicionar verificações adicionais ao esquema gerado a partir do arquivo JSON com a propriedade custom_checks
. Isso adicionará a verificação ao esquema pandera:
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
sp_df,
"checkpoint-name",
custom_checks={
"COLUMN1": [
Check(lambda x: x.shape[0] == 5),
Check(lambda x: x.shape[1] == 2),
],
"COLUMN2": [Check(lambda x: x.shape[0] == 5)],
},
)
Estratégias de amostragem¶
O processo de amostragem do código fornecido foi projetado para validar com eficiência grandes DataFrames, coletando uma amostra representativa dos dados. Essa abordagem ajuda a realizar a validação do esquema sem a necessidade de processar todo o conjunto de dados, o que pode ser computacionalmente caro e demorado.
- Parâmetros:
sample_frac
: esse parâmetro especifica a fração do DataFrame a ser amostrada. Por exemplo, sesample_frac
for definido como 0,1, então 10% das linhas de DataFrame serão amostradas. Isso é útil quando você deseja validar um subconjunto dos dados para economizar recursos computacionais.sample_number
: ssse parâmetro especifica o número exato de linhas para amostragem no DataFrame. Por exemplo, sesample_number
for definido como 100, então 100 linhas serão amostradas a partir de DataFrame. Isso é útil quando você deseja validar um número fixo de linhas, independentemente do tamanho do DataFrame.
Resultado da validação¶
Depois que qualquer tipo de validação for executado, o resultado, seja ele aprovado ou reprovado, será salvo em checkpoint_validation_results.json
. Esse arquivo é usado principalmente para as funcionalidades da extensão VSCode. Ele conterá informações sobre o status da validação, o carimbo de data/hora, o nome do ponto de verificação, o número da linha em que ocorre a execução da função e o arquivo.
Ele também registrará o resultado na conta padrão do Snowflake em uma tabela chamada SNOWPARK_CHECKPOINTS_REPORT, que conterá informações sobre o resultado da validação.
DATE
: carimbo de data/hora da execução da validação.JOB
: nome do SnowparkJobContext.STATUS
: status da validação.CHECKPOINT
: nome do ponto de verificação validado.MESSAGE
: mensagem de erro.DATA
: dados da execução da validação.EXECUTION_MODE
: modo de validação executado.