Tutorial: teste do Python Snowpark¶
Introdução¶
Este tutorial apresenta os princípios básicos para testar seu código Snowpark Python.
O que você aprenderá¶
Neste tutorial, você aprenderá como:
Teste seu código do Snowpark enquanto estiver conectado ao Snowflake.
Você pode usar utilitários de teste padrão, como PyTest, para testar suas UDFs do Snowpark Python, transformações de DataFrame e procedimentos armazenados.
Teste seus DataFrames Snowpark Python localmente sem se conectar a uma conta Snowflake usando a estrutura de teste local.
Você pode usar a estrutura de teste local para testar localmente, em sua máquina de desenvolvimento, antes de implantar alterações no código.
Pré-requisitos¶
Para usar a estrutura de teste local:
Você deve usar a versão 1.11.1 ou superior da biblioteca Snowpark Python.
As versões suportadas do Python são:
3.8
3.9
3,10
3,11
Configuração do projeto¶
Nesta seção, você clonará o repositório do projeto e configurará o ambiente necessário para o tutorial.
Clone o repositório do projeto.
git clone https://github.com/Snowflake-Labs/sftutorial-snowpark-testing
Caso você não tenha o git instalado, acesse a página do repositório e baixe o conteúdo clicando em Code » Download Contents.
Defina variáveis de ambiente com as credenciais da sua conta. A Snowpark API as usará para autenticar sua conta Snowflake.
# Linux/MacOS export SNOWSQL_ACCOUNT=<replace with your account identifer> export SNOWSQL_USER=<replace with your username> export SNOWSQL_ROLE=<replace with your role> export SNOWSQL_PWD=<replace with your password> export SNOWSQL_DATABASE=<replace with your database> export SNOWSQL_SCHEMA=<replace with your schema> export SNOWSQL_WAREHOUSE=<replace with your warehouse>
# Windows/PowerShell $env:SNOWSQL_ACCOUNT = "<replace with your account identifer>" $env:SNOWSQL_USER = "<replace with your username>" $env:SNOWSQL_ROLE = "<replace with your role>" $env:SNOWSQL_PWD = "<replace with your password>" $env:SNOWSQL_DATABASE = "<replace with your database>" $env:SNOWSQL_SCHEMA = "<replace with your schema>" $env:SNOWSQL_WAREHOUSE = "<replace with your warehouse>"
Opcional: você pode definir essa variável de ambiente permanentemente editando seu perfil bash (no Linux/MacOS) ou usando o menu System Properties (no Windows).
Crie e ative um ambiente conda usando Anaconda:
conda env create --file environment.yml conda activate snowpark-testing
Crie a tabela de amostra em sua conta executando
setup/create_table.py
. Este script Python criará um banco de dados chamado CITIBIKE, um esquema chamado PUBLIC e uma pequena tabela chamada TRIPS.python setup/create_table.py
Agora você está pronto para passar para a próxima seção. Nesta seção você:
Clonou o repositório do tutorial.
Criou variáveis de ambiente com as informações da sua conta.
Criou um ambiente conda para o projeto.
Conectou-se ao Snowflake usando a Snowpark API e criou um banco de dados, esquema e tabela de amostra.
Como experimentar o procedimento armazenado¶
O projeto de amostra inclui um manipulador de procedimento armazenado (sproc.py
) e três métodos transformadores de DataFrames (transformers.py
). O manipulador de procedimento armazenado usa os transformadores de UDF e DataFrame para ler a tabela de origem, CITIBIKE.PUBLIC.TRIPS
, e cria duas tabelas de fatos: MONTH_FACTS
e BIKE_FACTS
.
Você pode executar o procedimento armazenado na linha de comando executando este comando.
python project/sproc.py
Agora que você se familiarizou com o projeto, na próxima seção você configurará o diretório de teste e criará um acessório de PyTest para a sessão do Snowflake.
Crie um acessório do PyTest para a sessão Snowflake¶
Acessórios do PyTest são funções executadas antes de um teste (ou módulo de testes), normalmente para fornecer dados ou conexões para testes. Para este projeto, você criará um acessório do PyTest que retorna um objeto Snowpark Session
. Seus casos de teste usarão esta sessão para se conectar ao Snowflake.
Crie um diretório
test
no diretório raiz do projeto.mkdir test
No diretório
test
, crie um novo arquivo Python chamadoconftest.py
. Dentro deconftest.py
, crie um acessório PyTest para o objetoSession
:import pytest from project.utils import get_env_var_config from snowflake.snowpark.session import Session @pytest.fixture def session() -> Session: return Session.builder.configs(get_env_var_config()).create()
Como adicionar testes de unidade para transformadores de DataFrame¶
No diretório
test
, crie um novo arquivo Python chamadotest_transformers.py
.No arquivo
test_transformers.py
, importe os métodos do transformador.# test/test_transformers.py from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
A seguir, crie testes de unidade para esses transformadores. A convenção típica é criar um método para cada teste com o nome
test_<nome do método>
. No nosso caso, os testes serão:# test/test_transformers.py from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts def test_add_rider_age(session): ... def test_calc_bike_facts(session): ... def test_calc_month_facts(session): ...
O parâmetro
session
em cada caso de teste refere-se ao acessório do PyTest que você criou na seção anterior.Agora implemente os casos de teste para cada transformador. Use o seguinte padrão.
Crie um DataFrame de entrada.
Crie o DataFrame de saída esperado.
Passe o DataFrame de entrada da etapa 1 para o método do transformador.
Compare a saída da etapa 3 com a saída esperada da etapa 2.
# test/test_transformers.py from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts from snowflake.snowpark.types import StructType, StructField, IntegerType, FloatType def test_add_rider_age(session: Session): input = session.create_dataframe( [ [1980], [1995], [2000] ], schema=StructType([StructField("BIRTH_YEAR", IntegerType())]) ) expected = session.create_dataframe( [ [1980, 43], [1995, 28], [2000, 23] ], schema=StructType([StructField("BIRTH_YEAR", IntegerType()), StructField("RIDER_AGE", IntegerType())]) ) actual = add_rider_age(input) assert expected.collect() == actual.collect() def test_calc_bike_facts(session: Session): input = session.create_dataframe([ [1, 10, 20], [1, 5, 30], [2, 20, 50], [2, 10, 60] ], schema=StructType([ StructField("BIKEID", IntegerType()), StructField("TRIPDURATION", IntegerType()), StructField("RIDER_AGE", IntegerType()) ]) ) expected = session.create_dataframe([ [1, 2, 7.5, 25.0], [2, 2, 15.0, 55.0], ], schema=StructType([ StructField("BIKEID", IntegerType()), StructField("COUNT", IntegerType()), StructField("AVG_TRIPDURATION", FloatType()), StructField("AVG_RIDER_AGE", FloatType()) ]) ) actual = calc_bike_facts(input) assert expected.collect() == actual.collect() def test_calc_month_facts(session: Session): from patches import patch_to_timestamp input = session.create_dataframe( data=[ ['2018-03-01 09:47:00.000 +0000', 1, 10, 15], ['2018-03-01 09:47:14.000 +0000', 2, 20, 12], ['2018-04-01 09:47:04.000 +0000', 3, 6, 30] ], schema=['STARTTIME', 'BIKE_ID', 'TRIPDURATION', 'RIDER_AGE'] ) expected = session.create_dataframe( data=[ ['Mar', 2, 15, 13.5], ['Apr', 1, 6, 30.0] ], schema=['MONTH', 'COUNT', 'AVG_TRIPDURATION', 'AVG_RIDER_AGE'] ) actual = calc_month_facts(input) assert expected.collect() == actual.collect()
Agora você pode executar PyTest para executar todos os testes de unidade.
pytest test/test_transformers.py
Como adicionar testes de integração para procedimentos armazenados¶
Agora que temos testes de unidade para os métodos do transformador do DataFrame, vamos adicionar um teste de integração para o procedimento armazenado. O caso de teste seguirá este padrão:
Crie uma tabela representando os dados de entrada do procedimento armazenado.
Crie dois DataFrames com o conteúdo esperado das duas tabelas de saída do procedimento armazenado.
Chame o procedimento armazenado.
Compare as tabelas de saída reais com DataFrames da etapa 2.
Limpeza: exclua a tabela de entrada da etapa 1 e as tabelas de saída da etapa 3.
Crie um arquivo Python chamado test_sproc.py
no diretório test
.
Importe o manipulador do procedimento armazenado do diretório do projeto e crie um caso de teste.
# test/test_sproc.py
from project.sproc import create_fact_tables
def test_create_fact_tables(session):
...
Implemente o caso de teste, começando pela criação da tabela de entrada.
# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *
def test_create_fact_tables(session):
DB = 'CITIBIKE'
SCHEMA = 'TEST'
# Set up source table
tbl = session.create_dataframe(
data=[
[1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
[1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
[1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
[1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
[1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
[1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
[1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
[1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
[1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
[1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
[1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
],
schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION', 'BIKEID'],
)
tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
Em seguida, crie DataFrames para as tabelas de saída esperadas.
# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *
def test_create_fact_tables(session):
DB = 'CITIBIKE'
SCHEMA = 'TEST'
# Set up source table
tbl = session.create_dataframe(
data=[
[1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
[1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
[1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
[1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
[1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
[1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
[1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
[1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
[1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
[1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
[1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
],
schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION', 'BIKEID'],
)
tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
# Expected values
n_rows_expected = 12
bike_facts_expected = session.create_dataframe(
data=[
[30958, 1, 551.0, 40.0],
[19278, 1, 242.0, 35.0],
[18461, 1, 768.0, 31.0],
[15533, 1, 690.0, 43.0],
[32449, 1, 490.0, 32.0],
[29411, 1, 457.0, 64.0],
[28015, 1, 279.0, 52.0],
[15148, 1, 546.0, 59.0],
[16967, 1, 358.0, 40.0],
[20644, 1, 848.0, 38.0],
[16365, 1, 295.0, 39.0]
],
schema=StructType([
StructField("BIKEID", IntegerType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", FloatType()),
StructField("AVG_RIDER_AGE", FloatType())
])
).collect()
month_facts_expected = session.create_dataframe(
data=[['Mar', 11, 502.18182, 43.00000]],
schema=StructType([
StructField("MONTH", StringType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", DecimalType()),
StructField("AVG_RIDER_AGE", DecimalType())
])
).collect()
E, finalmente, chame o procedimento armazenado e leia as tabelas de saída. Compare as tabelas reais com o conteúdo do DataFrame.
# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *
def test_create_fact_tables(session):
DB = 'CITIBIKE'
SCHEMA = 'TEST'
# Set up source table
tbl = session.create_dataframe(
data=[
[1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
[1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
[1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
[1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
[1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
[1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
[1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
[1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
[1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
[1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
[1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
],
schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION', 'BIKEID'],
)
tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
# Expected values
n_rows_expected = 12
bike_facts_expected = session.create_dataframe(
data=[
[30958, 1, 551.0, 40.0],
[19278, 1, 242.0, 35.0],
[18461, 1, 768.0, 31.0],
[15533, 1, 690.0, 43.0],
[32449, 1, 490.0, 32.0],
[29411, 1, 457.0, 64.0],
[28015, 1, 279.0, 52.0],
[15148, 1, 546.0, 59.0],
[16967, 1, 358.0, 40.0],
[20644, 1, 848.0, 38.0],
[16365, 1, 295.0, 39.0]
],
schema=StructType([
StructField("BIKEID", IntegerType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", FloatType()),
StructField("AVG_RIDER_AGE", FloatType())
])
).collect()
month_facts_expected = session.create_dataframe(
data=[['Mar', 11, 502.18182, 43.00000]],
schema=StructType([
StructField("MONTH", StringType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", DecimalType()),
StructField("AVG_RIDER_AGE", DecimalType())
])
).collect()
# Call sproc, get actual values
n_rows_actual = create_fact_tables(session, 'TRIPS_TEST')
bike_facts_actual = session.table([DB, SCHEMA, 'bike_facts']).collect()
month_facts_actual = session.table([DB, SCHEMA, 'month_facts']).collect()
# Comparisons
assert n_rows_expected == n_rows_actual
assert bike_facts_expected == bike_facts_actual
assert month_facts_expected == month_facts_actual
Para executar o caso de teste, execute pytest
no terminal.
pytest test/test_sproc.py
Para executar todos os testes do projeto, execute pytest
sem outras opções.
pytest
Configuração de testes locais¶
Neste ponto você tem um conjunto de testes PyTest para seus procedimentos armazenados e transformadores de DataFrame. Em cada caso de teste, o acessório Session
é usado para conectar-se à sua conta Snowflake, enviar o SQL da Snowpark Python API e recuperar a resposta.
Alternativamente, você pode usar a estrutura de teste local para executar as transformações localmente sem uma conexão com o Snowflake. Em grandes conjuntos de testes, isso pode resultar em uma execução de testes significativamente mais rápida. Esta seção mostra como atualizar o conjunto de testes para usar a funcionalidade da estrutura de teste local.
Comece atualizando o acessório do PyTest
Session
. Adicionaremos uma opção de linha de comando ao PyTest para alternar entre os modos de teste local e ao vivo.# test/conftest.py import pytest from project.utils import get_env_var_config from snowflake.snowpark.session import Session def pytest_addoption(parser): parser.addoption("--snowflake-session", action="store", default="live") @pytest.fixture(scope='module') def session(request) -> Session: if request.config.getoption('--snowflake-session') == 'local': return Session.builder.configs({'local_testing': True}).create() else: return Session.builder.configs(get_env_var_config()).create()
Devemos primeiro corrigir esse método porque nem todas as funções internas são suportadas pela estrutura de teste local, por exemplo, a função
monthname()
usada no transformadorcalc_month_facts()
. Crie um arquivo chamadopatches.py
no diretório de testes. Neste arquivo, cole o seguinte código.from snowflake.snowpark.mock.functions import patch from snowflake.snowpark.functions import monthname from snowflake.snowpark.mock.snowflake_data_type import ColumnEmulator, ColumnType from snowflake.snowpark.types import StringType import datetime import calendar @patch(monthname) def patch_monthname(column: ColumnEmulator) -> ColumnEmulator: ret_column = ColumnEmulator(data=[ calendar.month_abbr[datetime.datetime.strptime(row, '%Y-%m-%d %H:%M:%S.%f %z').month] for row in column]) ret_column.sf_type = ColumnType(StringType(), True) return ret_column
O patch acima aceita um único parâmetro,
column
, que é um objeto semelhante apandas.Series
contendo as linhas de dados dentro da coluna. Em seguida, usamos uma combinação de métodos dos módulos Pythondatetime
ecalendar
para emular a funcionalidade da colunamonthname()
integrada. Por fim, definimos o tipo de retorno comoString
, pois o método integrado retorna cadeias de caracteres correspondentes aos meses (“Jan”, “Fev”, “Mar” etc.).Em seguida, importe esse método para os testes do procedimento armazenado e transformador do DataFrame.
# test/test_transformers.py # No changes to the other unit test methods def test_calc_month_facts(request, session): # Add conditional to include the patch if local testing is being used if request.config.getoption('--snowflake-session') == 'local': from patches import patch_monthname # No other changes
Execute novamente
pytest
com o sinalizador local.pytest test/test_transformers.py --snowflake-session local
Agora aplique o mesmo patch ao teste do procedimento armazenado.
#test/test_sproc.py def test_create_fact_tables(request, session): # Add conditional to include the patch if local testing is being used if request.config.getoption('--snowflake-session') == 'local': from patches import patch_monthname # No other changes required
Execute novamente o pytest com o sinalizador local.
pytest test/test_sproc.py --snowflake-session local
Para finalizar, vamos comparar o tempo necessário para executar o conjunto de testes completo localmente com uma conexão ativa. Usaremos o comando
time
para medir o tempo necessário para ambos os comandos. Vamos começar com a conexão ativa.time pytest
Nesse caso, o conjunto de testes levou 7.89 segundos para ser executado. (A hora exata pode variar dependendo do computador, da conexão de rede e de outros fatores.)
=================================== test session starts ========================== platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0 rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial configfile: pytest.ini collected 4 items test/test_sproc.py . [ 25%] test/test_transformers.py ... [100%] =================================== 4 passed in 6.86s ================================= pytest 1.63s user 1.86s system 44% cpu 7.893 total
Agora vamos tentar com a estrutura de teste local:
time pytest --snowflake-session local
Com a estrutura de testes local e o conjunto de testes, a execução levou apenas 1 segundo!
================================== test session starts ================================ platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0 rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial configfile: pytest.ini collected 4 items test/test_sproc.py . [ 25%] test/test_transformers.py ... [100%] =================================== 4 passed in 0.10s ================================== pytest --snowflake-session local 1.37s user 1.70s system 281% cpu 1.093 total
Saiba mais¶
Você terminou! Muito bem.
Neste tutorial, você teve uma visão completa de como testar seu código Python Snowpark. Ao longo do caminho, você:
Criou um acessório de PyTest e adicionou testes de unidade e testes de integração.
Para obter mais informações, consulte Como escrever testes para Snowpark Python.
Testes locais configurados
Para obter mais informações, consulte Estrutura de teste local.