Tutorial: teste do Python Snowpark

Introdução

Este tutorial apresenta os princípios básicos para testar seu código Snowpark Python.

O que você aprenderá

Neste tutorial, você aprenderá como:

  • Teste seu código do Snowpark enquanto estiver conectado ao Snowflake.

    Você pode usar utilitários de teste padrão, como PyTest, para testar suas UDFs do Snowpark Python, transformações de DataFrame e procedimentos armazenados.

  • Teste seus DataFrames Snowpark Python localmente sem se conectar a uma conta Snowflake usando a estrutura de teste local.

    Você pode usar a estrutura de teste local para testar localmente, em sua máquina de desenvolvimento, antes de implantar alterações no código.

Pré-requisitos

Para usar a estrutura de teste local:

  • Você deve usar a versão 1.11.1 ou superior da biblioteca Snowpark Python.

  • As versões suportadas do Python são:

    • 3.8

    • 3.9

    • 3,10

    • 3,11

Configuração do projeto

Nesta seção, você clonará o repositório do projeto e configurará o ambiente necessário para o tutorial.

  1. Clone o repositório do projeto.

    git clone https://github.com/Snowflake-Labs/sftutorial-snowpark-testing
    
    Copy

    Caso você não tenha o git instalado, acesse a página do repositório e baixe o conteúdo clicando em Code » Download Contents.

  2. Defina variáveis de ambiente com as credenciais da sua conta. A Snowpark API as usará para autenticar sua conta Snowflake.

    # Linux/MacOS
    export SNOWSQL_ACCOUNT=<replace with your account identifer>
    export SNOWSQL_USER=<replace with your username>
    export SNOWSQL_ROLE=<replace with your role>
    export SNOWSQL_PWD=<replace with your password>
    export SNOWSQL_DATABASE=<replace with your database>
    export SNOWSQL_SCHEMA=<replace with your schema>
    export SNOWSQL_WAREHOUSE=<replace with your warehouse>
    
    Copy
    # Windows/PowerShell
    $env:SNOWSQL_ACCOUNT = "<replace with your account identifer>"
    $env:SNOWSQL_USER = "<replace with your username>"
    $env:SNOWSQL_ROLE = "<replace with your role>"
    $env:SNOWSQL_PWD = "<replace with your password>"
    $env:SNOWSQL_DATABASE = "<replace with your database>"
    $env:SNOWSQL_SCHEMA = "<replace with your schema>"
    $env:SNOWSQL_WAREHOUSE = "<replace with your warehouse>"
    
    Copy

    Opcional: você pode definir essa variável de ambiente permanentemente editando seu perfil bash (no Linux/MacOS) ou usando o menu System Properties (no Windows).

  3. Crie e ative um ambiente conda usando Anaconda:

    conda env create --file environment.yml
    conda activate snowpark-testing
    
    Copy
  4. Crie a tabela de amostra em sua conta executando setup/create_table.py. Este script Python criará um banco de dados chamado CITIBIKE, um esquema chamado PUBLIC e uma pequena tabela chamada TRIPS.

    python setup/create_table.py
    
    Copy

Agora você está pronto para passar para a próxima seção. Nesta seção você:

  • Clonou o repositório do tutorial.

  • Criou variáveis de ambiente com as informações da sua conta.

  • Criou um ambiente conda para o projeto.

  • Conectou-se ao Snowflake usando a Snowpark API e criou um banco de dados, esquema e tabela de amostra.

Como experimentar o procedimento armazenado

O projeto de amostra inclui um manipulador de procedimento armazenado (sproc.py) e três métodos transformadores de DataFrames (transformers.py). O manipulador de procedimento armazenado usa os transformadores de UDF e DataFrame para ler a tabela de origem, CITIBIKE.PUBLIC.TRIPS, e cria duas tabelas de fatos: MONTH_FACTS e BIKE_FACTS.

Você pode executar o procedimento armazenado na linha de comando executando este comando.

python project/sproc.py
Copy

Agora que você se familiarizou com o projeto, na próxima seção você configurará o diretório de teste e criará um acessório de PyTest para a sessão do Snowflake.

Crie um acessório do PyTest para a sessão Snowflake

Acessórios do PyTest são funções executadas antes de um teste (ou módulo de testes), normalmente para fornecer dados ou conexões para testes. Para este projeto, você criará um acessório do PyTest que retorna um objeto Snowpark Session. Seus casos de teste usarão esta sessão para se conectar ao Snowflake.

  1. Crie um diretório test no diretório raiz do projeto.

    mkdir test
    
    Copy
  2. No diretório test, crie um novo arquivo Python chamado conftest.py. Dentro de conftest.py, crie um acessório PyTest para o objeto Session:

    import pytest
    from project.utils import get_env_var_config
    from snowflake.snowpark.session import Session
    
    @pytest.fixture
    def session() -> Session:
        return Session.builder.configs(get_env_var_config()).create()
    
    Copy

Como adicionar testes de unidade para transformadores de DataFrame

  1. No diretório test, crie um novo arquivo Python chamado test_transformers.py.

  2. No arquivo test_transformers.py, importe os métodos do transformador.

    # test/test_transformers.py
    
    from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
    
    Copy
  3. A seguir, crie testes de unidade para esses transformadores. A convenção típica é criar um método para cada teste com o nome test_<nome do método>. No nosso caso, os testes serão:

    # test/test_transformers.py
    from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
    def test_add_rider_age(session):
        ...
    
    def test_calc_bike_facts(session):
        ...
    
    
    def test_calc_month_facts(session):
        ...
    
    Copy

    O parâmetro session em cada caso de teste refere-se ao acessório do PyTest que você criou na seção anterior.

  4. Agora implemente os casos de teste para cada transformador. Use o seguinte padrão.

    1. Crie um DataFrame de entrada.

    2. Crie o DataFrame de saída esperado.

    3. Passe o DataFrame de entrada da etapa 1 para o método do transformador.

    4. Compare a saída da etapa 3 com a saída esperada da etapa 2.

    # test/test_transformers.py
    from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
    from snowflake.snowpark.types import StructType, StructField, IntegerType, FloatType
    
    def test_add_rider_age(session: Session):
        input = session.create_dataframe(
            [
                [1980],
                [1995],
                [2000]
            ],
            schema=StructType([StructField("BIRTH_YEAR", IntegerType())])
        )
    
        expected = session.create_dataframe(
            [
                [1980, 43],
                [1995, 28],
                [2000, 23]
            ],
            schema=StructType([StructField("BIRTH_YEAR", IntegerType()), StructField("RIDER_AGE", IntegerType())])
        )
    
        actual = add_rider_age(input)
        assert expected.collect() == actual.collect()
    
    
    def test_calc_bike_facts(session: Session):
        input = session.create_dataframe([
                [1, 10, 20],
                [1, 5, 30],
                [2, 20, 50],
                [2, 10, 60]
            ],
            schema=StructType([
                StructField("BIKEID", IntegerType()),
                StructField("TRIPDURATION", IntegerType()),
                StructField("RIDER_AGE", IntegerType())
            ])
        )
    
        expected = session.create_dataframe([
                [1, 2, 7.5, 25.0],
                [2, 2, 15.0, 55.0],
            ],
            schema=StructType([
                StructField("BIKEID", IntegerType()),
                StructField("COUNT", IntegerType()),
                StructField("AVG_TRIPDURATION", FloatType()),
                StructField("AVG_RIDER_AGE", FloatType())
            ])
        )
    
        actual = calc_bike_facts(input)
        assert expected.collect() == actual.collect()
    
    
    def test_calc_month_facts(session: Session):
        from patches import patch_to_timestamp
    
        input = session.create_dataframe(
            data=[
                ['2018-03-01 09:47:00.000 +0000', 1, 10,  15],
                ['2018-03-01 09:47:14.000 +0000', 2, 20, 12],
                ['2018-04-01 09:47:04.000 +0000', 3, 6,  30]
            ],
            schema=['STARTTIME', 'BIKE_ID', 'TRIPDURATION', 'RIDER_AGE']
        )
    
        expected = session.create_dataframe(
            data=[
                ['Mar', 2, 15, 13.5],
                ['Apr', 1, 6, 30.0]
            ],
            schema=['MONTH', 'COUNT', 'AVG_TRIPDURATION', 'AVG_RIDER_AGE']
        )
    
        actual = calc_month_facts(input)
    
        assert expected.collect() == actual.collect()
    
    Copy
  5. Agora você pode executar PyTest para executar todos os testes de unidade.

    pytest test/test_transformers.py
    
    Copy

Como adicionar testes de integração para procedimentos armazenados

Agora que temos testes de unidade para os métodos do transformador do DataFrame, vamos adicionar um teste de integração para o procedimento armazenado. O caso de teste seguirá este padrão:

  1. Crie uma tabela representando os dados de entrada do procedimento armazenado.

  2. Crie dois DataFrames com o conteúdo esperado das duas tabelas de saída do procedimento armazenado.

  3. Chame o procedimento armazenado.

  4. Compare as tabelas de saída reais com DataFrames da etapa 2.

  5. Limpeza: exclua a tabela de entrada da etapa 1 e as tabelas de saída da etapa 3.

Crie um arquivo Python chamado test_sproc.py no diretório test.

Importe o manipulador do procedimento armazenado do diretório do projeto e crie um caso de teste.

# test/test_sproc.py
from project.sproc import create_fact_tables

def test_create_fact_tables(session):
    ...
Copy

Implemente o caso de teste, começando pela criação da tabela de entrada.

# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *

def test_create_fact_tables(session):
    DB = 'CITIBIKE'
    SCHEMA = 'TEST'

    # Set up source table
    tbl = session.create_dataframe(
        data=[
            [1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
            [1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
            [1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
            [1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
            [1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
            [1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
            [1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
            [1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
            [1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
            [1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
            [1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
        ],
        schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION',    'BIKEID'],
    )

    tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
Copy

Em seguida, crie DataFrames para as tabelas de saída esperadas.

# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *

def test_create_fact_tables(session):
    DB = 'CITIBIKE'
    SCHEMA = 'TEST'

    # Set up source table
    tbl = session.create_dataframe(
        data=[
            [1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
            [1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
            [1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
            [1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
            [1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
            [1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
            [1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
            [1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
            [1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
            [1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
            [1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
        ],
        schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION',    'BIKEID'],
    )

    tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')

    # Expected values
    n_rows_expected = 12
    bike_facts_expected = session.create_dataframe(
        data=[
            [30958, 1, 551.0, 40.0],
            [19278, 1, 242.0, 35.0],
            [18461, 1, 768.0, 31.0],
            [15533, 1, 690.0, 43.0],
            [32449, 1, 490.0, 32.0],
            [29411, 1, 457.0, 64.0],
            [28015, 1, 279.0, 52.0],
            [15148, 1, 546.0, 59.0],
            [16967, 1, 358.0, 40.0],
            [20644, 1, 848.0, 38.0],
            [16365, 1, 295.0, 39.0]
        ],
        schema=StructType([
            StructField("BIKEID", IntegerType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", FloatType()),
            StructField("AVG_RIDER_AGE", FloatType())
        ])
    ).collect()

    month_facts_expected = session.create_dataframe(
        data=[['Mar', 11, 502.18182, 43.00000]],
        schema=StructType([
            StructField("MONTH", StringType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", DecimalType()),
            StructField("AVG_RIDER_AGE", DecimalType())
        ])
    ).collect()
Copy

E, finalmente, chame o procedimento armazenado e leia as tabelas de saída. Compare as tabelas reais com o conteúdo do DataFrame.

# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *

def test_create_fact_tables(session):
    DB = 'CITIBIKE'
    SCHEMA = 'TEST'

    # Set up source table
    tbl = session.create_dataframe(
        data=[
            [1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
            [1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
            [1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
            [1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
            [1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
            [1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
            [1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
            [1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
            [1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
            [1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
            [1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
        ],
        schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION',    'BIKEID'],
    )

    tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')

    # Expected values
    n_rows_expected = 12
    bike_facts_expected = session.create_dataframe(
        data=[
            [30958, 1, 551.0, 40.0],
            [19278, 1, 242.0, 35.0],
            [18461, 1, 768.0, 31.0],
            [15533, 1, 690.0, 43.0],
            [32449, 1, 490.0, 32.0],
            [29411, 1, 457.0, 64.0],
            [28015, 1, 279.0, 52.0],
            [15148, 1, 546.0, 59.0],
            [16967, 1, 358.0, 40.0],
            [20644, 1, 848.0, 38.0],
            [16365, 1, 295.0, 39.0]
        ],
        schema=StructType([
            StructField("BIKEID", IntegerType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", FloatType()),
            StructField("AVG_RIDER_AGE", FloatType())
        ])
    ).collect()

    month_facts_expected = session.create_dataframe(
        data=[['Mar', 11, 502.18182, 43.00000]],
        schema=StructType([
            StructField("MONTH", StringType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", DecimalType()),
            StructField("AVG_RIDER_AGE", DecimalType())
        ])
    ).collect()

    # Call sproc, get actual values
    n_rows_actual = create_fact_tables(session, 'TRIPS_TEST')
    bike_facts_actual = session.table([DB, SCHEMA, 'bike_facts']).collect()
    month_facts_actual = session.table([DB, SCHEMA, 'month_facts']).collect()

    # Comparisons
    assert n_rows_expected == n_rows_actual
    assert bike_facts_expected == bike_facts_actual
    assert month_facts_expected ==  month_facts_actual
Copy

Para executar o caso de teste, execute pytest no terminal.

pytest test/test_sproc.py
Copy

Para executar todos os testes do projeto, execute pytest sem outras opções.

pytest
Copy

Configuração de testes locais

Neste ponto você tem um conjunto de testes PyTest para seus procedimentos armazenados e transformadores de DataFrame. Em cada caso de teste, o acessório Session é usado para conectar-se à sua conta Snowflake, enviar o SQL da Snowpark Python API e recuperar a resposta.

Alternativamente, você pode usar a estrutura de teste local para executar as transformações localmente sem uma conexão com o Snowflake. Em grandes conjuntos de testes, isso pode resultar em uma execução de testes significativamente mais rápida. Esta seção mostra como atualizar o conjunto de testes para usar a funcionalidade da estrutura de teste local.

  1. Comece atualizando o acessório do PyTest Session. Adicionaremos uma opção de linha de comando ao PyTest para alternar entre os modos de teste local e ao vivo.

    # test/conftest.py
    
    import pytest
    from project.utils import get_env_var_config
    from snowflake.snowpark.session import Session
    
    def pytest_addoption(parser):
        parser.addoption("--snowflake-session", action="store", default="live")
    
    @pytest.fixture(scope='module')
    def session(request) -> Session:
        if request.config.getoption('--snowflake-session') == 'local':
            return Session.builder.configs({'local_testing': True}).create()
        else:
            return Session.builder.configs(get_env_var_config()).create()
    
    Copy
  2. Devemos primeiro corrigir esse método porque nem todas as funções internas são suportadas pela estrutura de teste local, por exemplo, a função monthname() usada no transformador calc_month_facts(). Crie um arquivo chamado patches.py no diretório de testes. Neste arquivo, cole o seguinte código.

    from snowflake.snowpark.mock.functions import patch
    from snowflake.snowpark.functions import monthname
    from snowflake.snowpark.mock.snowflake_data_type import ColumnEmulator, ColumnType
    from snowflake.snowpark.types import StringType
    import datetime
    import calendar
    
    @patch(monthname)
    def patch_monthname(column: ColumnEmulator) -> ColumnEmulator:
        ret_column = ColumnEmulator(data=[
            calendar.month_abbr[datetime.datetime.strptime(row, '%Y-%m-%d %H:%M:%S.%f %z').month]
            for row in column])
        ret_column.sf_type = ColumnType(StringType(), True)
        return ret_column
    
    Copy

    O patch acima aceita um único parâmetro, column, que é um objeto semelhante a pandas.Series contendo as linhas de dados dentro da coluna. Em seguida, usamos uma combinação de métodos dos módulos Python datetime e calendar para emular a funcionalidade da coluna monthname() integrada. Por fim, definimos o tipo de retorno como String, pois o método integrado retorna cadeias de caracteres correspondentes aos meses (“Jan”, “Fev”, “Mar” etc.).

  3. Em seguida, importe esse método para os testes do procedimento armazenado e transformador do DataFrame.

    # test/test_transformers.py
    
    # No changes to the other unit test methods
    
    def test_calc_month_facts(request, session):
        # Add conditional to include the patch if local testing is being used
        if request.config.getoption('--snowflake-session') == 'local':
            from patches import patch_monthname
    
        # No other changes
    
    Copy
  4. Execute novamente pytest com o sinalizador local.

    pytest test/test_transformers.py --snowflake-session local
    
    Copy
  5. Agora aplique o mesmo patch ao teste do procedimento armazenado.

    #test/test_sproc.py
    
    def test_create_fact_tables(request, session):
        # Add conditional to include the patch if local testing is being used
        if request.config.getoption('--snowflake-session') == 'local':
            from patches import patch_monthname
    
        # No other changes required
    
    Copy
  6. Execute novamente o pytest com o sinalizador local.

    pytest test/test_sproc.py --snowflake-session local
    
    Copy
  7. Para finalizar, vamos comparar o tempo necessário para executar o conjunto de testes completo localmente com uma conexão ativa. Usaremos o comando time para medir o tempo necessário para ambos os comandos. Vamos começar com a conexão ativa.

    time pytest
    
    Copy

    Nesse caso, o conjunto de testes levou 7.89 segundos para ser executado. (A hora exata pode variar dependendo do computador, da conexão de rede e de outros fatores.)

    =================================== test session starts ==========================
    platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0
    rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial
    configfile: pytest.ini
    collected 4 items
    
    test/test_sproc.py .                                                             [ 25%]
    test/test_transformers.py ...                                                    [100%]
    
    =================================== 4 passed in 6.86s =================================
    pytest  1.63s user 1.86s system 44% cpu 7.893 total
    

    Agora vamos tentar com a estrutura de teste local:

    time pytest --snowflake-session local
    
    Copy

    Com a estrutura de testes local e o conjunto de testes, a execução levou apenas 1 segundo!

    ================================== test session starts ================================
    platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0
    rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial
    configfile: pytest.ini
    collected 4 items
    
    test/test_sproc.py .                                                             [ 25%]
    test/test_transformers.py ...                                                    [100%]
    
    =================================== 4 passed in 0.10s ==================================
    pytest --snowflake-session local  1.37s user 1.70s system 281% cpu 1.093 total
    

Saiba mais

Você terminou! Muito bem.

Neste tutorial, você teve uma visão completa de como testar seu código Python Snowpark. Ao longo do caminho, você: