Como escrever testes para Snowpark Python

Este tópico explica como testar seu código do Snowpark enquanto estiver conectado ao Snowflake. Você pode usar utilitários de teste padrão, como PyTest, para testar suas UDFs do Snowpark Python, transformações de DataFrame e procedimentos armazenados.

Neste tópico:

Testes completos podem ajudar a evitar alterações significativas não intencionais. Os testes de unidade verificam se uma seção do código funciona conforme o esperado. Os testes de integração ajudam a garantir que os componentes funcionem juntos corretamente para um caso de uso ponta a ponta.

Os exemplos neste documento usam PyTest, uma das estruturas de teste mais populares para Python. Para obter orientações adicionais e práticas recomendadas, consulte a documentação do PyTest.

Como alternativa, você pode usar a estrutura de testes locais do Snowpark Python para criar e operar em DataFrames do Snowpark Python localmente sem se conectar a uma conta Snowflake. Para obter mais informações, consulte Estrutura de teste local.

Configuração de seus testes

Instale PyTest em seu projeto executando pip install pytest ou conda install pytest. Você também pode adicioná-lo ao seu arquivo de ambiente requirements.txt ou conda.

Crie um diretório test próximo ao diretório do código-fonte e adicione seus testes de unidade e integração a ele. Para ver um exemplo, consulte o modelo de projeto Snowpark Python.

Criação de um acessório do PyTest para a sessão Snowpark

Os acessórios do PyTest são funções executadas antes de um teste (ou módulo de testes) para fornecer dados ou conexões para testes. Neste cenário, crie um acessório do PyTest que retorne um objeto Snowpark Session.

  1. Crie um diretório test se ainda não tiver um.

  2. Crie um conftest.py em test com o seguinte conteúdo, onde connection_parameters é um dicionário com as credenciais da sua conta Snowflake. Para obter mais informações sobre o formato do dicionário, consulte Como criar uma sessão.

  3. Crie o acessório Session como um acessório com escopo de módulo em vez de um acessório com escopo de arquivo para evitar que múltiplas sessões sejam criadas e causem problemas devido a objetos de sessão conflitantes.

from snowflake.snowpark.session import Session

@pytest.fixture(scope='module')
def session(request) -> Session:
    connection_parameters = {}
    return Session.builder.configs(...).create()
Copy

Testes de unidade para UDFs

Você pode testar sua lógica da UDF do Python testando o manipulador da UDF como um método Python genérico.

  1. Crie um arquivo no diretório test para os testes de unidade da UDF. Por exemplo, nomeie o arquivo test_functions.py.

  2. Importe os métodos Python para testar.

  3. Para cada cenário de teste, crie um método Python chamado test_<cenário_a_ser_testado>.

Por exemplo, aqui está um manipulador da UDF de Python:

def fahrenheit_to_celsius(temp_f: float) -> float:
    """
    Converts fahrenheit to celsius
    """
    return (float(temp_f) - 32) * (5/9)
Copy

Você pode importar este método para o arquivo de teste (test/test_functions.py) e testá-lo como um método Python genérico.

import farenheit_to_celsius

def test_farenheit_to_celsius():
    expected = 0.0
    actual = farenheit_to_celsius(32)
    assert expected == actual
Copy

Testes de unidade para transformações de DataFrame

Adicionar testes de unidade para suas transformações de DataFrame ajuda a proteger contra bugs e regressões inesperados. Para tornar sua lógica de DataFrame facilmente testável, encapsule as transformações em um método Python que recebe como entrada os DataFrames a serem transformados e retorna os DataFrames transformados.

No exemplo abaixo, mf_df_transformer contém a lógica de transformação. Ela pode ser importada para outros módulos do projeto Python e testado facilmente.

from snowflake.snowpark.dataframe import DataFrame, col

def my_df_tranformer(df: DataFrame) -> DataFrame:
    return df \
        .with_column('c', df['a']+df['b']) \
        .filter(col('c') > 3)
Copy

Para testar essa transformação, siga estas etapas:

  1. Crie um arquivo para os testes do DataFrame, test_transformers.py, no diretório test (test/test_transformers.py).

  2. Crie um método de teste para o transformador a ser testado: test_my_df_transformer(session). O parâmetro session aqui se refere ao acessório de sessão criado na seção anterior.

  3. Usando o acessório de sessão, crie os DataFrames de entrada e de saída esperada dentro do método de teste.

  4. Passe o DataFrame de entrada para o transformador e compare o DataFrame esperado com o DataFrame real retornado pelo transformador.

# test/test_transformers.py

import my_df_transformer

def test_my_df_transformer(session):
    input_df = session.create_dataframe([[1,2],[3,4]], ['a', 'b'])
    expected_df = session.create_dataframe([3,4,7], ['a','b','c'])
    actual_df = my_df_transformer(input_df)
    assert input_df.collect() == actual_df.collect()
Copy

Testes de integração para procedimentos armazenados

Para testar seus manipuladores de procedimento armazenado, use o acessório de sessão para chamar o manipulador de procedimento armazenado. Se seu procedimento armazenado lê tabelas, como em um pipeline ETL, você pode criar essas tabelas antes de chamar o manipulador do procedimento armazenado, conforme mostrado no exemplo abaixo. Esse padrão garante que seus dados de entrada sejam rastreados no controle de origem e não sejam alterados inesperadamente entre as execuções de teste.

from project import my_sproc_handler  # import stored proc handler

def test_my_sproc_handler(session: Session):

    # Create input table
    input_tbl = session.create_dataframe(
        data=[...],
        schema=[...],
    )

    input_tbl.write.mode('overwrite').save_as_table(['DB', 'SCHEMA', 'INPUT_TBL'], mode='overwrite')

    # Create expected output dataframe
    expected_df = session.create_dataframe(
        data=[...],
        schema=[...],
    ).collect()

    # Call the stored procedure
    my_sproc_handler()

    # Get actual table
    actual_tbl = session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).collect()

    # Clean up tables
    session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).delete()
    session.table(['DB', 'SCHEMA', 'INPUT_TBL']).delete()

    # Compare the actual and expected tables
    assert expected_df == actual_tbl
Copy