Biblioteca Snowpark Checkpoints: validadores

Validação do código convertido do Snowpark

O pacote Snowpark Checkpoints oferece um conjunto de validações que podem ser aplicadas ao código Snowpark para garantir a equivalência comportamental com o código PySpark.

Funções fornecidas pelo framework

  • check_with_spark: um decorador que converterá qualquer argumento do Snowpark DataFrame em uma função ou amostra e, em seguida, em PySpark DataFrames. Em seguida, a verificação executará uma função spark fornecida que espelha a funcionalidade da nova função Snowpark e compara as saídas entre as duas implementações. Supondo que a função spark e as funções do Snowpark sejam semanticamente idênticas, isso verifica essas funções em dados reais e amostrados.

    Parâmetros:
    • job_context (SnowparkJobContext): o contexto do trabalho que contém a configuração e os detalhes da validação.

    • spark_function (fn): a função PySpark equivalente para comparar com a implementação do Snowpark.

    • checkpoint_name (str): um nome para o ponto de verificação. O padrão é Nenhum.

    • sample_number (Optional[int], opcional): o número de linhas para validação. O padrão é 100.

    • sampling_strategy (Optional[SamplingStrategy], opcional): a estratégia usada para a amostragem de dados. O padrão é SamplingStrategy.RANDOM_SAMPLE.

    • output_path (Optional[str], opcional): o caminho para armazenar os resultados da validação. O padrão é Nenhum.

    Veja a seguir um exemplo:

     def original_spark_code_I_dont_understand(df):
     from pyspark.sql.functions import col, when
    
     ret = df.withColumn(
         "life_stage",
         when(col("byte") < 4, "child")
         .when(col("byte").between(4, 10), "teenager")
         .otherwise("adult"),
     )
     return ret
    
    
    @check_with_spark(
     job_context=job_context, spark_function=original_spark_code_I_dont_understand
    )
    def new_snowpark_code_I_do_understand(df):
      from snowflake.snowpark.functions import col, lit, when
    
      ref = df.with_column(
          "life_stage",
          when(col("byte") < 4, lit("child"))
          .when(col("byte").between(4, 10), lit("teenager"))
          .otherwise(lit("adult")),
     )
     return ref
    
    
     df1 = new_snowpark_code_I_do_understand(df)
    
    Copy
  • validate_dataframe_checkpoint: Essa função valida um Snowpark Dataframe em relação a um arquivo de esquema de ponto de verificação específico ou a um Dataframe importado de acordo com o modo de argumento. Isso garante que as informações coletadas para esse DataFrame e o DataFrame que é passado para a função sejam equivalentes.

    Parâmetros:
    • df (SnowparkDataFrame): o DataFrame a ser validado.

    • checkpoint_name (str): o nome do ponto de verificação para validação.

    • job_context (SnowparkJobContext, opcional) (str): o contexto do trabalho para a validação. Necessário para o modo PARQUET.

    • mode (CheckpointMode): o modo de validação (por exemplo, SCHEMA, PARQUET). O padrão é SCHEMA.

    • custom_checks (Optional[dict[Any, Any]], opcional): verificações personalizadas a serem aplicadas durante a validação.

    • skip_checks (Optional[dict[Any, Any]], opcional): verificações a serem ignoradas durante a validação.

    • sample_frac (Optional[float], opcional): fração do DataFrame a ser amostrada para validação. O padrão é 0,1.

    • sample_number (Optional[int], opcional): número de linhas a serem amostradas para validação.

    • sampling_strategy (Optional[SamplingStrategy], opcional): estratégia a ser usada para amostragem.

    • output_path (Optional[str], opcional): o caminho de saída para os resultados da validação.

    Veja a seguir um exemplo:

    # Check a schema/stats here!
    validate_dataframe_checkpoint(
       df1,
    "demo_add_a_column_dataframe",
    job_context=job_context,
    mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema)
    )
    
    Copy

    Dependendo do modo selecionado, a validação usará o arquivo de esquema coletado ou um Dataframe carregado com Parquet no Snowflake para verificar a equivalência com a versão PySpark.

  • check-output_schema: esse decorador valida o esquema de saída de uma função do Snowpark e garante que o DataFrame de saída esteja em conformidade com um esquema Pandera especificado. Ele é especialmente útil para reforçar a integridade e a consistência dos dados nos pipelines do Snowpark. Esse decorador recebe vários parâmetros, inclusive o esquema Pandera para validação, o nome do ponto de verificação, parâmetros de amostragem e um contexto de trabalho opcional. Ele envolve a função Snowpark e executa a validação do esquema na saída DataFrame antes de retornar o resultado.

    Veja a seguir um exemplo:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_output_schema
    from numpy import int8
    
    # Define the Pandera schema
    out_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
        "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
        "COLUMN3": Column(float, Check.less_than(10)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_output_schema(out_schema, "output_schema_checkpoint")
    def preprocessor(dataframe: SnowparkDataFrame):
     return dataframe.with_column(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the preprocessor function
    preprocessed_dataframe = preprocessor(sp_dataframe)
    
    Copy
  • check_input_schema: esse decorador valida o esquema dos argumentos de entrada de uma função do Snowpark. Esse decorador garante que a entrada DataFrame esteja em conformidade com um esquema Pandera especificado antes de a função ser executada. Ele é especialmente útil para reforçar a integridade e a consistência dos dados nos pipelines do Snowpark. Esse decorador recebe vários parâmetros, inclusive o esquema Pandera para validação, o nome do ponto de verificação, parâmetros de amostragem e um contexto de trabalho opcional. Ele envolve a função Snowpark e realiza a validação do esquema na entrada DataFrame antes de executar a função.

    Veja a seguir um exemplo:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_input_schema
    from numpy import int8
    
    # Define the Pandera schema
    input_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
        "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_input_schema(input_schema, "input_schema_checkpoint")
    def process_dataframe(dataframe: SnowparkDataFrame):
    return dataframe.with_column(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the process_dataframe function
    processed_dataframe = process_dataframe(sp_dataframe)
    
    Copy

Verificações de estatísticas

As validações de estatísticas são aplicadas ao tipo de coluna específico por padrão quando a validação é executada no modo Schema; essas verificações podem ser ignoradas com skip_checks.

Tipo de coluna

Verificação padrão

Numérico: byte, short, integer, long, float e double

between: se o valor estiver entre o mínimo ou o máximo, incluindo o mínimo e o máximo.

decimal_precision: se o valor for decimal, isso verificará a precisão decimal.

mean: validar se a média das colunas está dentro de um intervalo específico.

Booliano

isin: validar se o valor é True (verdadeiro) ou False (falso).

True_proportion: validar se a proporção dos valores True está dentro de um intervalo específico.

False_proportion: validar se a proporção dos valores False está dentro de um intervalo específico.

Data: date, timestamp e timestamp_ntz

between: se o valor estiver entre o mínimo ou o máximo, incluindo o mínimo e o máximo.

Anulável: todos os tipos suportados

Null_proportion: validar a proporção nula adequadamente.

Ignorar verificações

Há um controle granular para verificações, que permite ignorar a validação da coluna ou verificações específicas para uma coluna. Com o parâmetro skip_checks, você pode especificar a coluna específica e o tipo de validação que deseja ignorar. O nome da verificação usada para ignorar é aquele associado à verificação.

  • str_contains

  • str_endswith

  • str_length

  • str_matches

  • str_startswith

  • in_range

  • ​​equal_to

  • greater_than_or_equal_to

  • greater_than

  • less_than_or_equal_to

  • less_than

  • not_equal_to

  • notin

  • isin

df = pd.DataFrame(
{
      "COLUMN1": [1, 4, 0, 10, 9],
      "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)

schema = DataFrameSchema(
{
      "COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
      "COLUMN2": Column(
          float,
          [
              Check.greater_than(-20.5),
              Check.less_than(-1.0),
              Check(lambda x: x < -1.2),
          ],
      ),
}
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
  sp_df,
  schema,
  skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
Copy

Verificações personalizadas

Você pode adicionar verificações adicionais ao esquema gerado a partir do arquivo JSON com a propriedade custom_checks. Isso adicionará a verificação ao esquema pandera:

df = pd.DataFrame(
  {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
  }
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)

# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
  sp_df,
  "checkpoint-name",
  custom_checks={
        "COLUMN1": [
            Check(lambda x: x.shape[0] == 5),
            Check(lambda x: x.shape[1] == 2),
    ],
    "COLUMN2": [Check(lambda x: x.shape[0] == 5)],
  },
)
Copy

Estratégias de amostragem

O processo de amostragem do código fornecido foi projetado para validar com eficiência grandes DataFrames, coletando uma amostra representativa dos dados. Essa abordagem ajuda a realizar a validação do esquema sem a necessidade de processar todo o conjunto de dados, o que pode ser computacionalmente caro e demorado.

Parâmetros:
  • sample_frac: esse parâmetro especifica a fração do DataFrame a ser amostrada. Por exemplo, se sample_frac for definido como 0,1, então 10% das linhas de DataFrame serão amostradas. Isso é útil quando você deseja validar um subconjunto dos dados para economizar recursos computacionais.

  • sample_number: ssse parâmetro especifica o número exato de linhas para amostragem no DataFrame. Por exemplo, se sample_number for definido como 100, então 100 linhas serão amostradas a partir de DataFrame. Isso é útil quando você deseja validar um número fixo de linhas, independentemente do tamanho do DataFrame.

Resultado da validação

Depois que qualquer tipo de validação for executado, o resultado, seja ele aprovado ou reprovado, será salvo em checkpoint_validation_results.json. Esse arquivo é usado principalmente para as funcionalidades da extensão VSCode. Ele conterá informações sobre o status da validação, o carimbo de data/hora, o nome do ponto de verificação, o número da linha em que ocorre a execução da função e o arquivo.

Ele também registrará o resultado na conta padrão do Snowflake em uma tabela chamada SNOWPARK_CHECKPOINTS_REPORT, que conterá informações sobre o resultado da validação.

  • DATE: carimbo de data/hora da execução da validação.

  • JOB: nome do SnowparkJobContext.

  • STATUS: status da validação.

  • CHECKPOINT: nome do ponto de verificação validado.

  • MESSAGE: mensagem de erro.

  • DATA: dados da execução da validação.

  • EXECUTION_MODE: modo de validação executado.