Criação de funções definidas pelo usuário (UDFs) para DataFrames em Python¶
A API do Snowpark fornece métodos que você pode usar para criar uma função definida pelo usuário a partir de uma lambda ou função em Python. Este tópico explica como criar esses tipos de funções.
Neste tópico:
Introdução¶
Com o Snowpark, você pode criar funções definidas pelo usuário (UDFs) para suas lambdas e funções personalizadas, e pode chamar essas UDFs para processar os dados em seu DataFrame.
Quando você usa a API do Snowpark para criar uma UDF, a biblioteca do Snowpark carrega o código de sua função para um estágio interno. Quando você chama a UDF, a biblioteca do Snowpark executa sua função no servidor, onde os dados estão. Como resultado, os dados não precisam ser transferidos para o cliente para que a função possa processar os dados.
Em seu código personalizado, você também pode importar módulos de arquivos Python ou pacotes de terceiros.
Você pode criar uma UDF para seu código personalizado de duas maneiras:
Você pode criar uma UDF anônima e atribuir a função a uma variável. Desde que essa variável esteja no escopo, você pode usar a variável para chamar a UDF.
Você pode criar uma UDF nomeada e chamar a UDF pelo nome. Você pode usar isso se, por exemplo, precisar chamar uma UDF pelo nome ou usar a UDF em uma sessão posterior.
As próximas seções explicam como criar estas UDFs usando um ambiente de desenvolvimento local ou usando uma planilha Python.
Observe que se você definiu uma UDF executando o comando CREATE FUNCTION
, você pode chamar essa UDF no Snowpark. Para obter mais detalhes, consulte Como chamar funções definidas pelo usuário (UDFs).
Nota
As UDFs vetorizadas de Python permitem que você defina funções Python que recebem lotes de linhas de entrada como DataFrames do Pandas. Isso resulta em um desempenho muito melhor com cenários de inferência de machine learning. Para obter mais informações, consulte Uso de UDFs vetorizadas.
Nota
Se você estiver trabalhando com uma planilha Python, use esses exemplos dentro da função do manipulador:
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
def main(session: snowpark.Session):
df_table = session.table("sample_product_data")
Se os exemplos retornarem algo diferente de um DataFrame, como um objeto list
de Row
, altere o tipo de retorno para corresponder ao tipo de retorno do exemplo.
Depois de executar um exemplo de código, use a guia Results para visualizar qualquer saída retornada. Consulte Execução das planilhas Python para obter mais detalhes.
Especificação de dependências para uma UDF¶
Para definir uma UDF usando a API do Snowpark, você deve importar os arquivos que contenham quaisquer módulos dos quais sua UDF dependa, tais como arquivos Python, arquivos zip, arquivos de recursos etc.
Para fazer isto usando planilhas Python, consulte Como adicionar um arquivo Python de um estágio a uma planilha.
Para fazer isto usando seu ambiente de desenvolvimento local, você deve chamar
Session.add_import()
em seu código.
Você também pode especificar um diretório, e a biblioteca do Snowpark o comprimirá automaticamente e o carregará como um arquivo zip. (Para obter mais detalhes sobre recursos de leitura de uma UDF, consulte Como ler arquivos com uma UDF).
Quando você chama Session.add_import()
, a biblioteca do Snowpark carrega os arquivos especificados em um estágio interno e importa os arquivos ao executar sua UDF.
O exemplo a seguir demonstra como adicionar um arquivo zip em um estágio como uma dependência em relação ao seu código:
>>> # Add a zip file that you uploaded to a stage.
>>> session.add_import("@my_stage/<path>/my_library.zip")
Os exemplos a seguir demonstram como adicionar um arquivo de Python a partir de sua máquina local:
>>> # Import a Python file from your local machine.
>>> session.add_import("/<path>/my_module.py")
>>> # Import a Python file from your local machine and specify a relative Python import path.
>>> session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")
Os exemplos a seguir demonstram como adicionar outros tipos de dependências:
>>> # Add a directory of resource files.
>>> session.add_import("/<path>/my-resource-dir/")
>>> # Add a resource file.
>>> session.add_import("/<path>/my-resource.xml")
Nota
A biblioteca Python Snowpark não é carregada automaticamente.
Você não precisa especificar as seguintes dependências:
Suas bibliotecas incluídas no Python.
Essas bibliotecas já estão disponíveis no ambiente de runtime no servidor onde suas UDFs são executadas.
Como usar pacotes de terceiros do Anaconda em uma UDF¶
Você pode usar pacotes de terceiros do canal Snowflake Anaconda em uma UDF.
Se você criar uma UDF Python em uma planilha Python, os pacotes Anaconda já estarão disponíveis em sua planilha. Consulte Como adicionar um arquivo Python de um estágio a uma planilha.
Se você criar uma UDF Python em seu ambiente de desenvolvimento local, você poderá especificar quais pacotes Anaconda serão instalados.
Quando consultas que chamam UDFs de Python são executadas dentro de um warehouse do Snowflake, os pacotes do Anaconda são instalados sem problemas e armazenados em cache no warehouse virtual em seu nome.
Para obter mais informações sobre práticas recomendadas, como visualizar os pacotes disponíveis e como criar um ambiente de desenvolvimento local, consulte Como usar pacotes de terceiros.
Se você escrever uma UDF Python em seu ambiente de desenvolvimento local, use session.add_packages
para adicionar pacotes em nível de sessão.
Este exemplo de código mostra como importar pacotes e retornar suas versões.
>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf
>>> session.add_packages("numpy", "pandas", "xgboost==1.5.0")
>>> @udf
... def compute() -> list:
... return [np.__version__, pd.__version__, xgb.__version__]
Você também pode usar session.add_requirements
para especificar pacotes com um arquivo requirements.
>>> session.add_requirements("mydir/requirements.txt")
Você pode adicionar os pacotes no nível da UDF para substituir os pacotes no nível da sessão que você possa ter adicionado anteriormente.
>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf
>>> @udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
... def compute() -> list:
... return [np.__version__, pd.__version__, xgb.__version__]
Importante
Se você não especificar uma versão de pacote, o Snowflake usará a versão mais recente ao resolver as dependências. Ao implementar a UDF para produção, você pode querer garantir que seu código sempre utilize as mesmas versões de dependência. Você pode fazer isso tanto para UDFs permanentes como para temporárias.
Quando você cria uma UDF permanente, a UDF é criada e registrada apenas uma vez. Isso resolve as dependências uma vez e a versão selecionada é usada para cargas de trabalho de produção. Quando a UDF é executada, ela sempre usará as mesmas versões de dependência.
Quando você criar uma UDF temporária, especifique as versões de dependência como parte da especificação da versão. Desta forma, quando a UDF for registrada, a resolução do pacote usará a versão especificada. Se você não especificar a versão, a dependência poderá ser atualizada quando uma nova versão estiver disponível.
Como criar uma UDF anônima¶
Para criar uma UDF anônima, você pode:
Chamar a função
udf
no módulosnowflake.snowpark.functions
passando a definição da função anônima.Chamar o método
register
na classeUDFRegistration
passando a definição da função anônima.
Aqui está um exemplo de uma UDF anônima:
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
Nota
Ao escrever um código que possa ser executado em várias sessões, use o método register
para registrar UDFs em vez de usar a função udf
. Isso pode evitar erros nos quais o objeto Session
padrão do Snowflake não pode ser encontrado.
Criação e registro de uma UDF nomeada¶
Se você quiser chamar uma UDF pelo nome (por exemplo, usando a função call_udf
no módulo functions
), você pode criar e registrar uma UDF nomeada. Para fazer isso, use uma das seguintes opções:
O método
register
, na classeUDFRegistration
, com o argumentoname
.A função
udf
, no módulosnowflake.snowpark.functions
, com o argumentoname
.
Para acessar um atributo ou método da classe UDFRegistration
, chame a propriedade udf
da classe Session
.
Chamar register
ou udf
criará uma UDF temporária que você pode usar na sessão atual.
Para criar uma UDF permanente, chame o método register
ou a função udf
e defina o argumento is_permanent
como True
. Quando você cria uma UDF permanente, você também deve definir o argumento stage_location
como o local do estágio onde o arquivo de Python para a UDF e suas dependências são carregados.
Aqui está um exemplo de como registrar uma UDF nomeada temporária:
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
Aqui está um exemplo de como registrar uma UDF nomeada permanente definindo o argumento is_permanent
como True
:
>>> @udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
... def minus_one(x: int) -> int:
... return x-1
Aqui está um exemplo de como essas UDFs são chamadas:
>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
Você também pode chamar a UDF usando SQL:
>>> session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Criação de um UDF a partir de um arquivo de origem Python¶
Se você criar sua UDF em seu ambiente de desenvolvimento local, você também poderá definir seu manipulador da UDF em um arquivo Python e depois usar o método register_from_file
na classe UDFRegistration
para criar uma UDF.
Nota
Você não pode usar este método em uma planilha Python.
Aqui estão exemplos da utilização de register_from_file
.
Suponha que você tenha um arquivo Python test_udf_file.py
que contenha:
def mod5(x: int) -> int:
return x % 5
Depois você pode criar uma UDF a partir desta função do arquivo test_udf_file.py
.
>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
... file_path="tests/resources/test_udf_dir/test_udf_file.py",
... func_name="mod5",
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Você também pode carregar o arquivo em um local preparado, depois usá-lo para criar a UDF.
>>> from snowflake.snowpark.types import IntegerType
>>> # suppose you have uploaded test_udf_file.py to stage location @mystage.
>>> mod5_udf = session.udf.register_from_file(
... file_path="@mystage/test_udf_file.py",
... func_name="mod5",
... return_type=IntegerType(),
... input_types=[IntegerType()],
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Como ler arquivos com uma UDF¶
Para ler o conteúdo de um arquivo, seu código Python pode:
Ler um arquivo especificado estaticamente importando um arquivo e, em seguida, lendo-o do diretório pessoal da UDF.
Ler um arquivo especificado dinamicamente com SnowflakeFile. Você pode fazer isso se precisar acessar um arquivo durante a computação.
Como ler arquivos especificados estaticamente¶
A biblioteca Snowpark carrega e executa UDFs no servidor. Se sua UDF precisa ler dados de um arquivo, você deve assegurar-se de que o arquivo seja carregado com a UDF.
Nota
Se você escrever sua UDF em uma planilha Python, a UDF só poderá ler arquivos de um estágio.
Para configurar uma UDF para ler um arquivo:
Especifique que o arquivo é uma dependência que faz o upload do arquivo para o servidor. Para obter mais informações, consulte Especificação de dependências para uma UDF.
Por exemplo:
>>> # Import a file from your local machine as a dependency. >>> session.add_import("/<path>/my_file.txt") >>> # Or import a file that you uploaded to a stage as a dependency. >>> session.add_import("@my_stage/<path>/my_file.txt")
Na UDF, leia o arquivo. No exemplo a seguir, o arquivo só será lido uma vez durante a criação do UDF, e não será lido novamente durante a execução do UDF. Isso é possível com uma biblioteca de terceiros cachetools.
>>> import sys >>> import os >>> import cachetools >>> from snowflake.snowpark.types import StringType >>> @cachetools.cached(cache={}) ... def read_file(filename): ... import_dir = sys._xoptions.get("snowflake_import_directory") ... if import_dir: ... with open(os.path.join(import_dir, filename), "r") as f: ... return f.read() >>> >>> # create a temporary text file for test >>> temp_file_name = "/tmp/temp.txt" >>> with open(temp_file_name, "w") as t: ... _ = t.write("snowpark") >>> session.add_import(temp_file_name) >>> session.add_packages("cachetools") >>> >>> def add_suffix(s): ... return f"{read_file(os.path.basename(temp_file_name))}-{s}" >>> >>> concat_file_content_with_str_udf = session.udf.register( ... add_suffix, ... return_type=StringType(), ... input_types=[StringType()] ... ) >>> >>> df = session.create_dataframe(["snowflake", "python"], schema=["a"]) >>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect() [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')] >>> os.remove(temp_file_name) >>> session.clear_imports()
Como ler arquivos especificados dinamicamente com SnowflakeFile
¶
Você pode ler um arquivo de um estágio usando a classe SnowflakeFile
no módulo do Snowpark snowflake.snowpark.files
. A classe SnowflakeFile
fornece acesso dinâmico a arquivos, o que permite transmitir arquivos de qualquer tamanho. O acesso dinâmico a arquivos também é útil quando você deseja iterar vários arquivos. Para exemplos, consulte Processamento de vários arquivos.
Para obter mais informações e exemplos de leitura de arquivos usando SnowflakeFile
, consulte Como ler um arquivo usando a classe SnowflakeFile em um manipulador de UDF Pythin.
O exemplo a seguir registra uma UDF temporária que lê um arquivo de texto de um estágio usando SnowflakeFile
e retorna o tamanho do arquivo.
Registre a UDF:
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType
@udf(name="get_file_length", replace=True, input_types=[StringType()], return_type=IntegerType(), packages=['snowflake-snowpark-python'])
def get_file_length(file_path):
with SnowflakeFile.open(file_path) as f:
s = f.read()
return len(s);
Chame a UDF:
session.sql("select get_file_length(build_scoped_file_url(@my_stage, 'example-file.txt'));")
Uso de UDFs vetorizadas¶
As UDFs de Python vetorizadas permitem que você defina funções Python que recebem lotes de linhas de entrada como DataFrames do Pandas e retornam lotes de resultados como matrizes ou séries do Pandas. A coluna no dataframe
do Snowpark será vetorizada como uma Series do Pandas dentro da UDF.
Aqui está um exemplo de como usar a interface de lote:
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)
@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
# The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
df.columns = ["col1", "col2", "col3", "col4"]
return model.predict(df)
Você chama UDFs vetorizadas de Python da mesma forma que você chama outras UDFs de Python. Para obter mais informações, consulte UDFs vetorizadas de Python, que explica como criar um UDF vetorizado usando uma instrução SQL. Por exemplo, você pode usar o decorador vectorized
quando especificar o código Python na instrução SQL. Ao usar a API Snowpark Python descrita neste documento, você não usará uma instrução SQL para criar uma UDF vetorizada. Então você não usa o decorador vectorized
.
É possível limitar o número de linhas por lote. Para obter mais informações, consulte Como definir um tamanho alvo de lote.
Para obter mais explicações e exemplos de como usar a API Snowpark Python para criar UDFs vetorizados, consulte a seção de UDFs da referência de API Snowpark.