Biblioteca Snowpark Checkpoints¶
O Snowpark Checkpoints é uma biblioteca de testes que valida o código migrado do Apache PySpark para o Snowpark Python.
Pré-requisitos¶
Para usar o Snowpark Checkpoints, configure um ambiente de desenvolvimento Python. As versões suportadas do Python são:
3.9
3,10
3,11
Nota
O Python 3.9 depende da versão 1.5.0 do cliente Snowpark. O Python 3.10 depende da versão 1.5.1 do cliente Snowpark. O Python 3.11 depende da versão 1.9.0 do cliente Snowpark.
Você pode criar um ambiente virtual Python para uma versão específica do Python usando ferramentas como Anaconda, Miniconda ou virtualenv.
Instalação do Snowpark Checkpoints¶
Instale o pacote Snowpark Checkpoints em um ambiente virtual Python usando conda ou pip.
Usando conda:
conda install snowpark-checkpoints
Usando pip:
pip install snowpark-checkpoints
Se preferir, você também pode instalar os pacotes individualmente:
snowpark-checkpoints-collectors - Use este pacote para coletar informações sobre PySpark DataFrames.
Usando conda:
conda install snowpark-checkpoints-collectors
Usando pip:
pip install snowpark-checkpoints-collectors
snowpark-checkpoints-hypothesis - Use esse pacote para criar testes de unidade para o código do Snowpark com base em dados sintéticos gerados automaticamente, seguindo os esquemas DataFrame coletados do código PySpark original.
Usando conda:
conda install snowpark-checkpoints-hypothesis
Usando pip:
pip install snowpark-checkpoints-hypothesis
snowpark-checkpoints-validators - Use esse pacote para validar o Snowpark DataFrames convertido em relação aos esquemas coletados ou ao DataFrames exportado gerado pela funcionalidade do coletor.
Usando conda:
conda install snowpark-checkpoints-validators
Usando pip:
pip install snowpark-checkpoints-validators
snowpark-checkpoints-configuration - Use esse pacote para permitir que
snowpark-checkpoints-collectors
esnowpark-checkpoints-validators
carreguem automaticamente a configuração dos pontos de verificação.Usando conda:
conda install snowpark-checkpoints-configuration
Usando pip:
pip install snowpark-checkpoints-configuration
Uso do framework¶
Coleta de informações sobre o seu código PySpark¶
O pacote snowpark-checkpoints-collectors
oferece uma função para extrair informações do site PySpark DataFrames. Em seguida, podemos usar esses dados para validar com o Snowpark DataFrames convertido para garantir a equivalência comportamental.
Use a função a seguir para inserir um novo ponto de coleta de ponto de verificação:
Assinatura da função:
def collect_dataframe_checkpoint(df: SparkDataFrame,
checkpoint_name: str,
sample: Optional[float],
mode: Optional[CheckpointMode],
output_path: Optional[str]) -> None:
Parâmetros de função:
df: o PySpark DataFrame.
checkpoint_name: o nome do ponto de verificação. Começa com uma letra (A-Z, a-z) ou um sublinhado (_) e contém apenas letras, sublinhados e dígitos decimais (0-9).
sample: (opcional) o tamanho da amostra. O valor padrão é 1,0 (PySpark DataFrame inteiro) em um intervalo de 0 a 1,0.
mode: (opcional) o modo de execução. As opções são
SCHEMA
eDATAFRAME
. O valor padrão éSCHEMA
.output_path: (opcional) o caminho de saída para salvar o ponto de verificação. O valor padrão é o diretório de trabalho atual.
O processo de coleta gera um arquivo de saída, chamado checkpoint_collection_result.json
, com as informações sobre o resultado de cada ponto de coleta. É um arquivo JSON e contém as seguintes informações:
Um carimbo de data/hora em que o ponto de coleta foi iniciado.
O caminho relativo do arquivo onde está o ponto de coleta.
A linha de código do arquivo onde está o ponto de coleta.
O nome do ponto de verificação do ponto de coleta.
O resultado do ponto de coleta (falha ou aprovação).
Modo de dados coletados de inferência de esquema (esquema)¶
Esse é o modo padrão, que utiliza a inferência de esquema do Pandera para obter os metadados e as verificações que serão avaliadas para o DataFrame especificado. Esse modo também coleta dados personalizados das colunas do DataFrame com base no tipo PySpark.
Os dados e verificações da coluna são coletados com base no tipo PySpark da coluna (veja as tabelas abaixo). Para qualquer coluna, independentemente de seu tipo, os dados personalizados coletados incluirão o nome da coluna, o tipo da coluna, anulável, a contagem de linhas, a contagem de linhas não nulas e a contagem de linhas nulas.
Tipo de coluna |
Dados personalizados coletados |
---|---|
Numérico ( |
O valor mínimo. O valor máximo. O valor médio. A precisão decimal (no caso do tipo inteiro, o valor é zero). O desvio padrão. |
Data |
O valor mínimo. O valor máximo. O formato da data: %Y-%m-%d |
DayTimeIntervalType e YearMonthIntervalType |
O valor mínimo. O valor máximo. |
Carimbo de data/hora |
O valor mínimo. O valor máximo. O formato da data: %Y-%m-%dH:%M:%S |
Carimbo de data/hora ntz |
O valor mínimo. O valor máximo. O formato da data: %Y-%m-%dT%H:%M:%S%z |
Cadeia de caracteres |
O valor do comprimento mínimo. O valor do comprimento máximo. |
Char |
PySpark trata qualquer literal como um tipo de cadeia de caracteres, portanto, char não é um tipo válido. |
Varchar |
PySpark trata qualquer literal como um tipo de cadeia de caracteres, portanto, Varchar não é um tipo válido. |
Decimal |
O valor mínimo. O valor máximo. O valor médio. A precisão decimal. |
Matriz |
O tipo do valor. Se permitido, nulo como um elemento. A proporção de valores nulos. O tamanho máximo da matriz. O tamanho mínimo da matriz. O tamanho médio das matrizes. Se todas as matrizes tiverem o mesmo tamanho. |
Binário |
O tamanho máximo. O tamanho mínimo. O tamanho médio. Se todos os elementos tiverem o mesmo tamanho. |
Mapa |
O tipo da chave. O tipo do valor. Se permitido, nulo como um valor. A proporção de valores nulos. O tamanho máximo do mapa. O tamanho mínimo do mapa. O tamanho médio do mapa. Se todos os mapas tiverem o mesmo tamanho. |
Nulo |
NullType representa Nenhum, porque os dados do tipo não podem ser determinados; portanto, não é possível obter informações desse tipo. |
Estrutura |
Os metadados da estrutura são para cada structField: |
Ele também define um conjunto de verificações de validação predefinidas para cada tipo de dados detalhado na tabela a seguir:
Tipo |
Verificações Pandera |
Verificações adicionais |
---|---|---|
Booleano |
Cada valor é True ou False. |
A contagem de valores True e False. |
Numérico ( |
Cada valor está no intervalo do valor mínimo e do valor máximo. |
A precisão decimal. O valor médio. O desvio padrão. |
Data |
N/A |
Valores mínimos e máximos. |
Carimbo de data/hora |
Cada valor está no intervalo do valor mínimo e do valor máximo. |
O formato do valor. |
Carimbo de data/hora ntz |
Cada valor está no intervalo do valor mínimo e do valor máximo. |
O formato do valor. |
Cadeia de caracteres |
O comprimento de cada valor está no intervalo de comprimento mínimo e máximo. |
Nenhum |
Char |
PySpark trata qualquer literal como um tipo de cadeia de caracteres, portanto, |
|
Varchar |
PySpark trata qualquer literal como um tipo de cadeia de caracteres, portanto, |
|
Decimal |
N/A |
N/A |
Matriz |
N/A |
Nenhum |
Binário |
N/A |
Nenhum |
Mapa |
N/A |
Nenhum |
Nulo |
N/A |
N/A |
Estrutura |
N/A |
Nenhum |
Esse modo permite que o usuário defina uma amostra de um DataFrame para coletar, mas é opcional. Por padrão, a coleção funciona com todo o DataFrame. O tamanho da amostra deve representar estatisticamente a população.
O Pandera só pode inferir o esquema de um Pandas DataFrame, o que implica que o PySpark DataFrame deve ser convertido em um Pandas DataFrame, o que pode afetar as resoluções de tipo das colunas. Em particular, o Pandera infere os seguintes tipos de PySpark como tipos de objeto: string
, array
, map
, null
, struct
e binary
.
A saída desse modo é um arquivo JSON para cada DataFrame coletado, em que o nome do arquivo é o mesmo do ponto de verificação. Esse arquivo contém informações relacionadas ao esquema e tem duas seções:
A seção de esquema do Pandera contém os dados inferidos pelo Pandera, como nome, tipo (Pandas), se a coluna permite ou não valores nulos e outras informações para cada coluna, além de verificações das colunas com base no tipo PySpark. É um objeto
DataFrameSchema
de Pandera.A seção de dados personalizados é uma matriz dos dados personalizados coletados por cada coluna com base no tipo PySpark.
Nota
O pacote de coleta pode ter problemas de memória ao processar grandes PySpark DataFrames. Para resolver isso, você pode definir o parâmetro de amostra na função de coleta para um valor entre 0,0 e 1,0, a fim de trabalhar com um subconjunto dos dados em vez de todo o PySpark DataFrame.
Modo de dados coletados DataFrame (DataFrame)¶
Esse modo coleta os dados do PySpark DataFrame. Nesse caso, o mecanismo salva todos os dados do DataFrame fornecido no formato parquet. Usando a conexão padrão do Snowflake do usuário, ele tenta carregar os arquivos de parquet no estágio temporal do Snowflake e criar uma tabela com base nas informações do estágio. O nome do arquivo e da tabela são os mesmos do ponto de verificação.
A saída desse modo é um resultado de arquivo parquet do DataFrame salvo e uma tabela com os dados do DataFrame na conexão de configuração padrão do Snowflake.
Validação do código convertido do Snowpark¶
O pacote Snowpark Checkpoints oferece um conjunto de validações que podem ser aplicadas ao código Snowpark para garantir a equivalência comportamental com o código PySpark.
Funções fornecidas pelo framework¶
check_with_spark: um decorador que converterá qualquer argumento do Snowpark DataFrame em uma função ou amostra e, em seguida, em PySpark DataFrames. Em seguida, a verificação executará uma função spark fornecida que espelha a funcionalidade da nova função Snowpark e comparará os resultados entre as duas implementações. Supondo que a função spark e as funções do Snowpark sejam semanticamente idênticas, isso permite a verificação dessas funções em dados reais e amostrados.
- Parâmetros:
job_context
(SnowparkJobContext): o contexto do trabalho que contém a configuração e os detalhes da validação.spark_function
(fn): a função PySpark equivalente para comparar com a implementação do Snowpark.checkpoint_name
(str): um nome para o ponto de verificação. O padrão é Nenhum.sample_number
(Optional[int], opcional): o número de linhas para validação. O padrão é 100.sampling_strategy
(Optional[SamplingStrategy], opcional): a estratégia usada para a amostragem de dados. O padrão éSamplingStrategy.RANDOM_SAMPLE
.output_path
(Optional[str], opcional): o caminho para armazenar os resultados da validação. O padrão é Nenhum.
Veja a seguir um exemplo:
def original_spark_code_I_dont_understand(df): from pyspark.sql.functions import col, when ret = df.withColumn( "life_stage", when(col("byte") < 4, "child") .when(col("byte").between(4, 10), "teenager") .otherwise("adult"), ) return ret @check_with_spark( job_context=job_context, spark_function=original_spark_code_I_dont_understand ) def new_snowpark_code_I_do_understand(df): from snowflake.snowpark.functions import col, lit, when ref = df.with_column( "life_stage", when(col("byte") < 4, lit("child")) .when(col("byte").between(4, 10), lit("teenager")) .otherwise(lit("adult")), ) return ref df1 = new_snowpark_code_I_do_understand(df)
validate_dataframe_checkpoint: Essa função valida um Snowpark Dataframe em relação a um arquivo de esquema de ponto de verificação específico ou a um Dataframe importado de acordo com o modo de argumento. Isso garante que as informações coletadas para esse DataFrame e o DataFrame que é passado para a função sejam equivalentes.
- Parâmetros:
df
(SnowparkDataFrame): o DataFrame a ser validado.checkpoint_name
(str): o nome do ponto de verificação para validação.job_context
(SnowparkJobContext, opcional) (str): o contexto do trabalho para a validação. Necessário para o modo PARQUET.mode
(CheckpointMode): o modo de validação (por exemplo, SCHEMA, PARQUET). O padrão é SCHEMA.custom_checks
(Optional[dict[Any, Any]], opcional): verificações personalizadas a serem aplicadas durante a validação.skip_checks
(Optional[dict[Any, Any]], opcional): verificações a serem ignoradas durante a validação.sample_frac
(Optional[float], opcional): fração do DataFrame a ser amostrada para validação. O padrão é 0,1.sample_number
(Optional[int], opcional): número de linhas a serem amostradas para validação.sampling_strategy
(Optional[SamplingStrategy], opcional): estratégia a ser usada para amostragem.output_path
(Optional[str], opcional): o caminho de saída para os resultados da validação.
Veja a seguir um exemplo:
# Check a schema/stats here! validate_dataframe_checkpoint( df1, "demo_add_a_column_dataframe", job_context=job_context, mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema) )
Dependendo do modo selecionado, a validação usará o arquivo de esquema coletado ou um Dataframe carregado com Parquet no Snowflake para verificar a equivalência com a versão PySpark.
check-ouput_schema: esse decorador valida o esquema de saída de uma função do Snowpark e garante que a saída DataFrame esteja em conformidade com um esquema Pandera especificado. Ele é especialmente útil para reforçar a integridade e a consistência dos dados nos pipelines do Snowpark. Esse decorador recebe vários parâmetros, inclusive o esquema Pandera para validação, o nome do ponto de verificação, parâmetros de amostragem e um contexto de trabalho opcional. Ele envolve a função Snowpark e executa a validação do esquema na saída DataFrame antes de retornar o resultado.
Veja a seguir um exemplo:
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_output_schema from numpy import int8 # Define the Pandera schema out_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), "COLUMN3": Column(float, Check.less_than(10)), } ) # Define the Snowpark function and apply the decorator @check_output_schema(out_schema, "output_schema_checkpoint") def preprocessor(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the preprocessor function preprocessed_dataframe = preprocessor(sp_dataframe)
check_input_schema: esse decorador valida o esquema dos argumentos de entrada de uma função do Snowpark. Esse decorador garante que a entrada DataFrame esteja em conformidade com um esquema Pandera especificado antes de a função ser executada. Ele é especialmente útil para reforçar a integridade e a consistência dos dados nos pipelines do Snowpark. Esse decorador recebe vários parâmetros, inclusive o esquema Pandera para validação, o nome do ponto de verificação, parâmetros de amostragem e um contexto de trabalho opcional. Ele envolve a função Snowpark e realiza a validação do esquema na entrada DataFrame antes de executar a função.
Veja a seguir um exemplo:
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_input_schema from numpy import int8 # Define the Pandera schema input_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), } ) # Define the Snowpark function and apply the decorator @check_input_schema(input_schema, "input_schema_checkpoint") def process_dataframe(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the process_dataframe function processed_dataframe = process_dataframe(sp_dataframe)
Verificações de estatísticas¶
As validações de estatísticas são aplicadas ao tipo de coluna específico por padrão quando a validação é executada no modo Schema
; essas verificações podem ser ignoradas com skip_checks
.
Tipo de coluna |
Verificação padrão |
---|---|
Numérico: |
between: se o valor estiver entre o mínimo ou o máximo, incluindo o mínimo e o máximo. decimal_precision: se o valor for decimal, isso verificará a precisão decimal. mean: validar se a média das colunas está dentro de um intervalo específico. |
Booleano |
isin: validar se o valor é True (verdadeiro) ou False (falso). True_proportion: validar se a proporção dos valores True está dentro de um intervalo específico. False_proportion: validar se a proporção dos valores False está dentro de um intervalo específico. |
Data: |
between: se o valor estiver entre o mínimo ou o máximo, incluindo o mínimo e o máximo. |
Anulável: todos os tipos suportados |
Null_proportion: validar a proporção nula adequadamente. |
Ignorar verificações¶
Há um controle granular para verificações, que permite ignorar a validação da coluna ou verificações específicas para uma coluna. Com o parâmetro skip_checks
, você pode especificar a coluna específica e o tipo de validação que deseja ignorar. O nome da verificação usada para ignorar é aquele associado à verificação.
str_contains
str_endswith
str_length
str_matches
str_startswith
in_range
equal_to
greater_than_or_equal_to
greater_than
less_than_or_equal_to
less_than
not_equal_to
notin
isin
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
schema = DataFrameSchema(
{
"COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
"COLUMN2": Column(
float,
[
Check.greater_than(-20.5),
Check.less_than(-1.0),
Check(lambda x: x < -1.2),
],
),
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
sp_df,
schema,
skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
Verificações personalizadas¶
Você pode adicionar verificações adicionais ao esquema gerado a partir do arquivo JSON com a propriedade custom_checks
. Isso adicionará a verificação ao esquema pandera:
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
sp_df,
"checkpoint-name",
custom_checks={
"COLUMN1": [
Check(lambda x: x.shape[0] == 5),
Check(lambda x: x.shape[1] == 2),
],
"COLUMN2": [Check(lambda x: x.shape[0] == 5)],
},
)
Estratégias de amostragem¶
O processo de amostragem do código fornecido foi projetado para validar com eficiência grandes DataFrames, coletando uma amostra representativa dos dados. Essa abordagem ajuda a realizar a validação do esquema sem a necessidade de processar todo o conjunto de dados, o que pode ser computacionalmente caro e demorado.
- Parâmetros:
sample_frac
: esse parâmetro especifica a fração do DataFrame a ser amostrada. Por exemplo, sesample_frac
for definido como 0,1, então 10% das linhas de DataFrame serão amostradas. Isso é útil quando você deseja validar um subconjunto dos dados para economizar recursos computacionais.sample_number
: ssse parâmetro especifica o número exato de linhas para amostragem no DataFrame. Por exemplo, sesample_number
for definido como 100, então 100 linhas serão amostradas a partir de DataFrame. Isso é útil quando você deseja validar um número fixo de linhas, independentemente do tamanho do DataFrame.
Resultado da validação¶
Depois que qualquer tipo de validação for executado, o resultado, seja ele aprovado ou reprovado, será salvo em checkpoint_validation_results.json
. Esse arquivo é usado principalmente para as funcionalidades da extensão VSCode. Ele conterá informações sobre o status da validação, o carimbo de data/hora, o nome do ponto de verificação, o número da linha em que ocorre a execução da função e o arquivo.
Ele também registrará o resultado na conta padrão do Snowflake em uma tabela chamada SNOWPARK_CHECKPOINTS_REPORT, que conterá informações sobre o resultado da validação.
DATE
: carimbo de data/hora da execução da validação.JOB
: nome do SnowparkJobContext.STATUS
: status da validação.CHECKPOINT
: nome do ponto de verificação validado.MESSAGE
: mensagem de erro.DATA
: dados da execução da validação.EXECUTION_MODE
: modo de validação executado.
Variável de ambiente de ponto de verificação¶
O comportamento padrão da estrutura para localizar o arquivo checkpoints.json
é procurar uma variável de ambiente chamada SNOWFLAKE_CHECKPOINT_CONTRACT_FILE_PATH_ENV_VAR
. Essa variável conterá o caminho relativo do checkpoint.json
. Ele é atribuído pela extensão VSCode quando você executa o ponto de verificação com as lentes de código no código. Se a variável de ambiente não for atribuída, o framework tentará procurar o arquivo no diretório de trabalho atual.
Teste de unidade do Hypothesis¶
Hypothesis é uma biblioteca de testes avançada para Python, projetada para aprimorar os testes de unidade tradicionais, gerando automaticamente uma ampla variedade de dados de entrada. Ele usa testes baseados em propriedades, nos quais, em vez de especificar casos de teste individuais, você pode descrever o comportamento esperado do seu código com propriedades ou condições, e o Hypothesis gera exemplos para testar essas propriedades minuciosamente. Essa abordagem ajuda a descobrir casos extremos e comportamentos inesperados, o que a torna especialmente eficaz para funções complexas. Para obter mais informações, consulte Hypothesis.
O pacote snowpark-checkpoints-hypothesis
amplia a biblioteca Hypothesis para gerar Snowpark DataFrames sintéticos para fins de teste. Ao aproveitar a capacidade do Hypothesis de gerar dados de teste diversos e aleatórios, você pode criar o Snowpark DataFrames com esquemas e valores variados para simular cenários do mundo real e descobrir casos extremos, garantindo um código robusto e verificando a correção de transformações complexas.
A estratégia do Hypothesis para o Snowpark conta com o Pandera para gerar dados sintéticos. A função dataframe_strategy
usa o esquema especificado para gerar um Pandas DataFrame que esteja em conformidade com ele e, em seguida, converte-o em um Snowpark DataFrame.
Assinatura da função:
def dataframe_strategy(
schema: Union[str, DataFrameSchema],
session: Session,
size: Optional[int] = None
) -> SearchStrategy[DataFrame]
Parâmetros de função:
schema
: o esquema que define as colunas, os tipos de dados e as verificações que Snowpark dataframe gerado deve corresponder. O esquema pode ser:Um caminho para um arquivo de esquema JSON gerado pela função
collect_dataframe_checkpoint
do pacotesnowpark-checkpoints-collectors
.Uma instância de pandera.api.pandas.container.DataFrameSchema.
session
: uma instância de snowflake.snowpark.Session que será usada para criar Snowpark DataFrames.size
: número de linhas a serem geradas para cada Snowpark DataFrame. Se esse parâmetro não for fornecido, a estratégia gerará DataFrames de tamanhos diferentes.
Saída de função:
Retorna uma SearchStrategy do Hypothesis que gera o Snowpark DataFrames.
Tipos de dados suportados e não suportados¶
A função dataframe_strategy
oferece suporte à geração de Snowpark DataFrames com diferentes tipos de dados. Dependendo do tipo do argumento do esquema passado para a função, os tipos de dados suportados pela estratégia variam. Observe que, se a estratégia encontrar um tipo de dados não suportado, ela abrirá uma exceção.
A tabela a seguir mostra os tipos de dados PySpark suportados e não suportados pela função dataframe_strategy
ao passar um arquivo JSON como argumento schema
.
Tipo de dados PySpark |
Com suporte |
---|---|
Sim |
|
Sim |
|
Não |
|
Sim |
|
Não |
|
Não |
|
Não |
|
Não |
|
Sim |
|
Sim |
|
Não |
|
Sim |
|
Sim |
|
Não |
|
Não |
A tabela a seguir mostra os tipos de dados do Pandera suportados pela função dataframe_strategy
ao passar um objeto DataFrameSchema como argumento schema
e os tipos de dados do Snowpark para os quais eles são mapeados.
Tipo de dados Pandera |
Tipo de dados Snowpark |
---|---|
int8 |
|
int16 |
|
int32 |
|
int64 |
|
float32 |
|
float64 |
|
cadeia de caracteres |
|
bool |
|
datetime64[ns, tz] |
|
datetime64[ns] |
|
data |
Exemplos¶
O fluxo de trabalho típico para usar a biblioteca Hypothesis para gerar o Snowpark DataFrames é o seguinte:
Crie uma função de teste Python padrão com as diferentes asserções ou condições que seu código deve satisfazer para todas as entradas.
Adicione o decorador Hypothesis
@given
à sua função de teste e passe a funçãodataframe_strategy
como argumento. Para obter mais informações sobre o decorador@given
, consulte hypothesis.given.Execute a função de teste. Quando o teste for executado, o Hypothesis fornecerá automaticamente as entradas geradas como argumentos para o teste.
Exemplo 1: gerar Snowpark DataFrames a partir de um arquivo JSON
Veja abaixo um exemplo de como gerar Snowpark DataFrames a partir de um arquivo de esquema JSON gerado pela função collect_dataframe_checkpoint
do pacote snowpark-checkpoints-collectors
.
from hypothesis import given
from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session
@given(
df=dataframe_strategy(
schema="path/to/file.json",
session=Session.builder.getOrCreate(),
size=10,
)
)
def test_my_function_from_json_file(df: DataFrame):
# Test a particular function using the generated Snowpark DataFrame
...
Exemplo 2: gerar um Snowpark DataFrame a partir de um objeto Pandera DataFrameSchema
Abaixo está um exemplo de como gerar Snowpark DataFrames a partir de uma instância de um Pandera DataFrameSchema. Para obter mais informações, consulte Pandera DataFrameSchema.
import pandera as pa
from hypothesis import given
from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session
@given(
df=dataframe_strategy(
schema=pa.DataFrameSchema(
{
"boolean_column": pa.Column(bool),
"integer_column": pa.Column("int64", pa.Check.in_range(0, 9)),
"float_column": pa.Column(pa.Float32, pa.Check.in_range(10.5, 20.5)),
}
),
session=Session.builder.getOrCreate(),
size=10,
)
)
def test_my_function_from_dataframeschema_object(df: DataFrame):
# Test a particular function using the generated Snowpark DataFrame
...
Exemplo 3: personalizar o comportamento do Hypothesis
Você também pode personalizar o comportamento do seu teste com o decorador Hypothesis @settings
. Esse decorador permite que você personalize vários parâmetros de configuração para adequar o comportamento do teste às suas necessidades. Ao usar o decorador @settings
, você pode controlar aspectos como o número máximo de casos de teste, o prazo para a execução de cada teste, os níveis de detalhamento e muitos outros. Para obter mais informações, consulte Configurações do Hypothesis.
from datetime import timedelta
from hypothesis import given, settings
from snowflake.snowpark import DataFrame, Session
from snowflake.hypothesis_snowpark import dataframe_strategy
@given(
df=dataframe_strategy(
schema="path/to/file.json",
session=Session.builder.getOrCreate(),
)
)
@settings(
deadline=timedelta(milliseconds=800),
max_examples=25,
)
def test_my_function(df: DataFrame):
# Test a particular function using the generated Snowpark DataFrame
...
Como configurar um IDE para Snowpark Checkpoints¶
O Snowflake Extension for Visual Studio Code oferece suporte à biblioteca Snowpark Checkpoints para aprimorar a experiência de uso do framework. Ele oferece controle refinado sobre as instruções collect
e validate
inseridas no seu código, bem como analisa o status das afirmações de equivalência comportamental do seu código convertido.
Como habilitar Snowpark Checkpoints¶
Para habilitar Snowpark Checkpoints, acesse as configurações de extensão do Snowflake e marque Snowpark Checkpoints: Enabled.

Exibição¶
A definição da propriedade Snowpark Checkpoints como Enabled, conforme explicado anteriormente, abrirá uma nova guia na extensão chamada SNOWPARK CHECKPOINTS. Ele exibe todos os pontos de verificação no espaço de trabalho e permite a execução de várias ações, como ativar/desativar todos ou individualmente, limpar todos os arquivos e, ao clicar duas vezes em cada ponto de navegação, navega até o arquivo e a linha de código em que ele está definido.
Alternar todos os pontos de verificação¶
Localizada no canto superior direito da guia Snowpark Checkpoints, essa opção alterna a propriedade ativada em todos os pontos de verificação.

Pontos de verificação ativados:

A desativação de um ponto de verificação faz com que ele seja ignorado no tempo de execução.

Limpeza de todos os pontos de verificação¶
Localizado no canto superior direito da guia Snowpark Checkpoints. Isso remove os pontos de verificação de todos os arquivos Python, incluindo Jupyter notebooks, em seu espaço de trabalho, mas não os exclui do contrato e do painel. Isso significa que eles podem ser restaurados usando o comando Snowflake: Restore All Checkpoints
.

Inserção de pontos de verificação em um arquivo¶
Ao clicar com o botão direito do mouse em um arquivo, será exibido um menu de contexto que contém a opção Snowpark Checkpoints, que permite adicionar pontos de verificação Collection e Validation.
Opção de pontos de verificação do Snowpark no menu de contexto:

Coletor/Validador adicionado:

Execução de um único ponto de verificação¶
Um único ponto de verificação pode ser executado clicando na opção de lente de código mostrada acima de cada ponto de verificação. A execução abrirá um console de saída mostrando o progresso e, quando terminar, abrirá a exibição de resultados. Mesmo que o ponto de verificação esteja desativado no arquivo de contrato, ele será ativado apenas para sua execução.

Se um ponto de entrada não for declarado no arquivo de contrato, será exibida a mensagem de erro: Ponto de entrada não encontrado para o ponto de verificação.

Execução de todos os pontos de verificação do Snowpark ativados em um arquivo¶
No canto superior direito de cada arquivo, o botão Run all checkpoints from the current file estará presente.

Ao clicar nele, será exibido um canal de saída que mostra o progresso da execução.

Visualização da linha do tempo¶
Exibe uma linha do tempo dos resultados da execução dos pontos de verificação.

Comandos¶
Os seguintes comandos estão disponíveis para Snowpark Checkpoints. Para usá-los, digite Snowflake: [command name]
na paleta de comandos.
Comando |
Descrição |
---|---|
Snowflake: alternar pontos de verificação |
Alterna a propriedade habilitada de todos os pontos de verificação. |
Snowflake: inicialização do projeto Snowpark Checkpoints |
Aciona a inicialização do projeto, criando um arquivo de contrato se ele não existir. Caso ele exista, será exibida uma janela pop-up perguntando se você deseja carregar o ponto de verificação no arquivo de contrato. |
Snowflake: limpar todos os pontos de verificação |
Exclui todos os pontos de verificação de todos os arquivos no espaço de trabalho. |
Snowflake: restaurar todos os pontos de verificação |
Restaurar pontos de verificação excluídos anteriormente dos arquivos que ainda estão presentes no arquivo de contrato. |
Snowflake: adicionar ponto de verificação de validação/coleta |
Adiciona um validador ou coletor com seus parâmetros obrigatórios na posição do cursor. |
Snowflake: foco na visão do Snowpark Checkpoints |
Muda o foco para o painel SNOWPARK CHECKPOINTS. |
Snowflake: abrir linha do tempo de pontos de verificação |
Exibe uma linha do tempo das execuções dos pontos de verificação. |
Snowflake: executar todos os pontos de verificação do arquivo atual |
Executa todos os pontos de verificação habilitados no arquivo atual. |
Snowflake: executar todos os pontos de verificação no espaço de trabalho |
Executa todos os pontos de verificação habilitados do espaço de trabalho. |
Snowflake: mostrar o resultado de Snowpark Checkpoints |
Exibe uma guia com todos os resultados dos pontos de verificação. |
Avisos¶
Duplicado: em um projeto de coleção, se dois pontos de verificação forem atribuídos com o mesmo nome, um aviso: «Outro ponto de verificação com um nome idêntico foi detectado e será substituído.» Os projetos de validação podem ter vários pontos de verificação com o mesmo nome, e nenhum aviso será exibido.
Tipo errado: ao adicionar um ponto de verificação com um tipo diferente do tipo de projeto, você o sublinhará com a seguinte mensagem de erro: «Certifique-se de que você está usando a instrução correta do Snowpark-Checkpoints. Essa instrução de ponto de verificação específica é diferente das outras usadas neste projeto; as instruções que não corresponderem ao tipo de projeto serão ignoradas quando executadas.»
Nome do ponto de controle inválido: há maneiras inválidas de adicionar um parâmetro de nome de ponto de verificação. Se isso acontecer, será exibida uma mensagem de aviso: «Nome do ponto de verificação inválido. Os nomes dos pontos de verificação devem começar com uma letra e só podem conter letras, números, hífens e sublinhados».