Estrutura de teste local¶
Este tópico explica como testar seu código localmente ao trabalhar com a biblioteca Snowpark.
Neste tópico:
A estrutura de testes locais do Snowpark Python permite criar e operar em DataFrames do Snowpark Python localmente sem se conectar a uma conta Snowflake. Você pode usar a estrutura de teste local para testar suas operações de DataFrame localmente, em sua máquina de desenvolvimento ou em um pipeline de CI (integração contínua), antes de implantar alterações de código em sua conta. A API é a mesma, então você pode executar seus testes localmente ou em uma conta Snowflake, sem fazer alterações no código.
Pré-requisitos¶
Para usar a estrutura de teste local:
Você deve usar a versão 1.11.1 ou superior da biblioteca Snowpark Python com a dependência opcional
pandas
. Instalação executandopip install "snowflake-snowpark-python[pandas]"
As versões suportadas do Python são:
3.8
3.9
3,10
3,11
Criação de uma sessão e ativação de testes locais¶
Para começar, crie uma Session
Snowpark e defina a configuração de teste local como True
.
from snowflake.snowpark import Session
session = Session.builder.config('local_testing', True).create()
Depois que a sessão for criada, você poderá usá-la para criar e operar em DataFrames.
df = session.create_dataframe([[1,2],[3,4]],['a','b'])
df.with_column('c', df['a']+df['b']).show()
Carregamento de dados¶
Você pode criar DataFrames do Snowpark a partir de primitivos Python, arquivos e DataFrames Pandas. Isto é útil para especificar a entrada e a saída esperada dos casos de teste. Ao fazer isso dessa forma, os dados ficam no controle de origem, o que torna mais fácil manter os dados de teste sincronizados com os casos de teste.
Carregamento de dados CSV¶
Você pode carregar arquivos CSV em um DataFrame do Snowpark chamando primeiro Session.file.put()
para carregar o arquivo no estágio na memória e depois usando Session.read()
para ler o conteúdo. Suponha que haja um arquivo, data.csv
, e que o arquivo tenha o seguinte conteúdo:
col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
Você pode carregar data.csv
em um DataFrame do Snowpark usando o código a seguir. Você precisa primeiro colocar o arquivo em um estágio. Caso contrário, você receberá um erro de arquivo não encontrado.
from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType
# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
[
StructField("col1", IntegerType()),
StructField("col2", StringType()),
StructField("col3", BooleanType()),
StructField("col4", DoubleType()),
]
)
# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
A saída de dataframe.show()
será:
-------------------------------------
|"COL1" |"COL2" |"COL3" |"COL4" |
-------------------------------------
|1 |a |True |1.23 |
|2 |b |False |4.56 |
-------------------------------------
Carregamento de dados do Pandas¶
Você pode criar um DataFrame do Snowpark Python a partir de um Pandas DataFrame chamando o método create_dataframe
e passando os dados como um Pandas DataFrame.
import pandas as pd
pandas_df = pd.DataFrame(
data={
"col1": pd.Series(["value1", "value2"]),
"col2": pd.Series([1.23, 4.56]),
"col3": pd.Series([123, 456]),
"col4": pd.Series([True, False]),
}
)
dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
A saída de dataframe.show()
é a seguinte:
-------------------------------------
|"col1" |"col2" |"col3" |"col4" |
-------------------------------------
|value1 |1.23 |123 |True |
|value2 |4.56 |456 |False |
-------------------------------------
Um DataFrame do Snowpark Python também pode ser convertido em um Pandas DataFrame chamando o método to_pandas
no DataFrame.
from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType
dataframe = session.create_dataframe(
data=[
["value1", 1.23, 123, True],
["value2", 4.56, 456, False],
],
schema=StructType([
StructField("col1", StringType()),
StructField("col2", DoubleType()),
StructField("col3", LongType()),
StructField("col4", BooleanType()),
])
)
pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
A chamada para print(pandas_dataframe.to_string())
gera a seguinte saída:
COL1 COL2 COL3 COL4
0 value1 1.23 123 True
1 value2 4.56 456 False
Criação de um acessório do PyTest para sessão¶
Os acessórios do PyTest são funções executadas antes de um teste (ou módulo de testes), normalmente para fornecer dados ou conexões para testes. Neste caso, crie um acessório que retorne um objeto Snowpark Session
. Primeiro, crie um diretório test
se ainda não tiver um. Em seguida, no diretório test
, crie um arquivo conftest.py
com o seguinte conteúdo, onde connection_parameters
é um dicionário com as credenciais da sua conta Snowflake. Para obter mais informações sobre o formato do dicionário, consulte Como criar uma sessão.
# test/conftest.py
import pytest
from snowflake.snowpark.session import Session
def pytest_addoption(parser):
parser.addoption("--snowflake-session", action="store", default="live")
@pytest.fixture(scope='module')
def session(request) -> Session:
if request.config.getoption('--snowflake-session') == 'local':
return Session.builder.config('local_testing', True).create()
else:
return Session.builder.configs(CONNECTION_PARAMETERS).create()
A chamada para pytest_addoption
adiciona uma opção de linha de comando chamada snowflake-session
ao comando pytest
. O acessório Session
verifica esta opção de linha de comando e cria um Session
local ou ativo dependendo de seu valor. Isso permite alternar facilmente entre os modos local e ativo para teste.
# Using local mode:
pytest --snowflake-session local
# Using live mode
pytest
Operações SQL¶
Session.sql(...)
não é compatível com a estrutura de teste local. Use as APIs do DataFrame do Snowpark sempre que possível e, nos casos em que você deve usar Session.sql(...)
, você pode simular o valor de retorno tabular usando unittest.mock.patch
do Python para corrigir a resposta esperada de uma determinada chamada de Session.sql()
.
No exemplo abaixo, mock_sql()
mapeia o texto da consulta SQL para a resposta de DataFrame desejada. A instrução condicional a seguir verifica se a sessão atual está usando testes locais e, em caso afirmativo, aplica o patch ao método Session.sql()
.
from unittest import mock
from functools import partial
def test_something(pytestconfig, session):
def mock_sql(session, sql_string): # patch for SQL operations
if sql_string == "select 1,2,3":
return session.create_dataframe([[1,2,3]])
else:
raise RuntimeError(f"Unexpected query execution: {sql_string}")
if pytestconfig.getoption('--snowflake-session') == 'local':
with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
else:
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Quando o teste local está ativado, todas as tabelas criadas por DataFrame.save_as_table()
são salvas como tabelas temporárias na memória e podem ser recuperadas usando Session.table()
. Você pode usar as operações de DataFrame suportadas na tabela normalmente.
Patch de funções internas¶
Nem todas as funções internas em snowflake.snowpark.functions
são suportadas na estrutura de teste local. Se você usar uma função que não é suportada, será necessário usar o decorador @patch
de snowflake.snowpark.mock
para criar um patch.
Para definir e implementar a função com patch, a assinatura (lista de parâmetros) deve estar alinhada com os parâmetros da função interna. A estrutura de teste local passa parâmetros para a função com patch usando as seguintes regras:
Para parâmetros do tipo
ColumnOrName
na assinatura de funções internas,ColumnEmulator
é passado como parâmetro das funções com patch.ColumnEmulator
é semelhante a um objetopandas.Series
que contém os dados da coluna.Para parâmetros do tipo
LiteralType
na assinatura de funções internas, o valor literal é passado como parâmetro das funções com patch.Caso contrário, o valor bruto é passado como parâmetro das funções com patch.
Quanto ao tipo de retorno das funções com patch, o retorno de uma instância de ColumnEmulator
é esperado em correspondência com o tipo de retorno de Column
das funções internas.
Por exemplo, a função interna to_timestamp()
poderia receber um patch assim:
import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType
@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
ret_column.sf_type = ColumnType(TimestampType(), True)
return ret_column
Como ignorar casos de teste¶
Se seu conjunto de testes PyTest contiver um caso de teste que não seja bem suportado por testes locais, você poderá ignorar esses casos usando o decorador mark.skipif
de PyTest. O exemplo abaixo pressupõe que você tenha configurado sua sessão e parâmetros conforme descrito anteriormente. A condição verifica se local_testing_mode
está definido como local
e, em caso afirmativo, o caso de teste é ignorado com uma mensagem explicando por que foi ignorado.
import pytest
@pytest.mark.skipif(
condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
...
Limitações¶
A seguinte funcionalidade não é suportada:
Cadeias de caracteres SQL brutas e operações que exigem análise de cadeias de caracteres SQL. Por exemplo,
session.sql
eDataFrame.filter("col1 > 12")
não são suportados.UDFs, UDTFs e procedimentos armazenados.
Funções de tabela.
AsyncJobs.
Operações de sessão, como alteração de warehouses, esquemas e outras propriedades de sessão.
Tipos de dados
Geometry
eGeography
.Agregando funções de janela.
# Selecting window function expressions is supported df.select("key", "value", sum_("value").over(), avg("value").over()) # Aggregating window function expressions is NOT supported df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
Outras limitações são:
Tipos de dados
Variant
,Array
eObject
só são suportados com codificação e decodificação JSON padrão. Expressões como [1,2,,3,] são consideradas como JSON válido no Snowflake, mas não em testes locais, onde as funcionalidades JSON internas do Python são usadas. Você pode especificar as variáveis de nível de módulosnowflake.snowpark.mock.CUSTOM_JSON_ENCODER
esnowflake.snowpark.mock.CUSTOM_JSON_DECODER
para substituir as configurações padrão.Apenas um subconjunto de funções do Snowflake (incluindo funções de janela) é implementado. Consulte Patch de funções internas para saber como injetar sua própria definição de função.
Atualmente, não há suporte para patch de funções relacionadas à classificação.
Selecionar colunas com o mesmo nome retornará apenas uma coluna. Como solução alternativa, renomeie as colunas para que tenham nomes distintos usando
Column.alias
.df.select(lit(1), lit(1)).show() # col("a"), col("a") #--------- #|"'1'" | #--------- #|1 | #|... | #--------- # Workaround: Column.alias DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2")) # "col1_1", "col1_2"
A conversão de tipo explícito usando
Column.cast
tem a limitação de que as cadeias de caracteres de formato não são suportadas para entradas:to_decimal
,to_number
,to_numeric
,to_double
,to_date
,to_time
,to_timestamp
e saídas:to_char
,to_varchar
,to_binary
.Cadeias de caracteres JSON armazenadas em
VariantType
não podem ser convertidas em tiposDatetime
.Para
Table.merge
eTable.update
, a implementação oferece suporte apenas ao comportamento quando os parâmetros de sessãoERROR_ON_NONDETERMINISTIC_UPDATE
eERROR_ON_NONDETERMINISTIC_MERGE
estão definidos comoFalse
. Isso significa que para junções múltiplas, ele atualiza uma das linhas correspondentes.
Lista de APIs suportadas¶
Sessão de Snowpark¶
Session.createDataFrame
Session.create_dataframe
Session.flatten
Session.range
Session.table
Entrada/Saída¶
DataFrameReader.csv
DataFrameReader.table
DataFrameWriter.saveAsTable
DataFrameWriter.save_as_table
DataFrame¶
DataFrame.agg
DataFrame.cache_result
DataFrame.col
DataFrame.collect
DataFrame.collect_nowait
DataFrame.copy_into_table
DataFrame.count
DataFrame.createOrReplaceTempView
DataFrame.createOrReplaceView
DataFrame.create_or_replace_temp_view
DataFrame.create_or_replace_view
DataFrame.crossJoin
DataFrame.cross_join
DataFrame.distinct
DataFrame.drop
DataFrame.dropDuplicates
DataFrame.drop_duplicates
DataFrame.dropna
DataFrame.except_
DataFrame.explain
DataFrame.fillna
DataFrame.filter
DataFrame.first
DataFrame.groupBy
DataFrame.group_by
DataFrame.intersect
DataFrame.join
DataFrame.limit
DataFrame.minus
DataFrame.natural_join
DataFrame.orderBy
DataFrame.order_by
DataFrame.rename
DataFrame.replace
DataFrame.rollup
DataFrame.sample
DataFrame.select
DataFrame.show
DataFrame.sort
DataFrame.subtract
DataFrame.take
DataFrame.toDF
DataFrame.toLocalIterator
DataFrame.toPandas
DataFrame.to_df
DataFrame.to_local_iterator
DataFrame.to_pandas
DataFrame.to_pandas_batches
DataFrame.union
DataFrame.unionAll
DataFrame.unionAllByName
DataFrame.unionByName
DataFrame.union_all
DataFrame.union_all_by_name
DataFrame.union_by_name
DataFrame.unpivot
DataFrame.where
DataFrame.withColumn
DataFrame.withColumnRenamed
DataFrame.with_column
DataFrame.with_column_renamed
DataFrame.with_columns
DataFrameNaFunctions.drop
DataFrameNaFunctions.fill
DataFrameNaFunctions.replace
Coluna¶
Column.alias
Column.as_
Column.asc
Column.asc_nulls_first
Column.asc_nulls_last
Column.astype
Column.between
Column.bitand
Column.bitor
Column.bitwiseAnd
Column.bitwiseOR
Column.bitwiseXOR
Column.bitxor
Column.cast
Column.collate
Column.desc
Column.desc_nulls_first
Column.desc_nulls_last
Column.endswith
Column.eqNullSafe
Column.equal_nan
Column.equal_null
Column.getItem
Column.getName
Column.get_name
Column.in_
Column.isNotNull
Column.isNull
Column.is_not_null
Column.is_null
Column.isin
Column.like
Column.name
Column.over
Column.regexp
Column.rlike
Column.startswith
Column.substr
Column.substring
Column.try_cast
Column.within_group
CaseExpr.when
CaseExpr.otherwise
Tipos de dados¶
ArrayType
BinaryType
BooleanType
ByteType
ColumnIdentifier
DataType
DateType
DecimalType
DoubleType
FloatType
IntegerType
LongType
MapType
NullType
ShortType
StringType
StructField
StructType
Timestamp
TimestampType
TimeType
Variant
VariantType
Linha¶
Row.asDict
Row.as_dict
Row.count
Row.index
Função¶
abs
avg
coalesce
contains
count
count_distinct
covar_pop
endswith
first_value
iff
lag
last_value
lead
list_agg
max
median
min
parse_json
row_number
startswith
substring
sum
to_array
to_binary
to_boolean
to_char
to_date
to_decimal
to_double
to_object
to_time
to_timestamp
to_variant
Janela¶
Window.orderBy
Window.order_by
Window.partitionBy
Window.partition_by
Window.rangeBetween
Window.range_between
Window.rowsBetween
Window.rows_between
WindowSpec.orderBy
WindowSpec.order_by
WindowSpec.partitionBy
WindowSpec.partition_by
WindowSpec.rangeBetween
WindowSpec.range_between
WindowSpec.rowsBetween
WindowSpec.rows_between
Agrupamento¶
RelationalGroupedDataFrame.agg
RelationalGroupedDataFrame.apply_in_pandas
RelationalGroupedDataFrame.applyInPandas
RelationalGroupedDataFrame.avg
RelationalGroupedDataFrame.builtin
RelationalGroupedDataFrame.count
RelationalGroupedDataFrame.function
RelationalGroupedDataFrame.max
RelationalGroupedDataFrame.mean
RelationalGroupedDataFrame.median
RelationalGroupedDataFrame.min
RelationalGroupedDataFrame.sum
Tabela¶
Table.delete
Table.drop_table
Table.merge
Table.sample
Table.update
WhenMatchedClause.delete
WhenMatchedClause.update
WhenNotMatchedClause.insert