Estrutura de testes local¶
Este tópico explica como testar seu código localmente ao trabalhar com a biblioteca Python do Snowpark.
Neste tópico:
A estrutura de teste local do Snowpark Python é um emulador que permite que você crie e opere em DataFrames do Snowpark Python localmente sem se conectar a uma conta Snowflake. Você pode usar a estrutura de teste local para testar suas operações de DataFrame, em sua máquina de desenvolvimento ou em um pipeline de CI (integração contínua), antes de implantar alterações de código em sua conta. A API é a mesma, então é possível executar seus testes localmente ou em uma conta Snowflake sem fazer alterações no código.
Pré-requisitos¶
Para usar a estrutura de teste local:
Você deve usar a versão 1.18.0 ou posterior da biblioteca Snowpark Python com a dependência opcional
localtest
.As versões suportadas do Python são:
3.9
3,10
3,11
3,12
Instalação da biblioteca Snowpark Python¶
Para instalar a biblioteca com a dependência opcional, execute o seguinte comando:
pip install "snowflake-snowpark-python[localtest]"
Criação de uma sessão e habilitação de teste local¶
Crie uma
Session
Snowpark e defina a configuração de teste local comoTrue
:from snowflake.snowpark import Session session = Session.builder.config('local_testing', True).create()
Use a sessão para criar e operar em DataFrames:
df = session.create_dataframe([[1,2],[3,4]],['a','b']) df.with_column('c', df['a']+df['b']).show()
Carregamento de dados¶
É possível criar DataFrames do Snowpark a partir de primitivos Python, arquivos e DataFrames pandas. Isto é útil para especificar a entrada e a saída esperada dos casos de teste. Com esse método, os dados ficam no controle de origem, o que facilita manter os dados de teste sincronizados com os casos de teste.
Carregamento de dados CSV:¶
Para carregar arquivos CSV em um DataFrame Snowpark, primeiro chame
Session.file.put()
para carregar o arquivo no estágio na memória e depois useSession.read()
para ler o conteúdo.
Exemplo
Suponha que haja um arquivo, data.csv
, com o seguinte conteúdo:
col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
É possível usar o seguinte código para carregar data.csv
em um DataFrame Snowpark. É necessário colocar o arquivo em um estágio primeiro; caso contrário, você receberá um erro “arquivo não pode ser encontrado”.
from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType
# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
[
StructField("col1", IntegerType()),
StructField("col2", StringType()),
StructField("col3", BooleanType()),
StructField("col4", DoubleType()),
]
)
# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
Saída esperada:
-------------------------------------
|"COL1" |"COL2" |"COL3" |"COL4" |
-------------------------------------
|1 |a |True |1.23 |
|2 |b |False |4.56 |
-------------------------------------
Carregamento de dados pandas¶
Para criar um DataFrame Snowpark Python a partir de um DataFrame pandas, chame o método
create_dataframe
e passe os dados como um DataFrame pandas.
Exemplo
import pandas as pd
pandas_df = pd.DataFrame(
data={
"col1": pd.Series(["value1", "value2"]),
"col2": pd.Series([1.23, 4.56]),
"col3": pd.Series([123, 456]),
"col4": pd.Series([True, False]),
}
)
dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
Saída esperada:
-------------------------------------
|"col1" |"col2" |"col3" |"col4" |
-------------------------------------
|value1 |1.23 |123 |True |
|value2 |4.56 |456 |False |
-------------------------------------
Para converter um DataFrame Snowpark Python em um DataFrame pandas, chame o método
to_pandas
no DataFrame.
Exemplo
from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType
dataframe = session.create_dataframe(
data=[
["value1", 1.23, 123, True],
["value2", 4.56, 456, False],
],
schema=StructType([
StructField("col1", StringType()),
StructField("col2", DoubleType()),
StructField("col3", LongType()),
StructField("col4", BooleanType()),
])
)
pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
Saída esperada:
COL1 COL2 COL3 COL4
0 value1 1.23 123 True
1 value2 4.56 456 False
Criação de um acessório do PyTest para uma sessão¶
Os acessórios do PyTest são funções executadas antes de um teste (ou módulo de testes), normalmente para fornecer dados ou conexões para testes. Neste procedimento, você cria um acessório que retorna um objeto Session
Snowpark.
Se você ainda não tiver um diretório
test
, crie um.No diretório
test
, crie um arquivo chamadoconftest.py
com o seguinte conteúdo, ondeconnection_parameters
é um dicionário com as credenciais de sua conta Snowflake:# test/conftest.py import pytest from snowflake.snowpark.session import Session def pytest_addoption(parser): parser.addoption("--snowflake-session", action="store", default="live") @pytest.fixture(scope='module') def session(request) -> Session: if request.config.getoption('--snowflake-session') == 'local': return Session.builder.config('local_testing', True).create() else: return Session.builder.configs(CONNECTION_PARAMETERS).create()
Para obter mais informações sobre o formato do dicionário, consulte Como criar uma sessão.
A chamada para pytest_addoption
adiciona uma opção de linha de comando chamada snowflake-session
ao comando pytest
. O acessório de Session
verifica esta opção de linha de comando e cria um local ou Session
ativa dependendo de seu valor. Isso permite que você alterne facilmente entre os modos local e ativo para testes, conforme mostrado nos seguintes exemplos de linha de comando:
# Using local mode:
pytest --snowflake-session local
# Using live mode
pytest
Operações SQL¶
Session.sql(...)
não é compatível com a estrutura de teste local. Use as APIs de DataFrame do Snowpark sempre que possível e, nos casos em que for necessário usar Session.sql(...)
, é possível simular o valor de retorno tabular usando o unittest.mock.patch
Python para corrigir a resposta esperada de uma determinada chamada Session.sql()
.
No exemplo a seguir, mock_sql()
mapeia o texto da consulta SQL para a resposta de DataFrame desejada. A instrução condicional verifica se a sessão atual está usando testes locais e, em caso afirmativo, aplica o patch ao método Session.sql()
.
from unittest import mock
from functools import partial
def test_something(pytestconfig, session):
def mock_sql(session, sql_string): # patch for SQL operations
if sql_string == "select 1,2,3":
return session.create_dataframe([[1,2,3]])
else:
raise RuntimeError(f"Unexpected query execution: {sql_string}")
if pytestconfig.getoption('--snowflake-session') == 'local':
with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
else:
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Quando o teste local está ativado, todas as tabelas criadas por DataFrame.save_as_table()
são salvas como tabelas temporárias na memória e podem ser recuperadas usando Session.table()
. Você pode usar as operações de DataFrame suportadas na tabela normalmente.
Patch de funções internas¶
Algumas das funções internas em snowflake.snowpark.functions
não são compatíveis na estrutura de testes local. Se você usar uma função que não seja compatível, é possível usar o decorador @patch
de snowflake.snowpark.mock
para criar um patch.
Para que a função corrigida seja definida e implementada, a assinatura (lista de parâmetro) deve estar alinhada com os parâmetros da função interna. A estrutura de teste local passa parâmetros para a função com patch usando as seguintes regras:
Para parâmetros do tipo
ColumnOrName
na assinatura de funções internas,ColumnEmulator
é passado como parâmetro das funções com patch.ColumnEmulator
é semelhante a um objetopandas.Series
que contém os dados da coluna.Para parâmetros do tipo
LiteralType
na assinatura de funções internas, o valor literal é passado como parâmetro das funções com patch.Caso contrário, o valor bruto é passado como parâmetro das funções com patch.
Quanto ao tipo de retorno das funções com patch, o retorno de uma instância de ColumnEmulator
é esperado em correspondência com o tipo de retorno de Column
das funções internas.
Por exemplo, a função interna to_timestamp()
poderia receber um patch assim:
import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType
@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
ret_column.sf_type = ColumnType(TimestampType(), True)
return ret_column
Como ignorar casos de teste¶
Se o seu conjunto de testes PyTest contiver um caso de teste que não seja bem compatível pelos testes locais, é possível ignorar esses casos usando o decorador mark.skipif
de PyTest. O exemplo a seguir pressupõe que você tenha configurado sua sessão e parâmetros conforme descrito anteriormente. A condição verifica se local_testing_mode
está definido como local
; se estiver, o caso de teste é ignorado com uma mensagem explicativa.
import pytest
@pytest.mark.skipif(
condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
...
Registro de UDFs e procedimentos armazenados¶
Você pode criar e chamar funções definidas pelo usuário (UDFs) e procedimentos armazenados na estrutura de testes local. Para criar os objetos, é possível usar as seguintes opções de sintaxe:
Sintaxe |
UDF |
Procedimento armazenado |
---|---|---|
Decoradores |
|
|
Métodos de registro |
|
|
Métodos de registro a partir de arquivo |
|
|
Exemplo
O exemplo de código a seguir cria uma UDF e um procedimento armazenado usando os decoradores e, em seguida, chama ambos pelo nome:
from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import col, DataFrame
from snowflake.snowpark.functions import udf, sproc, call_udf
from snowflake.snowpark.types import IntegerType, StringType
# Create local session
session = Session.builder.config('local_testing', True).create()
# Create local table
table = 'example'
session.create_dataframe([[1,2],[3,4]],['a','b']).write.save_as_table(table)
# Register a UDF, which is called from the stored procedure
@udf(name='example_udf', return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
def example_udf(a, b):
return a + b
# Register stored procedure
@sproc(name='example_proc', return_type=IntegerType(), input_types=[StringType()])
def example_proc(session, table_name):
return session.table(table_name)\
.with_column('c', call_udf('example_udf', col('a'), col('b')))\
.count()
# Call the stored procedure by name
output = session.call('example_proc', table)
print(output)
Limitações¶
A lista a seguir contém as limitações conhecidas e lacunas de comportamento na estrutura de testes local. A Snowflake atualmente não tem planos de abordar esses itens.
Cadeias de caracteres SQL brutas e operações que exigem análise de cadeias de cadeias de caracteres SQL, como
session.sql
eDataFrame.filter("col1 > 12")
, não são compatíveis.Operações assíncronas não são suportadas.
Objetos de banco de dados, como tabelas, procedimentos armazenados e UDFs não são persistidos além do nível da sessão e todas as operações são executadas na memória. Por exemplo, procedimentos armazenados permanentes registrados em uma sessão simulada não são visíveis para outras sessões simuladas.
Recursos relacionados a agrupamento de cadeias de caracteres, como
Column.collate
, não são suportados.Tipos de dados
Variant
,Array
eObject
só são suportados com codificação e decodificação JSON padrão. Expressões como [1,2,,3,] são consideradas como JSON válido no Snowflake, mas não em testes locais, onde as funcionalidades JSON internas do Python são usadas. Você pode especificar as variáveis de nível de módulosnowflake.snowpark.mock.CUSTOM_JSON_ENCODER
esnowflake.snowpark.mock.CUSTOM_JSON_DECODER
para substituir as configurações padrão.Apenas um subconjunto de funções do Snowflake (incluindo funções de janela) é implementado. Para saber como injetar sua própria definição de função, consulte Patch de funções internas.
Atualmente, não há suporte para corrigir funções relacionadas à classificação.
Modelos de formato SQL não são suportados. Por exemplo, a implementação simulada de
to_decimal
não manipula o parâmetro opcionalformat
.A biblioteca Snowpark Python não tem uma API Python integrada para criar ou remover estágios, então a estrutura de teste local assume que cada estágio de entrada já tenha sido criado.
A implementação atual de UDFs e os procedimentos armazenados não realizam nenhuma validação de pacote. Todos os pacotes referenciados no seu código precisam ser instalados antes que o programa seja executado.
Tags de consulta não são suportadas.
O histórico de consultas não é suportado.
A linhagem não é suportada.
Quando uma UDF ou procedimento armazenado é registrado, parâmetros opcionais como
parallel
,execute_as
,statement_params
,source_code_display
,external_access_integrations
,secrets
ecomment
são ignorados.Para
Table.sample
, a amostragem de SYSTEM ou BLOCK é a mesma que a amostragem ROW.O Snowflake não oferece suporte oficial à execução da estrutura de testes local dentro de procedimentos armazenados. Sessões do modo de teste local dentro de procedimentos armazenados podem encontrar ou acionar erros inesperados.
Recursos sem suporte¶
A seguir está uma lista de recursos que atualmente não estão implementados na estrutura de testes local. A Snowflake está trabalhando ativamente para resolver esses itens.
Em geral, qualquer referência a essas funcionalidades deve gerar um NotImplementedError
:
UDTFs (funções de tabela definidas pelo usuário)
UDAFs (funções agregadas definidas pelo usuário)
UDFs e UDTFs vetorizadas
Funções de tabela integradas
Procedimentos armazenados de tabela
Tipos de dados
Geometry
,Geography
eVector
.Expressões de intervalo
Leitura de formatos de arquivo diferentes de JSON e CSV
Para um formato de arquivo suportado, nem todas as opções de leitura são suportadas. Por exemplo,
infer_schema
não é suportado para o formato CSV.
Para quaisquer recursos não listados aqui como não suportados ou como uma limitação conhecida, verifique a lista mais recente de solicitações de recursos para testes locais, ou crie uma solicitação de recurso no repositório snowpark-python
GitHub.
Problemas conhecidos¶
A seguir está uma lista de problemas conhecidos ou lacunas de comportamento que existem na estrutura de testes local. A Snowflake está planejando ativamente resolver esses problemas.
O uso de funções de janela dentro de
DataFrame.groupby
ou outras operações de agregação não são suportadas.# Selecting window function expressions is supported df.select("key", "value", sum_("value").over(), avg("value").over()) # Aggregating window function expressions is NOT supported df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
Selecionar colunas com o mesmo nome retornará apenas uma coluna. Como solução alternativa, use
Column.alias
para renomear as colunas para que tenham nomes distintos.df.select(lit(1), lit(1)).show() # col("a"), col("a") #--------- #|"'1'" | #--------- #|1 | #|... | #--------- # Workaround: Column.alias DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2")) # "col1_1", "col1_2"
Para
Table.merge
eTable.update
, os parâmetros da sessãoERROR_ON_NONDETERMINISTIC_UPDATE
eERROR_ON_NONDETERMINISTIC_MERGE
devem ser definidos comoFalse
. Isso significa que, para junções múltiplas, uma das linhas correspondentes é atualizada.Nomes de estágio totalmente qualificados em operações de arquivo GET e PUT não são suportados. Nomes de banco de dados e esquemas são tratados como parte do nome do estágio.
A implementação de
mock_to_char
só suporta carimbos de data/hora em um formato que tenha separadores entre diferentes partes de tempo.DataFrame.pivot
tem um parâmetro chamadovalues
que permite que um pivô seja limitado a valores específicos. Somente valores definidos estatisticamente podem ser usados neste momento. Valores fornecidos usando uma subconsulta gerarão um erro.Não há suporte para a criação de um
DataFrame
a partir de umDataFrame
pandas que contenha um carimbo de data/hora com informações de fuso horário.
Para quaisquer problemas não mencionados nesta lista, verifique a lista de problemas em aberto mais recente ou crie um relatório de bug no repositório snowpark-python
GitHub.