Estrutura de testes local

Este tópico explica como testar seu código localmente ao trabalhar com a biblioteca Python do Snowpark.

Neste tópico:

A estrutura de teste local do Snowpark Python é um emulador que permite que você crie e opere em DataFrames do Snowpark Python localmente sem se conectar a uma conta Snowflake. Você pode usar a estrutura de teste local para testar suas operações de DataFrame, em sua máquina de desenvolvimento ou em um pipeline de CI (integração contínua), antes de implantar alterações de código em sua conta. A API é a mesma, então é possível executar seus testes localmente ou em uma conta Snowflake sem fazer alterações no código.

Pré-requisitos

Para usar a estrutura de teste local:

  • Você deve usar a versão 1.18.0 ou posterior da biblioteca Snowpark Python com a dependência opcional localtest.

  • As versões suportadas do Python são:

    • 3.9

    • 3,10

    • 3,11

    • 3,12

Instalação da biblioteca Snowpark Python

  • Para instalar a biblioteca com a dependência opcional, execute o seguinte comando:

    pip install "snowflake-snowpark-python[localtest]"
    
    Copy

Criação de uma sessão e habilitação de teste local

  1. Crie uma Session Snowpark e defina a configuração de teste local como True:

    from snowflake.snowpark import Session
    
    session = Session.builder.config('local_testing', True).create()
    
    Copy
  2. Use a sessão para criar e operar em DataFrames:

    df = session.create_dataframe([[1,2],[3,4]],['a','b'])
    df.with_column('c', df['a']+df['b']).show()
    
    Copy

Carregamento de dados

É possível criar DataFrames do Snowpark a partir de primitivos Python, arquivos e DataFrames pandas. Isto é útil para especificar a entrada e a saída esperada dos casos de teste. Com esse método, os dados ficam no controle de origem, o que facilita manter os dados de teste sincronizados com os casos de teste.

Carregamento de dados CSV:

  • Para carregar arquivos CSV em um DataFrame Snowpark, primeiro chame Session.file.put() para carregar o arquivo no estágio na memória e depois use Session.read() para ler o conteúdo.

Exemplo

Suponha que haja um arquivo, data.csv, com o seguinte conteúdo:

col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
Copy

É possível usar o seguinte código para carregar data.csv em um DataFrame Snowpark. É necessário colocar o arquivo em um estágio primeiro; caso contrário, você receberá um erro “arquivo não pode ser encontrado”.

from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType


# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
    [
        StructField("col1", IntegerType()),
        StructField("col2", StringType()),
        StructField("col3", BooleanType()),
        StructField("col4", DoubleType()),
    ]
)

# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
Copy

Saída esperada:

-------------------------------------
|"COL1"  |"COL2"  |"COL3"  |"COL4"  |
-------------------------------------
|1       |a       |True    |1.23    |
|2       |b       |False   |4.56    |
-------------------------------------

Carregamento de dados pandas

  • Para criar um DataFrame Snowpark Python a partir de um DataFrame pandas, chame o método create_dataframe e passe os dados como um DataFrame pandas.

Exemplo

import pandas as pd

pandas_df = pd.DataFrame(
    data={
        "col1": pd.Series(["value1", "value2"]),
        "col2": pd.Series([1.23, 4.56]),
        "col3": pd.Series([123, 456]),
        "col4": pd.Series([True, False]),
    }
)

dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
Copy

Saída esperada:

-------------------------------------
|"col1"  |"col2"  |"col3"  |"col4"  |
-------------------------------------
|value1  |1.23    |123     |True    |
|value2  |4.56    |456     |False   |
-------------------------------------
  • Para converter um DataFrame Snowpark Python em um DataFrame pandas, chame o método to_pandas no DataFrame.

Exemplo

from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType

dataframe = session.create_dataframe(
    data=[
        ["value1", 1.23, 123, True],
        ["value2", 4.56, 456, False],
    ],
    schema=StructType([
        StructField("col1", StringType()),
        StructField("col2", DoubleType()),
        StructField("col3", LongType()),
        StructField("col4", BooleanType()),
    ])
)

pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
Copy

Saída esperada:

    COL1  COL2  COL3   COL4
0  value1  1.23   123   True
1  value2  4.56   456  False

Criação de um acessório do PyTest para uma sessão

Os acessórios do PyTest são funções executadas antes de um teste (ou módulo de testes), normalmente para fornecer dados ou conexões para testes. Neste procedimento, você cria um acessório que retorna um objeto Session Snowpark.

  1. Se você ainda não tiver um diretório test, crie um.

  2. No diretório test, crie um arquivo chamado conftest.py com o seguinte conteúdo, onde connection_parameters é um dicionário com as credenciais de sua conta Snowflake:

    # test/conftest.py
    import pytest
    from snowflake.snowpark.session import Session
    
    def pytest_addoption(parser):
        parser.addoption("--snowflake-session", action="store", default="live")
    
    @pytest.fixture(scope='module')
    def session(request) -> Session:
        if request.config.getoption('--snowflake-session') == 'local':
            return Session.builder.config('local_testing', True).create()
        else:
            return Session.builder.configs(CONNECTION_PARAMETERS).create()
    
    Copy

Para obter mais informações sobre o formato do dicionário, consulte Como criar uma sessão.

A chamada para pytest_addoption adiciona uma opção de linha de comando chamada snowflake-session ao comando pytest. O acessório de Session verifica esta opção de linha de comando e cria um local ou Session ativa dependendo de seu valor. Isso permite que você alterne facilmente entre os modos local e ativo para testes, conforme mostrado nos seguintes exemplos de linha de comando:

# Using local mode:
pytest --snowflake-session local

# Using live mode
pytest
Copy

Operações SQL

Session.sql(...) não é compatível com a estrutura de teste local. Use as APIs de DataFrame do Snowpark sempre que possível e, nos casos em que for necessário usar Session.sql(...), é possível simular o valor de retorno tabular usando o unittest.mock.patch Python para corrigir a resposta esperada de uma determinada chamada Session.sql().

No exemplo a seguir, mock_sql() mapeia o texto da consulta SQL para a resposta de DataFrame desejada. A instrução condicional verifica se a sessão atual está usando testes locais e, em caso afirmativo, aplica o patch ao método Session.sql().

from unittest import mock
from functools import partial

def test_something(pytestconfig, session):

    def mock_sql(session, sql_string):  # patch for SQL operations
        if sql_string == "select 1,2,3":
            return session.create_dataframe([[1,2,3]])
        else:
            raise RuntimeError(f"Unexpected query execution: {sql_string}")

    if pytestconfig.getoption('--snowflake-session') == 'local':
        with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
            assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
    else:
        assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Copy

Quando o teste local está ativado, todas as tabelas criadas por DataFrame.save_as_table() são salvas como tabelas temporárias na memória e podem ser recuperadas usando Session.table(). Você pode usar as operações de DataFrame suportadas na tabela normalmente.

Patch de funções internas

Algumas das funções internas em snowflake.snowpark.functions não são compatíveis na estrutura de testes local. Se você usar uma função que não seja compatível, é possível usar o decorador @patch de snowflake.snowpark.mock para criar um patch.

Para que a função corrigida seja definida e implementada, a assinatura (lista de parâmetro) deve estar alinhada com os parâmetros da função interna. A estrutura de teste local passa parâmetros para a função com patch usando as seguintes regras:

  • Para parâmetros do tipo ColumnOrName na assinatura de funções internas, ColumnEmulator é passado como parâmetro das funções com patch. ColumnEmulator é semelhante a um objeto pandas.Series que contém os dados da coluna.

  • Para parâmetros do tipo LiteralType na assinatura de funções internas, o valor literal é passado como parâmetro das funções com patch.

  • Caso contrário, o valor bruto é passado como parâmetro das funções com patch.

Quanto ao tipo de retorno das funções com patch, o retorno de uma instância de ColumnEmulator é esperado em correspondência com o tipo de retorno de Column das funções internas.

Por exemplo, a função interna to_timestamp() poderia receber um patch assim:

import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType

@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
    ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
    ret_column.sf_type = ColumnType(TimestampType(), True)
    return ret_column
Copy

Como ignorar casos de teste

Se o seu conjunto de testes PyTest contiver um caso de teste que não seja bem compatível pelos testes locais, é possível ignorar esses casos usando o decorador mark.skipif de PyTest. O exemplo a seguir pressupõe que você tenha configurado sua sessão e parâmetros conforme descrito anteriormente. A condição verifica se local_testing_mode está definido como local; se estiver, o caso de teste é ignorado com uma mensagem explicativa.

import pytest

@pytest.mark.skipif(
    condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
    ...
Copy

Registro de UDFs e procedimentos armazenados

Você pode criar e chamar funções definidas pelo usuário (UDFs) e procedimentos armazenados na estrutura de testes local. Para criar os objetos, é possível usar as seguintes opções de sintaxe:

Sintaxe

UDF

Procedimento armazenado

Decoradores

@udf

@sproc

Métodos de registro

udf.register()

sproc.register()

Métodos de registro a partir de arquivo

udf.register_from_file()

sproc.register_from_file()

Exemplo

O exemplo de código a seguir cria uma UDF e um procedimento armazenado usando os decoradores e, em seguida, chama ambos pelo nome:

from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import col, DataFrame
from snowflake.snowpark.functions import udf, sproc, call_udf
from snowflake.snowpark.types import IntegerType, StringType

# Create local session
session = Session.builder.config('local_testing', True).create()

# Create local table
table = 'example'
session.create_dataframe([[1,2],[3,4]],['a','b']).write.save_as_table(table)

# Register a UDF, which is called from the stored procedure
@udf(name='example_udf', return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
def example_udf(a, b):
    return a + b

# Register stored procedure
@sproc(name='example_proc', return_type=IntegerType(), input_types=[StringType()])
def example_proc(session, table_name):
    return session.table(table_name)\
        .with_column('c', call_udf('example_udf', col('a'), col('b')))\
        .count()

# Call the stored procedure by name
output = session.call('example_proc', table)

print(output)
Copy

Limitações

A lista a seguir contém as limitações conhecidas e lacunas de comportamento na estrutura de testes local. A Snowflake atualmente não tem planos de abordar esses itens.

  • Cadeias de caracteres SQL brutas e operações que exigem análise de cadeias de cadeias de caracteres SQL, como session.sql e DataFrame.filter("col1 > 12"), não são compatíveis.

  • Operações assíncronas não são suportadas.

  • Objetos de banco de dados, como tabelas, procedimentos armazenados e UDFs não são persistidos além do nível da sessão e todas as operações são executadas na memória. Por exemplo, procedimentos armazenados permanentes registrados em uma sessão simulada não são visíveis para outras sessões simuladas.

  • Recursos relacionados a agrupamento de cadeias de caracteres, como Column.collate, não são suportados.

  • Tipos de dados Variant, Array e Object só são suportados com codificação e decodificação JSON padrão. Expressões como [1,2,,3,] são consideradas como JSON válido no Snowflake, mas não em testes locais, onde as funcionalidades JSON internas do Python são usadas. Você pode especificar as variáveis de nível de módulo snowflake.snowpark.mock.CUSTOM_JSON_ENCODER e snowflake.snowpark.mock.CUSTOM_JSON_DECODER para substituir as configurações padrão.

  • Apenas um subconjunto de funções do Snowflake (incluindo funções de janela) é implementado. Para saber como injetar sua própria definição de função, consulte Patch de funções internas.

    • Atualmente, não há suporte para corrigir funções relacionadas à classificação.

  • Modelos de formato SQL não são suportados. Por exemplo, a implementação simulada de to_decimal não manipula o parâmetro opcional format.

  • A biblioteca Snowpark Python não tem uma API Python integrada para criar ou remover estágios, então a estrutura de teste local assume que cada estágio de entrada já tenha sido criado.

  • A implementação atual de UDFs e os procedimentos armazenados não realizam nenhuma validação de pacote. Todos os pacotes referenciados no seu código precisam ser instalados antes que o programa seja executado.

  • Tags de consulta não são suportadas.

  • O histórico de consultas não é suportado.

  • A linhagem não é suportada.

  • Quando uma UDF ou procedimento armazenado é registrado, parâmetros opcionais como parallel, execute_as, statement_params, source_code_display, external_access_integrations, secrets e comment são ignorados.

  • Para Table.sample, a amostragem de SYSTEM ou BLOCK é a mesma que a amostragem ROW.

  • O Snowflake não oferece suporte oficial à execução da estrutura de testes local dentro de procedimentos armazenados. Sessões do modo de teste local dentro de procedimentos armazenados podem encontrar ou acionar erros inesperados.

Recursos sem suporte

A seguir está uma lista de recursos que atualmente não estão implementados na estrutura de testes local. A Snowflake está trabalhando ativamente para resolver esses itens.

Em geral, qualquer referência a essas funcionalidades deve gerar um NotImplementedError:

  • UDTFs (funções de tabela definidas pelo usuário)

  • UDAFs (funções agregadas definidas pelo usuário)

  • UDFs e UDTFs vetorizadas

  • Funções de tabela integradas

  • Procedimentos armazenados de tabela

  • Tipos de dados Geometry, Geography e Vector.

  • Expressões de intervalo

  • Leitura de formatos de arquivo diferentes de JSON e CSV

    • Para um formato de arquivo suportado, nem todas as opções de leitura são suportadas. Por exemplo, infer_schema não é suportado para o formato CSV.

Para quaisquer recursos não listados aqui como não suportados ou como uma limitação conhecida, verifique a lista mais recente de solicitações de recursos para testes locais, ou crie uma solicitação de recurso no repositório snowpark-python GitHub.

Problemas conhecidos

A seguir está uma lista de problemas conhecidos ou lacunas de comportamento que existem na estrutura de testes local. A Snowflake está planejando ativamente resolver esses problemas.

  • O uso de funções de janela dentro de DataFrame.groupby ou outras operações de agregação não são suportadas.

    # Selecting window function expressions is supported
    df.select("key", "value", sum_("value").over(), avg("value").over())
    
    # Aggregating window function expressions is NOT supported
    df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
    
    Copy
  • Selecionar colunas com o mesmo nome retornará apenas uma coluna. Como solução alternativa, use Column.alias para renomear as colunas para que tenham nomes distintos.

    df.select(lit(1), lit(1)).show() # col("a"), col("a")
    #---------
    #|"'1'"  |
    #---------
    #|1      |
    #|...    |
    #---------
    
    # Workaround: Column.alias
    DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2"))
    # "col1_1", "col1_2"
    
    Copy
  • Para Table.merge e Table.update, os parâmetros da sessão ERROR_ON_NONDETERMINISTIC_UPDATE e ERROR_ON_NONDETERMINISTIC_MERGE devem ser definidos como False. Isso significa que, para junções múltiplas, uma das linhas correspondentes é atualizada.

  • Nomes de estágio totalmente qualificados em operações de arquivo GET e PUT não são suportados. Nomes de banco de dados e esquemas são tratados como parte do nome do estágio.

  • A implementação de mock_to_char só suporta carimbos de data/hora em um formato que tenha separadores entre diferentes partes de tempo.

  • DataFrame.pivot tem um parâmetro chamado values que permite que um pivô seja limitado a valores específicos. Somente valores definidos estatisticamente podem ser usados neste momento. Valores fornecidos usando uma subconsulta gerarão um erro.

  • Não há suporte para a criação de um DataFrame a partir de um DataFrame pandas que contenha um carimbo de data/hora com informações de fuso horário.

Para quaisquer problemas não mencionados nesta lista, verifique a lista de problemas em aberto mais recente ou crie um relatório de bug no repositório snowpark-python GitHub.