Criação de funções definidas pelo usuário (UDFs) para DataFrames em Python

A API do Snowpark fornece métodos que você pode usar para criar uma função definida pelo usuário a partir de uma lambda ou função em Python. Este tópico explica como criar esses tipos de funções.

Neste tópico:

Introdução

Com o Snowpark, você pode criar funções definidas pelo usuário (UDFs) para suas lambdas e funções personalizadas, e pode chamar essas UDFs para processar os dados em seu DataFrame.

Quando você usa a API do Snowpark para criar uma UDF, a biblioteca do Snowpark carrega o código de sua função para um estágio interno. Quando você chama a UDF, a biblioteca do Snowpark executa sua função no servidor, onde os dados estão. Como resultado, os dados não precisam ser transferidos para o cliente para que a função possa processar os dados.

Em seu código personalizado, você também pode importar módulos de arquivos Python ou pacotes de terceiros.

Você pode criar uma UDF para seu código personalizado de duas maneiras:

  • Você pode criar uma UDF anônima e atribuir a função a uma variável. Desde que essa variável esteja no escopo, você pode usar a variável para chamar a UDF.

  • Você pode criar uma UDF nomeada e chamar a UDF pelo nome. Você pode usar isso se, por exemplo, precisar chamar uma UDF pelo nome ou usar a UDF em uma sessão posterior.

As próximas seções explicam como criar estas UDFs usando um ambiente de desenvolvimento local ou usando uma planilha Python.

Observe que se você definiu uma UDF executando o comando CREATE FUNCTION, você pode chamar essa UDF no Snowpark. Para obter mais detalhes, consulte Como chamar funções definidas pelo usuário (UDFs).

Nota

As UDFs vetorizadas de Python permitem que você defina funções Python que recebem lotes de linhas de entrada como DataFrames do Pandas. Isso resulta em um desempenho muito melhor com cenários de inferência de machine learning. Para obter mais informações, consulte Uso de UDFs vetorizadas.

Nota

Se você estiver trabalhando com uma planilha Python, use esses exemplos dentro da função do manipulador:

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col

def main(session: snowpark.Session):
   df_table = session.table("sample_product_data")
Copy

Se os exemplos retornarem algo diferente de um DataFrame, como um objeto list de Row, altere o tipo de retorno para corresponder ao tipo de retorno do exemplo.

Depois de executar um exemplo de código, use a guia Results para visualizar qualquer saída retornada. Consulte Execução das planilhas Python para obter mais detalhes.

Especificação de dependências para uma UDF

Para definir uma UDF usando a API do Snowpark, você deve importar os arquivos que contenham quaisquer módulos dos quais sua UDF dependa, tais como arquivos Python, arquivos zip, arquivos de recursos etc.

Você também pode especificar um diretório, e a biblioteca do Snowpark o comprimirá automaticamente e o carregará como um arquivo zip. (Para obter mais detalhes sobre recursos de leitura de uma UDF, consulte Como ler arquivos com uma UDF).

Quando você chama Session.add_import(), a biblioteca do Snowpark carrega os arquivos especificados em um estágio interno e importa os arquivos ao executar sua UDF.

O exemplo a seguir demonstra como adicionar um arquivo zip em um estágio como uma dependência em relação ao seu código:

>>> # Add a zip file that you uploaded to a stage.
>>> session.add_import("@my_stage/<path>/my_library.zip")  
Copy

Os exemplos a seguir demonstram como adicionar um arquivo de Python a partir de sua máquina local:

>>> # Import a Python file from your local machine.
>>> session.add_import("/<path>/my_module.py")  

>>> # Import a Python file from your local machine and specify a relative Python import path.
>>> session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")  
Copy

Os exemplos a seguir demonstram como adicionar outros tipos de dependências:

>>> # Add a directory of resource files.
>>> session.add_import("/<path>/my-resource-dir/")  

>>> # Add a resource file.
>>> session.add_import("/<path>/my-resource.xml")  
Copy

Nota

A biblioteca Python Snowpark não é carregada automaticamente.

Você não precisa especificar as seguintes dependências:

  • Suas bibliotecas incluídas no Python.

    Essas bibliotecas já estão disponíveis no ambiente de runtime no servidor onde suas UDFs são executadas.

Como usar pacotes de terceiros do Anaconda em uma UDF

Você pode usar pacotes de terceiros do canal Snowflake Anaconda em uma UDF.

  • Se você criar uma UDF Python em uma planilha Python, os pacotes Anaconda já estarão disponíveis em sua planilha. Consulte Como adicionar um arquivo Python de um estágio a uma planilha.

  • Se você criar uma UDF Python em seu ambiente de desenvolvimento local, você poderá especificar quais pacotes Anaconda serão instalados.

Quando consultas que chamam UDFs de Python são executadas dentro de um warehouse do Snowflake, os pacotes do Anaconda são instalados sem problemas e armazenados em cache no warehouse virtual em seu nome.

Para obter mais informações sobre práticas recomendadas, como visualizar os pacotes disponíveis e como criar um ambiente de desenvolvimento local, consulte Como usar pacotes de terceiros.

Se você escrever uma UDF Python em seu ambiente de desenvolvimento local, use session.add_packages para adicionar pacotes em nível de sessão.

Este exemplo de código mostra como importar pacotes e retornar suas versões.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> session.add_packages("numpy", "pandas", "xgboost==1.5.0")

>>> @udf
... def compute() -> list:
...    return [np.__version__, pd.__version__, xgb.__version__]
Copy

Você também pode usar session.add_requirements para especificar pacotes com um arquivo requirements.

>>> session.add_requirements("mydir/requirements.txt")  
Copy

Você pode adicionar os pacotes no nível da UDF para substituir os pacotes no nível da sessão que você possa ter adicionado anteriormente.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> @udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
... def compute() -> list:
...     return [np.__version__, pd.__version__, xgb.__version__]
Copy

Importante

Se você não especificar uma versão de pacote, o Snowflake usará a versão mais recente ao resolver as dependências. Ao implementar a UDF para produção, você pode querer garantir que seu código sempre utilize as mesmas versões de dependência. Você pode fazer isso tanto para UDFs permanentes como para temporárias.

  • Quando você cria uma UDF permanente, a UDF é criada e registrada apenas uma vez. Isso resolve as dependências uma vez e a versão selecionada é usada para cargas de trabalho de produção. Quando a UDF é executada, ela sempre usará as mesmas versões de dependência.

  • Quando você criar uma UDF temporária, especifique as versões de dependência como parte da especificação da versão. Desta forma, quando a UDF for registrada, a resolução do pacote usará a versão especificada. Se você não especificar a versão, a dependência poderá ser atualizada quando uma nova versão estiver disponível.

Como criar uma UDF anônima

Para criar uma UDF anônima, você pode:

  • Chamar a função udf no módulo snowflake.snowpark.functions passando a definição da função anônima.

  • Chamar o método register na classe UDFRegistration passando a definição da função anônima.

Aqui está um exemplo de uma UDF anônima:

>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf

>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
Copy

Nota

Ao escrever um código que possa ser executado em várias sessões, use o método register para registrar UDFs em vez de usar a função udf. Isso pode evitar erros nos quais o objeto Session padrão do Snowflake não pode ser encontrado.

Criação e registro de uma UDF nomeada

Se você quiser chamar uma UDF pelo nome (por exemplo, usando a função call_udf no módulo functions), você pode criar e registrar uma UDF nomeada. Para fazer isso, use uma das seguintes opções:

  • O método register, na classe UDFRegistration, com o argumento name.

  • A função udf, no módulo snowflake.snowpark.functions, com o argumento name.

Para acessar um atributo ou método da classe UDFRegistration, chame a propriedade udf da classe Session.

Chamar register ou udf criará uma UDF temporária que você pode usar na sessão atual.

Para criar uma UDF permanente, chame o método register ou a função udf e defina o argumento is_permanent como True. Quando você cria uma UDF permanente, você também deve definir o argumento stage_location como o local do estágio onde o arquivo de Python para a UDF e suas dependências são carregados.

Aqui está um exemplo de como registrar uma UDF nomeada temporária:

>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf

>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
Copy

Aqui está um exemplo de como registrar uma UDF nomeada permanente definindo o argumento is_permanent como True:

>>> @udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
... def minus_one(x: int) -> int:
...   return x-1
Copy

Aqui está um exemplo de como essas UDFs são chamadas:

>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
Copy

Você também pode chamar a UDF usando SQL:

>>> session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Copy

Criação de um UDF a partir de um arquivo de origem Python

Se você criar sua UDF em seu ambiente de desenvolvimento local, você também poderá definir seu manipulador da UDF em um arquivo Python e depois usar o método register_from_file na classe UDFRegistration para criar uma UDF.

Nota

Você não pode usar este método em uma planilha Python.

Aqui estão exemplos da utilização de register_from_file.

Suponha que você tenha um arquivo Python test_udf_file.py que contenha:

def mod5(x: int) -> int:
    return x % 5
Copy

Depois você pode criar uma UDF a partir desta função do arquivo test_udf_file.py.

>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
...     file_path="tests/resources/test_udf_dir/test_udf_file.py",
...     func_name="mod5",
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Copy

Você também pode carregar o arquivo em um local preparado, depois usá-lo para criar a UDF.

>>> from snowflake.snowpark.types import IntegerType
>>> # suppose you have uploaded test_udf_file.py to stage location @mystage.
>>> mod5_udf = session.udf.register_from_file(
...     file_path="@mystage/test_udf_file.py",
...     func_name="mod5",
...     return_type=IntegerType(),
...     input_types=[IntegerType()],
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Copy

Como ler arquivos com uma UDF

Para ler o conteúdo de um arquivo, seu código Python pode:

Como ler arquivos especificados estaticamente

A biblioteca Snowpark carrega e executa UDFs no servidor. Se sua UDF precisa ler dados de um arquivo, você deve assegurar-se de que o arquivo seja carregado com a UDF.

Nota

Se você escrever sua UDF em uma planilha Python, a UDF só poderá ler arquivos de um estágio.

Para configurar uma UDF para ler um arquivo:

  1. Especifique que o arquivo é uma dependência que faz o upload do arquivo para o servidor. Para obter mais informações, consulte Especificação de dependências para uma UDF.

    Por exemplo:

    >>> # Import a file from your local machine as a dependency.
    >>> session.add_import("/<path>/my_file.txt")  
    
    >>> # Or import a file that you uploaded to a stage as a dependency.
    >>> session.add_import("@my_stage/<path>/my_file.txt")  
    
    Copy
  2. Na UDF, leia o arquivo. No exemplo a seguir, o arquivo só será lido uma vez durante a criação do UDF, e não será lido novamente durante a execução do UDF. Isso é possível com uma biblioteca de terceiros cachetools.

    >>> import sys
    >>> import os
    >>> import cachetools
    >>> from snowflake.snowpark.types import StringType
    >>> @cachetools.cached(cache={})
    ... def read_file(filename):
    ...     import_dir = sys._xoptions.get("snowflake_import_directory")
    ...     if import_dir:
    ...         with open(os.path.join(import_dir, filename), "r") as f:
    ...             return f.read()
    >>>
    >>> # create a temporary text file for test
    >>> temp_file_name = "/tmp/temp.txt"
    >>> with open(temp_file_name, "w") as t:
    ...     _ = t.write("snowpark")
    >>> session.add_import(temp_file_name)
    >>> session.add_packages("cachetools")
    >>>
    >>> def add_suffix(s):
    ...     return f"{read_file(os.path.basename(temp_file_name))}-{s}"
    >>>
    >>> concat_file_content_with_str_udf = session.udf.register(
    ...     add_suffix,
    ...     return_type=StringType(),
    ...     input_types=[StringType()]
    ... )
    >>>
    >>> df = session.create_dataframe(["snowflake", "python"], schema=["a"])
    >>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
    [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
    >>> os.remove(temp_file_name)
    >>> session.clear_imports()
    
    Copy

Como ler arquivos especificados dinamicamente com SnowflakeFile

Você pode ler um arquivo de um estágio usando a classe SnowflakeFile no módulo do Snowpark snowflake.snowpark.files. A classe SnowflakeFile fornece acesso dinâmico a arquivos, o que permite transmitir arquivos de qualquer tamanho. O acesso dinâmico a arquivos também é útil quando você deseja iterar vários arquivos. Para exemplos, consulte Processamento de vários arquivos.

Para obter mais informações e exemplos de leitura de arquivos usando SnowflakeFile, consulte Como ler um arquivo usando a classe SnowflakeFile em um manipulador de UDF Pythin.

O exemplo a seguir registra uma UDF temporária que lê um arquivo de texto de um estágio usando SnowflakeFile e retorna o tamanho do arquivo.

Registre a UDF:

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType

@udf(name="get_file_length", replace=True, input_types=[StringType()], return_type=IntegerType(), packages=['snowflake-snowpark-python'])
def get_file_length(file_path):
  with SnowflakeFile.open(file_path) as f:
    s = f.read()
  return len(s);
Copy

Chame a UDF:

session.sql("select get_file_length(build_scoped_file_url(@my_stage, 'example-file.txt'));")
Copy

Uso de UDFs vetorizadas

As UDFs de Python vetorizadas permitem que você defina funções Python que recebem lotes de linhas de entrada como DataFrames do Pandas e retornam lotes de resultados como matrizes ou séries do Pandas. A coluna no dataframe do Snowpark será vetorizada como uma Series do Pandas dentro da UDF.

Aqui está um exemplo de como usar a interface de lote:

from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)

@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
    # The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
    df.columns = ["col1", "col2", "col3", "col4"]
    return model.predict(df)
Copy

Você chama UDFs vetorizadas de Python da mesma forma que você chama outras UDFs de Python. Para obter mais informações, consulte UDFs vetorizadas de Python, que explica como criar um UDF vetorizado usando uma instrução SQL. Por exemplo, você pode usar o decorador vectorized quando especificar o código Python na instrução SQL. Ao usar a API Snowpark Python descrita neste documento, você não usará uma instrução SQL para criar uma UDF vetorizada. Então você não usa o decorador vectorized.

É possível limitar o número de linhas por lote. Para obter mais informações, consulte Como definir um tamanho alvo de lote.

Para obter mais explicações e exemplos de como usar a API Snowpark Python para criar UDFs vetorizados, consulte a seção de UDFs da referência de API Snowpark.