Exemplos de manipuladores de UDF em Python¶
Este tópico inclui exemplos simples de código de manipulador de UDF escrito em Python.
Para obter mais informações sobre como usar o Python para criar um manipulador de UDF, consulte Como criar UDFs de Python.
Defina runtime_version
como a versão do Python runtime que seu código requer. As versões suportadas do Python são:
3.8
3.9
3,10
3,11
Como importar um pacote em um manipulador inline¶
Uma lista com curadoria de pacotes de terceiros do Anaconda está disponível. Para obter mais informações, consulte Como usar pacotes de terceiros.
Nota
Antes que você possa usar os pacotes fornecidos pelo Anaconda, seu administrador da organização Snowflake deve reconhecer os Termos de Terceiros do Snowflake. Para obter mais informações, consulte Como usar pacotes de terceiros do Anaconda.
O código a seguir mostra como importar pacotes e retornar suas versões.
Crie a UDF:
CREATE OR REPLACE FUNCTION py_udf()
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('numpy','pandas','xgboost==1.5.0')
HANDLER = 'udf'
AS $$
import numpy as np
import pandas as pd
import xgboost as xgb
def udf():
return [np.__version__, pd.__version__, xgb.__version__]
$$;
Chame a UDF:
SELECT py_udf();
+-------------+
| PY_UDF() |
|-------------|
| [ |
| "1.19.2", |
| "1.4.0", |
| "1.5.0" |
| ] |
+-------------+
Você pode usar a palavra-chave PACKAGES para especificar versões de pacotes da seguinte maneira:
Sem uma versão (por exemplo,
numpy
)Fixado em uma versão exata (por exemplo,
numpy==1.25.2
)Limitado a um prefixo de versão usando caracteres curinga (por exemplo,
numpy==1.*
)Restrito a um intervalo de versões (por exemplo,
numpy>=1.25
)Restrito por vários especificadores de versão (por exemplo,
numpy>=1.25,<2
) para que um pacote que satisfaça todos os especificadores de versão seja selecionado.
Nota
O uso de vários operadores de intervalo (por exemplo, numpy>=1.25,<2
) não é compatível com políticas de pacote, mas você pode usá-los ao criar UDF, UDTF e procedimentos armazenados do Python.
Aqui está um exemplo de como usar o curinga *
para restringir um pacote a um prefixo de versão.
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES=('numpy==1.*')
RUNTIME_VERSION=3.10
HANDLER='echo'
AS $$
def echo():
return 'hi'
$$;
Este exemplo mostra como restringir um pacote para ser maior ou igual a uma versão especificada.
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES=('numpy>=1.2')
RUNTIME_VERSION=3.10
HANDLER='echo'
AS $$
def echo():
return 'hi'
$$;
Este exemplo mostra como usar vários especificadores de versão de pacote.
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES=('numpy>=1.2,<2')
RUNTIME_VERSION=3.10
HANDLER='echo'
AS $$
def echo():
return 'hi'
$$;
Como ler um arquivo¶
Você pode ler o conteúdo de um arquivo com o código do manipulador de UDF de Python. Por exemplo, você pode querer ler um arquivo para processar dados não estruturados.
Para ler o conteúdo de um arquivo, você pode:
Especificar estaticamente o caminho e nome do arquivo com a cláusula IMPORTS, depois lê-lo a partir do diretório inicial da UDF. Isso pode ser útil quando um nome de arquivo é estático e consistente dentro da função e você conhece o nome do arquivo com antecedência.
Especificar dinamicamente o arquivo e ler seu conteúdo com SnowflakeFile. Você pode fazer isso se precisar acessar um arquivo durante a computação.
Como ler um arquivo especificado estaticamente usando IMPORTS¶
Você pode ler um arquivo especificando o nome do arquivo e o nome do estágio na cláusula IMPORTS do comando CREATE FUNCTION.
Quando você especifica um arquivo na cláusula IMPORTS, o Snowflake copia esse arquivo do estágio para o diretório inicial da UDF (também chamado de diretório importar), que é o diretório do qual a UDF realmente lê o arquivo.
Snowflake copia arquivos importados para um único diretório. Todos os arquivos nesse diretório devem ter nomes exclusivos, portanto, cada arquivo em sua cláusula IMPORTS deve ter um nome distinto. Isso se aplica mesmo se os arquivos começarem em diferentes estágios ou diferentes subdiretórios dentro de um estágio.
Nota
Você só pode importar arquivos do diretório de nível superior em um estágio, não em subpastas.
O exemplo a seguir usa um manipulador Python inline que lê um arquivo chamado file.txt
de um estágio chamado my_stage
. O manipulador recupera a localização do diretório inicial da UDF usando o método Python sys._xoptions
com a opção do sistema snowflake_import_directory
.
O Snowflake lê o arquivo uma vez durante a criação da UDF, e não será lido novamente durante a execução da UDF se a leitura do arquivo for feita fora do manipulador de destino.
Crie a UDF com um manipulador inline:
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
IMPORTS=('@my_stage/file.txt')
HANDLER='compute'
AS
$$
import sys
import os
with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
s = f.read()
def compute():
return s
$$;
Como ler um arquivo especificado dinamicamente com SnowflakeFile
¶
Você pode ler um arquivo de um estágio usando a classe SnowflakeFile
no módulo do Snowpark snowflake.snowpark.files
. A classe SnowflakeFile
fornece acesso dinâmico a arquivos, o que permite transmitir arquivos de qualquer tamanho. O acesso dinâmico a arquivos também é útil quando você deseja iterar vários arquivos. Para exemplos, consulte Processamento de vários arquivos.
A classe SnowflakeFile
tem um método para abrir um arquivo: open
. O método open
retorna um objeto SnowflakeFile
que estende os objetos de arquivo IOBase
do Python.
O objeto SnowflakeFile
suporta os seguintes métodos IOBase
, BufferedIOBase
e RawIOBase
:
IOBase.fileno
IOBase.isatty
IOBase.readable
IOBase.readinto
IOBase.readline
IOBase.readlines
IOBase.seek
IOBase.seekable
IOBase.tell
BufferedIOBase.readinto1
RawIOBase.read
RawIOBase.readall
Para obter mais informações, consulte a Documentação do Python 3.8 em IOBase. Chamar métodos sem suporte em um servidor Snowflake, como o método fileno
, retornará um erro.
Nota
Por padrão, o acesso a arquivos com SnowflakeFile
requer URLs com escopo para tornar seu código resiliente a ataques de injeção de arquivo. Você pode criar uma URL com escopo em SQL usando a função interna BUILD_SCOPED_FILE_URL. Para obter mais informações sobre URLs com escopo, consulte Tipos de URLs disponíveis para acessar arquivos. Somente usuários com acesso ao arquivo podem criar um URL com escopo.
Pré-requisitos¶
Antes que seu código do manipulador Python possa ler um arquivo em um estágio, você deve fazer o seguinte para disponibilizar o arquivo para o código:
Crie um estágio que esteja disponível para seu manipulador.
Você pode usar um estágio externo ou interno. Se você usar um estágio interno, isso pode ser um estágio de usuário quando planeja criar um procedimento armazenado de direitos do chamador. Caso contrário, você deve usar um estágio nomeado. No momento, o Snowflake não oferece suporte ao uso de um estágio de tabela para dependências da UDF.
Para saber mais sobre a criação de um estágio, consulte CREATE STAGE. Para saber mais sobre a escolha de um tipo de estágio interno, consulte Escolha de um estágio interno para os arquivos locais.
Privilégios adequados no estágio devem ser atribuídos à seguinte função, dependendo do seu caso de uso:
Caso de uso
Função
UDF ou procedimento armazenado de direitos do proprietário
A função que possui a UDF ou procedimento armazenado em execução.
Procedimento armazenado de direitos do chamador
A função do usuário.
Para obter mais informações, consulte Granting Privileges for User-Defined Functions.
Copie o arquivo que seu código lerá para o estágio.
Você pode copiar o arquivo de uma unidade local para um estágio interno usando o comando PUT. Para obter mais informações sobre a preparação de arquivos com PUT, consulte Preparação de arquivos de dados de um sistema de arquivo local.
Você pode copiar o arquivo de uma unidade local para um local de estágio externo usando qualquer uma das ferramentas fornecidas pelo seu serviço de armazenamento em nuvem. Para obter ajuda, consulte a documentação do seu serviço de armazenamento em nuvem.
Como calcular o hash perceptivo de uma imagem com um manipulador Python inline¶
Este exemplo usa SnowflakeFile
para ler um par de arquivos de imagem preparados e usar o hash perceptivo (pHash) de cada arquivo para determinar a semelhança entre as imagens.
Crie uma UDF que retorna o valor de fase de uma imagem, especificando o modo de entrada como binário passando rb
para o argumento mode
:
CREATE OR REPLACE FUNCTION calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile
def run(file_path):
with SnowflakeFile.open(file_path, 'rb') as f:
return imagehash.average_hash(Image.open(f))
$$;
Crie uma segunda UDF que calcula a distância entre os valores de fase de duas imagens:
CREATE OR REPLACE FUNCTION calc_phash_distance(h1 string, h2 string)
RETURNS INT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('imagehash')
HANDLER = 'run'
as
$$
import imagehash
def run(h1, h2):
return imagehash.hex_to_hash(h1) - imagehash.hex_to_hash(h2)
$$;
Prepare os arquivos de imagem e atualize a tabela de diretórios:
PUT file:///tmp/image1.jpg @images AUTO_COMPRESS=FALSE;
PUT file:///tmp/image2.jpg @images AUTO_COMPRESS=FALSE;
ALTER STAGE images REFRESH;
Chame a UDFs:
SELECT
calc_phash_distance(
calc_phash(build_scoped_file_url(@images, 'image1.jpg')),
calc_phash(build_scoped_file_url(@images, 'image2.jpg'))
) ;
Como processar um arquivo CSV com uma UDTF¶
Este exemplo usa SnowflakeFile
para criar uma UDTF que extrai o conteúdo de um arquivo CSV e retorna as linhas em uma tabela.
Crie a UDTF com um manipulador inline:
CREATE FUNCTION parse_csv(file_path string)
RETURNS TABLE (col1 string, col2 string, col3 string)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'csvparser'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
class csvparser:
def process(self, stagefile):
with SnowflakeFile.open(stagefile) as f:
for line in f.readlines():
lineStr = line.strip()
row = lineStr.split(",")
try:
# Read the columns from the line.
yield (row[1], row[0], row[2], )
except:
pass
$$;
Prepare arquivo CSV e atualize a tabela de diretórios:
PUT file:///tmp/sample.csv @data_stage AUTO_COMPRESS=FALSE;
ALTER STAGE data_stage REFRESH;
Chame a UDTF, passando um URL de arquivo:
SELECT * FROM TABLE(PARSE_CSV(build_scoped_file_url(@data_stage, 'sample.csv')));
Processamento de vários arquivos¶
Você pode ler e processar vários arquivos passando a coluna RELATIVE_PATH de uma tabela de diretórios para seu manipulador. Para obter mais informações sobre a coluna RELATIVE_PATH, consulte a saída de uma consulta de tabela de diretório.
Nota
Dependendo do tamanho do arquivo e das necessidades de computação, talvez você queira usar ALTER WAREHOUSE para dimensionar seu warehouse antes de executar uma instrução que lê e processa vários arquivos.
- Chame uma UDF para processar vários arquivos:
O exemplo a seguir chama uma UDF dentro de uma instrução CREATE TABLE para processar cada arquivo em um estágio e, em seguida, armazenar os resultados em uma nova tabela.
Para fins de demonstração, o exemplo considera o seguinte:
Existem vários arquivos de texto em um estágio chamado
my_stage
.Há uma UDF existente chamada
get_sentiment
que realiza análise de sentimento em texto não estruturado. A UDF toma um caminho para um arquivo de texto como entrada e retorna um valor que representa o sentimento.
CREATE OR REPLACE TABLE sentiment_results AS SELECT relative_path , get_sentiment(build_scoped_file_url(@my_stage, relative_path)) AS sentiment FROM directory(@my_stage);
- Chame uma UDTF para processar vários arquivos:
Este próximo exemplo chama uma UDTF chamada
parse_excel_udtf
. O exemplo passa orelative_path
da tabela de diretórios para o estágio chamadomy_excel_stage
.SELECT t.* FROM directory(@my_stage) d, table(parse_excel_udtf(build_scoped_file_url(@my_excel_stage, relative_path)) t;
Como ler arquivos com URIs e URLs do estágio¶
Acesso a arquivos com SnowflakeFile
exige URLs com escopo por padrão. Isso torna seu código resistente a ataques de injeção de arquivos. No entanto, você pode se referir a um local de arquivo usando um URI de estágio ou um URL de estágio em vez disso. Para isso, você deve chamar o método SnowflakeFile.open
com o argumento de palavra-chave require_scoped_url = False
.
Esta opção é útil quando você deseja permitir que um chamador forneça um URI que seja acessível apenas ao proprietário da UDF. Por exemplo, você pode usar um URI de estágio para acesso a arquivos se você possui uma UDF e você deseja ler em seus arquivos de configuração ou modelos de aprendizado de máquina. Não recomendamos esta opção quando você trabalha com arquivos que possuem nomes imprevisíveis, como arquivos criados com base na entrada do usuário.
Este exemplo lê um modelo de aprendizado de máquina de um arquivo e usa o modelo em uma função para executar o processamento de linguagem natural para análise de sentimento. O exemplo chama o open
com require_scoped_url = False
. Em ambos os formatos de localização de arquivo (URI do estágio e URL do estágio), o proprietário da UDF deve ter acesso ao arquivo de modelo.
Crie a UDF com um manipulador inline:
CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle
def run(input_data):
model_file = '@models/NLP_model.pickle'
# Specify 'mode = rb' to open the file in binary mode.
with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
model = pickle.load(f)
return model.predict([input_data])[0]
$$;
Prepare o arquivo de modelo e atualize a tabela de diretórios:
PUT file:///tmp/NLP_model.pickle @models AUTO_COMPRESS=FALSE;
ALTER STAGE models REFRESH;
Como alternativa, você pode especificar a UDF com o URL do estágio do modelo para extrair o sentimento.
Por exemplo, crie uma UDF com um manipulador inline que especifica um arquivo usando um URL do estágio:
CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle
def run(input_data):
model_file = 'https://my_account/api/files/my_db/my_schema/models/NLP_model.pickle'
# Specify 'rb' to open the file in binary mode.
with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
model = pickle.load(f)
return model.predict([input_data])[0]
$$;
Chame a UDF com os dados de entrada:
SELECT extract_sentiment('I am writing to express my interest in a recent posting made.');
Como escrever arquivos¶
Um manipulador de UDF pode escrever arquivos em um diretório /tmp
criado para a consulta chamando a UDF.
Tenha em mente que um diretório /tmp
é reservado para uma única consulta de chamada, mas vários processos de trabalhadores de Python podem estar em execução ao mesmo tempo. Para evitar colisões, você deve garantir que o acesso ao diretório /tmp seja sincronizado com outros processos de trabalhadores de Python ou que os nomes dos arquivos escritos no /tmp sejam únicos.
Para obter um código de exemplo, consulte Como descompactar um arquivo preparado neste tópico.
O código no exemplo a seguir grava a entrada text
para o diretório /tmp
. Ele também anexa o ID do processo da função para garantir a exclusividade do local do arquivo.
def func(text):
# Append the function's process ID to ensure the file name's uniqueness.
file_path = '/tmp/content' + str(os.getpid())
with open(file_path, "w") as file:
file.write(text)
Como descompactar um arquivo preparado¶
Você pode armazenar um arquivo .zip em um estágio e depois descompactá-lo em uma UDF usando o módulo Python zipfile.
Por exemplo, você pode carregar um arquivo .zip para um estágio, depois referenciar o arquivo .zip em seu local de estágio na cláusula IMPORTS ao criar a UDF. No runtime, o Snowflake copiará o arquivo preparado para um diretório de importação do qual seu código pode acessá-lo.
Para obter mais informações sobre como ler e gravar arquivos, consulte Como ler um arquivo e Como escrever arquivos.
No exemplo a seguir, o código da UDF usa um modelo de NLP para descobrir entidades no texto. O código retorna um conjunto dessas entidades. Para configurar o modelo de NLP para processar o texto, o código primeiro usa o módulo zipfile para extrair o arquivo para o modelo (en_core_web_sm-2.3.1) de um arquivo .zip. O código então usa o módulo spaCy para carregar o modelo a partir do arquivo.
Note que o código escreve o conteúdo do arquivo extraído no diretório /tmp criado para a consulta que chama essa função. O código usa bloqueios de arquivo para garantir que a extração seja sincronizada entre os processos de trabalhadores de Python; dessa forma, o conteúdo é descompactado apenas uma vez. Para obter mais informações sobre como escrever arquivos, consulte Como escrever arquivos.
Para saber mais sobre o módulo de arquivo zip, consulte a referência zipfile. Para saber mais sobre o módulo spaCy, consulte a documentação spaCy API.
Crie a UDF com um manipulador inline:
CREATE OR REPLACE FUNCTION py_spacy(str string)
RETURNS ARRAY
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'func'
PACKAGES = ('spacy')
IMPORTS = ('@spacy_stage/spacy_en_core_web_sm.zip')
AS
$$
import fcntl
import os
import spacy
import sys
import threading
import zipfile
# File lock class for synchronizing write access to /tmp.
class FileLock:
def __enter__(self):
self._lock = threading.Lock()
self._lock.acquire()
self._fd = open('/tmp/lockfile.LOCK', 'w+')
fcntl.lockf(self._fd, fcntl.LOCK_EX)
def __exit__(self, type, value, traceback):
self._fd.close()
self._lock.release()
# Get the location of the import directory. Snowflake sets the import
# directory location so code can retrieve the location via sys._xoptions.
IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
# Get the path to the ZIP file and set the location to extract to.
zip_file_path = import_dir + "spacy_en_core_web_sm.zip"
extracted = '/tmp/en_core_web_sm'
# Extract the contents of the ZIP. This is done under the file lock
# to ensure that only one worker process unzips the contents.
with FileLock():
if not os.path.isdir(extracted + '/en_core_web_sm/en_core_web_sm-2.3.1'):
with zipfile.ZipFile(zip_file_path, 'r') as myzip:
myzip.extractall(extracted)
# Load the model from the extracted file.
nlp = spacy.load(extracted + "/en_core_web_sm/en_core_web_sm-2.3.1")
def func(text):
doc = nlp(text)
result = []
for ent in doc.ents:
result.append((ent.text, ent.start_char, ent.end_char, ent.label_))
return result
$$;
Tratamento de valores NULL¶
O código a seguir mostra como os valores NULL são tratados. Para obter mais informações, consulte Valores NULL.
Crie a UDF:
CREATE OR REPLACE FUNCTION py_udf_null(a variant)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'udf'
AS $$
def udf(a):
if not a:
return 'JSON null'
elif getattr(a, "is_sql_null", False):
return 'SQL null'
else:
return 'not null'
$$;
Chame a UDF:
SELECT py_udf_null(null);
SELECT py_udf_null(parse_json('null'));
SELECT py_udf_null(10);
+-------------------+
| PY_UDF_NULL(NULL) |
|-------------------|
| SQL null |
+-------------------+
+---------------------------------+
| PY_UDF_NULL(PARSE_JSON('NULL')) |
|---------------------------------|
| JSON null |
+---------------------------------+
+-----------------+
| PY_UDF_NULL(10) |
|-----------------|
| not null |
+-----------------+