Estrutura de teste local

Este tópico explica como testar seu código localmente ao trabalhar com a biblioteca Snowpark.

Neste tópico:

A estrutura de testes locais do Snowpark Python permite criar e operar em DataFrames do Snowpark Python localmente sem se conectar a uma conta Snowflake. Você pode usar a estrutura de teste local para testar suas operações de DataFrame localmente, em sua máquina de desenvolvimento ou em um pipeline de CI (integração contínua), antes de implantar alterações de código em sua conta. A API é a mesma, então você pode executar seus testes localmente ou em uma conta Snowflake, sem fazer alterações no código.

Pré-requisitos

Para usar a estrutura de teste local:

  • Você deve usar a versão 1.11.1 ou superior da biblioteca Snowpark Python com a dependência opcional pandas. Instalação executando pip install "snowflake-snowpark-python[pandas]"

  • As versões suportadas do Python são:

    • 3.8

    • 3.9

    • 3,10

    • 3,11

Criação de uma sessão e ativação de testes locais

Para começar, crie uma Session Snowpark e defina a configuração de teste local como True.

from snowflake.snowpark import Session

session = Session.builder.config('local_testing', True).create()
Copy

Depois que a sessão for criada, você poderá usá-la para criar e operar em DataFrames.

df = session.create_dataframe([[1,2],[3,4]],['a','b'])
df.with_column('c', df['a']+df['b']).show()
Copy

Carregamento de dados

Você pode criar DataFrames do Snowpark a partir de primitivos Python, arquivos e DataFrames Pandas. Isto é útil para especificar a entrada e a saída esperada dos casos de teste. Ao fazer isso dessa forma, os dados ficam no controle de origem, o que torna mais fácil manter os dados de teste sincronizados com os casos de teste.

Carregamento de dados CSV

Você pode carregar arquivos CSV em um DataFrame do Snowpark chamando primeiro Session.file.put() para carregar o arquivo no estágio na memória e depois usando Session.read() para ler o conteúdo. Suponha que haja um arquivo, data.csv, e que o arquivo tenha o seguinte conteúdo:

col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
Copy

Você pode carregar data.csv em um DataFrame do Snowpark usando o código a seguir. Você precisa primeiro colocar o arquivo em um estágio. Caso contrário, você receberá um erro de arquivo não encontrado.

from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType


# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
    [
        StructField("col1", IntegerType()),
        StructField("col2", StringType()),
        StructField("col3", BooleanType()),
        StructField("col4", DoubleType()),
    ]
)

# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
Copy

A saída de dataframe.show() será:

-------------------------------------
|"COL1"  |"COL2"  |"COL3"  |"COL4"  |
-------------------------------------
|1       |a       |True    |1.23    |
|2       |b       |False   |4.56    |
-------------------------------------

Carregamento de dados do Pandas

Você pode criar um DataFrame do Snowpark Python a partir de um Pandas DataFrame chamando o método create_dataframe e passando os dados como um Pandas DataFrame.

import pandas as pd

pandas_df = pd.DataFrame(
    data={
        "col1": pd.Series(["value1", "value2"]),
        "col2": pd.Series([1.23, 4.56]),
        "col3": pd.Series([123, 456]),
        "col4": pd.Series([True, False]),
    }
)

dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
Copy

A saída de dataframe.show() é a seguinte:

-------------------------------------
|"col1"  |"col2"  |"col3"  |"col4"  |
-------------------------------------
|value1  |1.23    |123     |True    |
|value2  |4.56    |456     |False   |
-------------------------------------

Um DataFrame do Snowpark Python também pode ser convertido em um Pandas DataFrame chamando o método to_pandas no DataFrame.

from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType

dataframe = session.create_dataframe(
    data=[
        ["value1", 1.23, 123, True],
        ["value2", 4.56, 456, False],
    ],
    schema=StructType([
        StructField("col1", StringType()),
        StructField("col2", DoubleType()),
        StructField("col3", LongType()),
        StructField("col4", BooleanType()),
    ])
)

pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
Copy

A chamada para print(pandas_dataframe.to_string()) gera a seguinte saída:

    COL1  COL2  COL3   COL4
0  value1  1.23   123   True
1  value2  4.56   456  False

Criação de um acessório do PyTest para sessão

Os acessórios do PyTest são funções executadas antes de um teste (ou módulo de testes), normalmente para fornecer dados ou conexões para testes. Neste caso, crie um acessório que retorne um objeto Snowpark Session. Primeiro, crie um diretório test se ainda não tiver um. Em seguida, no diretório test, crie um arquivo conftest.py com o seguinte conteúdo, onde connection_parameters é um dicionário com as credenciais da sua conta Snowflake. Para obter mais informações sobre o formato do dicionário, consulte Como criar uma sessão.

# test/conftest.py
import pytest
from snowflake.snowpark.session import Session

def pytest_addoption(parser):
    parser.addoption("--snowflake-session", action="store", default="live")

@pytest.fixture(scope='module')
def session(request) -> Session:
    if request.config.getoption('--snowflake-session') == 'local':
        return Session.builder.config('local_testing', True).create()
    else:
        return Session.builder.configs(CONNECTION_PARAMETERS).create()
Copy

A chamada para pytest_addoption adiciona uma opção de linha de comando chamada snowflake-session ao comando pytest. O acessório Session verifica esta opção de linha de comando e cria um Session local ou ativo dependendo de seu valor. Isso permite alternar facilmente entre os modos local e ativo para teste.

# Using local mode:
pytest --snowflake-session local

# Using live mode
pytest
Copy

Operações SQL

Session.sql(...) não é compatível com a estrutura de teste local. Use as APIs do DataFrame do Snowpark sempre que possível e, nos casos em que você deve usar Session.sql(...), você pode simular o valor de retorno tabular usando unittest.mock.patch do Python para corrigir a resposta esperada de uma determinada chamada de Session.sql().

No exemplo abaixo, mock_sql() mapeia o texto da consulta SQL para a resposta de DataFrame desejada. A instrução condicional a seguir verifica se a sessão atual está usando testes locais e, em caso afirmativo, aplica o patch ao método Session.sql().

from unittest import mock
from functools import partial

def test_something(pytestconfig, session):

    def mock_sql(session, sql_string):  # patch for SQL operations
        if sql_string == "select 1,2,3":
            return session.create_dataframe([[1,2,3]])
        else:
            raise RuntimeError(f"Unexpected query execution: {sql_string}")

    if pytestconfig.getoption('--snowflake-session') == 'local':
        with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
            assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
    else:
        assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Copy

Quando o teste local está ativado, todas as tabelas criadas por DataFrame.save_as_table() são salvas como tabelas temporárias na memória e podem ser recuperadas usando Session.table(). Você pode usar as operações de DataFrame suportadas na tabela normalmente.

Patch de funções internas

Nem todas as funções internas em snowflake.snowpark.functions são suportadas na estrutura de teste local. Se você usar uma função que não é suportada, será necessário usar o decorador @patch de snowflake.snowpark.mock para criar um patch.

Para definir e implementar a função com patch, a assinatura (lista de parâmetros) deve estar alinhada com os parâmetros da função interna. A estrutura de teste local passa parâmetros para a função com patch usando as seguintes regras:

  • Para parâmetros do tipo ColumnOrName na assinatura de funções internas, ColumnEmulator é passado como parâmetro das funções com patch. ColumnEmulator é semelhante a um objeto pandas.Series que contém os dados da coluna.

  • Para parâmetros do tipo LiteralType na assinatura de funções internas, o valor literal é passado como parâmetro das funções com patch.

  • Caso contrário, o valor bruto é passado como parâmetro das funções com patch.

Quanto ao tipo de retorno das funções com patch, o retorno de uma instância de ColumnEmulator é esperado em correspondência com o tipo de retorno de Column das funções internas.

Por exemplo, a função interna to_timestamp() poderia receber um patch assim:

import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType

@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
    ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
    ret_column.sf_type = ColumnType(TimestampType(), True)
    return ret_column
Copy

Como ignorar casos de teste

Se seu conjunto de testes PyTest contiver um caso de teste que não seja bem suportado por testes locais, você poderá ignorar esses casos usando o decorador mark.skipif de PyTest. O exemplo abaixo pressupõe que você tenha configurado sua sessão e parâmetros conforme descrito anteriormente. A condição verifica se local_testing_mode está definido como local e, em caso afirmativo, o caso de teste é ignorado com uma mensagem explicando por que foi ignorado.

import pytest

@pytest.mark.skipif(
    condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
    ...
Copy

Limitações

A seguinte funcionalidade não é suportada:

  • Cadeias de caracteres SQL brutas e operações que exigem análise de cadeias de caracteres SQL. Por exemplo, session.sql e DataFrame.filter("col1 > 12") não são suportados.

  • UDFs, UDTFs e procedimentos armazenados.

  • Funções de tabela.

  • AsyncJobs.

  • Operações de sessão, como alteração de warehouses, esquemas e outras propriedades de sessão.

  • Tipos de dados Geometry e Geography.

  • Agregando funções de janela.

    # Selecting window function expressions is supported
    df.select("key", "value", sum_("value").over(), avg("value").over())
    
    # Aggregating window function expressions is NOT supported
    df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
    
    Copy

Outras limitações são:

  • Tipos de dados Variant, Array e Object só são suportados com codificação e decodificação JSON padrão. Expressões como [1,2,,3,] são consideradas como JSON válido no Snowflake, mas não em testes locais, onde as funcionalidades JSON internas do Python são usadas. Você pode especificar as variáveis de nível de módulo snowflake.snowpark.mock.CUSTOM_JSON_ENCODER e snowflake.snowpark.mock.CUSTOM_JSON_DECODER para substituir as configurações padrão.

  • Apenas um subconjunto de funções do Snowflake (incluindo funções de janela) é implementado. Consulte Patch de funções internas para saber como injetar sua própria definição de função.

  • Atualmente, não há suporte para patch de funções relacionadas à classificação.

  • Selecionar colunas com o mesmo nome retornará apenas uma coluna. Como solução alternativa, renomeie as colunas para que tenham nomes distintos usando Column.alias.

    df.select(lit(1), lit(1)).show() # col("a"), col("a")
    #---------
    #|"'1'"  |
    #---------
    #|1      |
    #|...    |
    #---------
    
    # Workaround: Column.alias
    DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2"))
    # "col1_1", "col1_2"
    
    Copy
  • A conversão de tipo explícito usando Column.cast tem a limitação de que as cadeias de caracteres de formato não são suportadas para entradas: to_decimal, to_number, to_numeric, to_double, to_date, to_time, to_timestamp e saídas: to_char, to_varchar, to_binary.

  • Cadeias de caracteres JSON armazenadas em VariantType não podem ser convertidas em tipos Datetime.

  • Para Table.merge e Table.update, a implementação oferece suporte apenas ao comportamento quando os parâmetros de sessão ERROR_ON_NONDETERMINISTIC_UPDATE e ERROR_ON_NONDETERMINISTIC_MERGE estão definidos como False. Isso significa que para junções múltiplas, ele atualiza uma das linhas correspondentes.

Lista de APIs suportadas

Sessão de Snowpark

Session.createDataFrame

Session.create_dataframe

Session.flatten

Session.range

Session.table

Entrada/Saída

DataFrameReader.csv

DataFrameReader.table

DataFrameWriter.saveAsTable

DataFrameWriter.save_as_table

DataFrame

DataFrame.agg

DataFrame.cache_result

DataFrame.col

DataFrame.collect

DataFrame.collect_nowait

DataFrame.copy_into_table

DataFrame.count

DataFrame.createOrReplaceTempView

DataFrame.createOrReplaceView

DataFrame.create_or_replace_temp_view

DataFrame.create_or_replace_view

DataFrame.crossJoin

DataFrame.cross_join

DataFrame.distinct

DataFrame.drop

DataFrame.dropDuplicates

DataFrame.drop_duplicates

DataFrame.dropna

DataFrame.except_

DataFrame.explain

DataFrame.fillna

DataFrame.filter

DataFrame.first

DataFrame.groupBy

DataFrame.group_by

DataFrame.intersect

DataFrame.join

DataFrame.limit

DataFrame.minus

DataFrame.natural_join

DataFrame.orderBy

DataFrame.order_by

DataFrame.rename

DataFrame.replace

DataFrame.rollup

DataFrame.sample

DataFrame.select

DataFrame.show

DataFrame.sort

DataFrame.subtract

DataFrame.take

DataFrame.toDF

DataFrame.toLocalIterator

DataFrame.toPandas

DataFrame.to_df

DataFrame.to_local_iterator

DataFrame.to_pandas

DataFrame.to_pandas_batches

DataFrame.union

DataFrame.unionAll

DataFrame.unionAllByName

DataFrame.unionByName

DataFrame.union_all

DataFrame.union_all_by_name

DataFrame.union_by_name

DataFrame.unpivot

DataFrame.where

DataFrame.withColumn

DataFrame.withColumnRenamed

DataFrame.with_column

DataFrame.with_column_renamed

DataFrame.with_columns

DataFrameNaFunctions.drop

DataFrameNaFunctions.fill

DataFrameNaFunctions.replace

Coluna

Column.alias

Column.as_

Column.asc

Column.asc_nulls_first

Column.asc_nulls_last

Column.astype

Column.between

Column.bitand

Column.bitor

Column.bitwiseAnd

Column.bitwiseOR

Column.bitwiseXOR

Column.bitxor

Column.cast

Column.collate

Column.desc

Column.desc_nulls_first

Column.desc_nulls_last

Column.endswith

Column.eqNullSafe

Column.equal_nan

Column.equal_null

Column.getItem

Column.getName

Column.get_name

Column.in_

Column.isNotNull

Column.isNull

Column.is_not_null

Column.is_null

Column.isin

Column.like

Column.name

Column.over

Column.regexp

Column.rlike

Column.startswith

Column.substr

Column.substring

Column.try_cast

Column.within_group

CaseExpr.when

CaseExpr.otherwise

Tipos de dados

ArrayType

BinaryType

BooleanType

ByteType

ColumnIdentifier

DataType

DateType

DecimalType

DoubleType

FloatType

IntegerType

LongType

MapType

NullType

ShortType

StringType

StructField

StructType

Timestamp

TimestampType

TimeType

Variant

VariantType

Linha

Row.asDict

Row.as_dict

Row.count

Row.index

Função

abs

avg

coalesce

contains

count

count_distinct

covar_pop

endswith

first_value

iff

lag

last_value

lead

list_agg

max

median

min

parse_json

row_number

startswith

substring

sum

to_array

to_binary

to_boolean

to_char

to_date

to_decimal

to_double

to_object

to_time

to_timestamp

to_variant

Janela

Window.orderBy

Window.order_by

Window.partitionBy

Window.partition_by

Window.rangeBetween

Window.range_between

Window.rowsBetween

Window.rows_between

WindowSpec.orderBy

WindowSpec.order_by

WindowSpec.partitionBy

WindowSpec.partition_by

WindowSpec.rangeBetween

WindowSpec.range_between

WindowSpec.rowsBetween

WindowSpec.rows_between

Agrupamento

RelationalGroupedDataFrame.agg

RelationalGroupedDataFrame.apply_in_pandas

RelationalGroupedDataFrame.applyInPandas

RelationalGroupedDataFrame.avg

RelationalGroupedDataFrame.builtin

RelationalGroupedDataFrame.count

RelationalGroupedDataFrame.function

RelationalGroupedDataFrame.max

RelationalGroupedDataFrame.mean

RelationalGroupedDataFrame.median

RelationalGroupedDataFrame.min

RelationalGroupedDataFrame.sum

Tabela

Table.delete

Table.drop_table

Table.merge

Table.sample

Table.update

WhenMatchedClause.delete

WhenMatchedClause.update

WhenNotMatchedClause.insert