Exemplos de manipuladores de UDF em Python

Este tópico inclui exemplos simples de código de manipulador de UDF escrito em Python.

Para obter mais informações sobre como usar o Python para criar um manipulador de UDF, consulte Como criar UDFs de Python.

Defina runtime_version como a versão do Python runtime que seu código requer. As versões suportadas do Python são:

  • 3.8

  • 3.9

  • 3,10

  • 3,11

Como importar um pacote em um manipulador inline

Uma lista com curadoria de pacotes de terceiros do Anaconda está disponível. Para obter mais informações, consulte Como usar pacotes de terceiros.

Nota

Antes que você possa usar os pacotes fornecidos pelo Anaconda, seu administrador da organização Snowflake deve reconhecer os Termos de Terceiros do Snowflake. Para obter mais informações, consulte Como usar pacotes de terceiros do Anaconda.

O código a seguir mostra como importar pacotes e retornar suas versões.

Crie a UDF:

CREATE OR REPLACE FUNCTION py_udf()
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('numpy','pandas','xgboost==1.5.0')
HANDLER = 'udf'
AS $$
import numpy as np
import pandas as pd
import xgboost as xgb
def udf():
  return [np.__version__, pd.__version__, xgb.__version__]
$$;
Copy

Chame a UDF:

SELECT py_udf();
+-------------+
| PY_UDF()    |
|-------------|
| [           |
|   "1.19.2", |
|   "1.4.0",  |
|   "1.5.0"   |
| ]           |
+-------------+
Copy

Você pode usar a palavra-chave PACKAGES para especificar versões de pacotes da seguinte maneira:

  • Sem uma versão (por exemplo, numpy)

  • Fixado em uma versão exata (por exemplo, numpy==1.25.2)

  • Limitado a um prefixo de versão usando caracteres curinga (por exemplo, numpy==1.*)

  • Restrito a um intervalo de versões (por exemplo, numpy>=1.25)

  • Restrito por vários especificadores de versão (por exemplo, numpy>=1.25,<2) para que um pacote que satisfaça todos os especificadores de versão seja selecionado.

Nota

O uso de vários operadores de intervalo (por exemplo, numpy>=1.25,<2) não é compatível com políticas de pacote, mas você pode usá-los ao criar UDF, UDTF e procedimentos armazenados do Python.

Aqui está um exemplo de como usar o curinga * para restringir um pacote a um prefixo de versão.

CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES=('numpy==1.*')
RUNTIME_VERSION=3.10
HANDLER='echo'
AS $$
def echo():
  return 'hi'
$$;
Copy

Este exemplo mostra como restringir um pacote para ser maior ou igual a uma versão especificada.

CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES=('numpy>=1.2')
RUNTIME_VERSION=3.10
HANDLER='echo'
AS $$
def echo():
  return 'hi'
$$;
Copy

Este exemplo mostra como usar vários especificadores de versão de pacote.

CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES=('numpy>=1.2,<2')
RUNTIME_VERSION=3.10
HANDLER='echo'
AS $$
def echo():
  return 'hi'
$$;
Copy

Como ler um arquivo

Você pode ler o conteúdo de um arquivo com o código do manipulador de UDF de Python. Por exemplo, você pode querer ler um arquivo para processar dados não estruturados.

Para ler o conteúdo de um arquivo, você pode:

Como ler um arquivo especificado estaticamente usando IMPORTS

Você pode ler um arquivo especificando o nome do arquivo e o nome do estágio na cláusula IMPORTS do comando CREATE FUNCTION.

Quando você especifica um arquivo na cláusula IMPORTS, o Snowflake copia esse arquivo do estágio para o diretório inicial da UDF (também chamado de diretório importar), que é o diretório do qual a UDF realmente lê o arquivo.

Snowflake copia arquivos importados para um único diretório. Todos os arquivos nesse diretório devem ter nomes exclusivos, portanto, cada arquivo em sua cláusula IMPORTS deve ter um nome distinto. Isso se aplica mesmo se os arquivos começarem em diferentes estágios ou diferentes subdiretórios dentro de um estágio.

Nota

Você só pode importar arquivos do diretório de nível superior em um estágio, não em subpastas.

O exemplo a seguir usa um manipulador Python inline que lê um arquivo chamado file.txt de um estágio chamado my_stage. O manipulador recupera a localização do diretório inicial da UDF usando o método Python sys._xoptions com a opção do sistema snowflake_import_directory.

O Snowflake lê o arquivo uma vez durante a criação da UDF, e não será lido novamente durante a execução da UDF se a leitura do arquivo for feita fora do manipulador de destino.

Crie a UDF com um manipulador inline:

CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
IMPORTS=('@my_stage/file.txt')
HANDLER='compute'
AS
$$
import sys
import os

with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
  s = f.read()

def compute():
  return s
$$;
Copy

Como ler um arquivo especificado dinamicamente com SnowflakeFile

Você pode ler um arquivo de um estágio usando a classe SnowflakeFile no módulo do Snowpark snowflake.snowpark.files. A classe SnowflakeFile fornece acesso dinâmico a arquivos, o que permite transmitir arquivos de qualquer tamanho. O acesso dinâmico a arquivos também é útil quando você deseja iterar vários arquivos. Para exemplos, consulte Processamento de vários arquivos.

A classe SnowflakeFile tem um método para abrir um arquivo: open. O método open retorna um objeto SnowflakeFile que estende os objetos de arquivo IOBase do Python.

O objeto SnowflakeFile suporta os seguintes métodos IOBase, BufferedIOBase e RawIOBase:

  • IOBase.fileno

  • IOBase.isatty

  • IOBase.readable

  • IOBase.readinto

  • IOBase.readline

  • IOBase.readlines

  • IOBase.seek

  • IOBase.seekable

  • IOBase.tell

  • BufferedIOBase.readinto1

  • RawIOBase.read

  • RawIOBase.readall

Para obter mais informações, consulte a Documentação do Python 3.8 em IOBase. Chamar métodos sem suporte em um servidor Snowflake, como o método fileno, retornará um erro.

Nota

Por padrão, o acesso a arquivos com SnowflakeFile requer URLs com escopo para tornar seu código resiliente a ataques de injeção de arquivo. Você pode criar uma URL com escopo em SQL usando a função interna BUILD_SCOPED_FILE_URL. Para obter mais informações sobre URLs com escopo, consulte Tipos de URLs disponíveis para acessar arquivos. Somente usuários com acesso ao arquivo podem criar um URL com escopo.

Pré-requisitos

Antes que seu código do manipulador Python possa ler um arquivo em um estágio, você deve fazer o seguinte para disponibilizar o arquivo para o código:

  1. Crie um estágio que esteja disponível para seu manipulador.

    Você pode usar um estágio externo ou interno. Se você usar um estágio interno, isso pode ser um estágio de usuário quando planeja criar um procedimento armazenado de direitos do chamador. Caso contrário, você deve usar um estágio nomeado. No momento, o Snowflake não oferece suporte ao uso de um estágio de tabela para dependências da UDF.

    Para saber mais sobre a criação de um estágio, consulte CREATE STAGE. Para saber mais sobre a escolha de um tipo de estágio interno, consulte Escolha de um estágio interno para os arquivos locais.

    Privilégios adequados no estágio devem ser atribuídos à seguinte função, dependendo do seu caso de uso:

    Caso de uso

    Função

    UDF ou procedimento armazenado de direitos do proprietário

    A função que possui a UDF ou procedimento armazenado em execução.

    Procedimento armazenado de direitos do chamador

    A função do usuário.

    Para obter mais informações, consulte Granting Privileges for User-Defined Functions.

  2. Copie o arquivo que seu código lerá para o estágio.

    Você pode copiar o arquivo de uma unidade local para um estágio interno usando o comando PUT. Para obter mais informações sobre a preparação de arquivos com PUT, consulte Preparação de arquivos de dados de um sistema de arquivo local.

    Você pode copiar o arquivo de uma unidade local para um local de estágio externo usando qualquer uma das ferramentas fornecidas pelo seu serviço de armazenamento em nuvem. Para obter ajuda, consulte a documentação do seu serviço de armazenamento em nuvem.

Como calcular o hash perceptivo de uma imagem com um manipulador Python inline

Este exemplo usa SnowflakeFile para ler um par de arquivos de imagem preparados e usar o hash perceptivo (pHash) de cada arquivo para determinar a semelhança entre as imagens.

Crie uma UDF que retorna o valor de fase de uma imagem, especificando o modo de entrada como binário passando rb para o argumento mode:

CREATE OR REPLACE FUNCTION calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile

def run(file_path):
  with SnowflakeFile.open(file_path, 'rb') as f:
  return imagehash.average_hash(Image.open(f))
$$;
Copy

Crie uma segunda UDF que calcula a distância entre os valores de fase de duas imagens:

CREATE OR REPLACE FUNCTION calc_phash_distance(h1 string, h2 string)
RETURNS INT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('imagehash')
HANDLER = 'run'
as
$$
import imagehash

def run(h1, h2):
  return imagehash.hex_to_hash(h1) - imagehash.hex_to_hash(h2)
$$;
Copy

Prepare os arquivos de imagem e atualize a tabela de diretórios:

PUT file:///tmp/image1.jpg @images AUTO_COMPRESS=FALSE;
PUT file:///tmp/image2.jpg @images AUTO_COMPRESS=FALSE;

ALTER STAGE images REFRESH;
Copy

Chame a UDFs:

SELECT
  calc_phash_distance(
    calc_phash(build_scoped_file_url(@images, 'image1.jpg')),
    calc_phash(build_scoped_file_url(@images, 'image2.jpg'))
  ) ;
Copy

Como processar um arquivo CSV com uma UDTF

Este exemplo usa SnowflakeFile para criar uma UDTF que extrai o conteúdo de um arquivo CSV e retorna as linhas em uma tabela.

Crie a UDTF com um manipulador inline:

CREATE FUNCTION parse_csv(file_path string)
RETURNS TABLE (col1 string, col2 string, col3 string)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'csvparser'
AS
$$
from snowflake.snowpark.files import SnowflakeFile

class csvparser:
  def process(self, stagefile):
    with SnowflakeFile.open(stagefile) as f:
      for line in f.readlines():
        lineStr = line.strip()
        row = lineStr.split(",")
        try:
          # Read the columns from the line.
          yield (row[1], row[0], row[2], )
        except:
          pass
$$;
Copy

Prepare arquivo CSV e atualize a tabela de diretórios:

PUT file:///tmp/sample.csv @data_stage AUTO_COMPRESS=FALSE;

ALTER STAGE data_stage REFRESH;
Copy

Chame a UDTF, passando um URL de arquivo:

SELECT * FROM TABLE(PARSE_CSV(build_scoped_file_url(@data_stage, 'sample.csv')));
Copy

Processamento de vários arquivos

Você pode ler e processar vários arquivos passando a coluna RELATIVE_PATH de uma tabela de diretórios para seu manipulador. Para obter mais informações sobre a coluna RELATIVE_PATH, consulte a saída de uma consulta de tabela de diretório.

Nota

Dependendo do tamanho do arquivo e das necessidades de computação, talvez você queira usar ALTER WAREHOUSE para dimensionar seu warehouse antes de executar uma instrução que lê e processa vários arquivos.

Chame uma UDF para processar vários arquivos:

O exemplo a seguir chama uma UDF dentro de uma instrução CREATE TABLE para processar cada arquivo em um estágio e, em seguida, armazenar os resultados em uma nova tabela.

Para fins de demonstração, o exemplo considera o seguinte:

  • Existem vários arquivos de texto em um estágio chamado my_stage.

  • Há uma UDF existente chamada get_sentiment que realiza análise de sentimento em texto não estruturado. A UDF toma um caminho para um arquivo de texto como entrada e retorna um valor que representa o sentimento.

CREATE OR REPLACE TABLE sentiment_results AS
SELECT
  relative_path
  , get_sentiment(build_scoped_file_url(@my_stage, relative_path)) AS sentiment
FROM directory(@my_stage);
Copy
Chame uma UDTF para processar vários arquivos:

Este próximo exemplo chama uma UDTF chamada parse_excel_udtf. O exemplo passa o relative_path da tabela de diretórios para o estágio chamado my_excel_stage.

SELECT t.*
FROM directory(@my_stage) d,
table(parse_excel_udtf(build_scoped_file_url(@my_excel_stage, relative_path)) t;
Copy

Como ler arquivos com URIs e URLs do estágio

Acesso a arquivos com SnowflakeFile exige URLs com escopo por padrão. Isso torna seu código resistente a ataques de injeção de arquivos. No entanto, você pode se referir a um local de arquivo usando um URI de estágio ou um URL de estágio em vez disso. Para isso, você deve chamar o método SnowflakeFile.open com o argumento de palavra-chave require_scoped_url = False.

Esta opção é útil quando você deseja permitir que um chamador forneça um URI que seja acessível apenas ao proprietário da UDF. Por exemplo, você pode usar um URI de estágio para acesso a arquivos se você possui uma UDF e você deseja ler em seus arquivos de configuração ou modelos de aprendizado de máquina. Não recomendamos esta opção quando você trabalha com arquivos que possuem nomes imprevisíveis, como arquivos criados com base na entrada do usuário.

Este exemplo lê um modelo de aprendizado de máquina de um arquivo e usa o modelo em uma função para executar o processamento de linguagem natural para análise de sentimento. O exemplo chama o open com require_scoped_url = False. Em ambos os formatos de localização de arquivo (URI do estágio e URL do estágio), o proprietário da UDF deve ter acesso ao arquivo de modelo.

Crie a UDF com um manipulador inline:

CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle

def run(input_data):
  model_file = '@models/NLP_model.pickle'
  # Specify 'mode = rb' to open the file in binary mode.
  with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
    model = pickle.load(f)
    return model.predict([input_data])[0]
$$;
Copy

Prepare o arquivo de modelo e atualize a tabela de diretórios:

PUT file:///tmp/NLP_model.pickle @models AUTO_COMPRESS=FALSE;

ALTER STAGE models REFRESH;
Copy

Como alternativa, você pode especificar a UDF com o URL do estágio do modelo para extrair o sentimento.

Por exemplo, crie uma UDF com um manipulador inline que especifica um arquivo usando um URL do estágio:

CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle

def run(input_data):
  model_file = 'https://my_account/api/files/my_db/my_schema/models/NLP_model.pickle'
  # Specify 'rb' to open the file in binary mode.
  with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
    model = pickle.load(f)
    return model.predict([input_data])[0]
$$;
Copy

Chame a UDF com os dados de entrada:

SELECT extract_sentiment('I am writing to express my interest in a recent posting made.');
Copy

Como escrever arquivos

Um manipulador de UDF pode escrever arquivos em um diretório /tmp criado para a consulta chamando a UDF.

Tenha em mente que um diretório /tmp é reservado para uma única consulta de chamada, mas vários processos de trabalhadores de Python podem estar em execução ao mesmo tempo. Para evitar colisões, você deve garantir que o acesso ao diretório /tmp seja sincronizado com outros processos de trabalhadores de Python ou que os nomes dos arquivos escritos no /tmp sejam únicos.

Para obter um código de exemplo, consulte Como descompactar um arquivo preparado neste tópico.

O código no exemplo a seguir grava a entrada text para o diretório /tmp. Ele também anexa o ID do processo da função para garantir a exclusividade do local do arquivo.

def func(text):
   # Append the function's process ID to ensure the file name's uniqueness.
   file_path = '/tmp/content' + str(os.getpid())
   with open(file_path, "w") as file:
      file.write(text)
Copy

Como descompactar um arquivo preparado

Você pode armazenar um arquivo .zip em um estágio e depois descompactá-lo em uma UDF usando o módulo Python zipfile.

Por exemplo, você pode carregar um arquivo .zip para um estágio, depois referenciar o arquivo .zip em seu local de estágio na cláusula IMPORTS ao criar a UDF. No runtime, o Snowflake copiará o arquivo preparado para um diretório de importação do qual seu código pode acessá-lo.

Para obter mais informações sobre como ler e gravar arquivos, consulte Como ler um arquivo e Como escrever arquivos.

No exemplo a seguir, o código da UDF usa um modelo de NLP para descobrir entidades no texto. O código retorna um conjunto dessas entidades. Para configurar o modelo de NLP para processar o texto, o código primeiro usa o módulo zipfile para extrair o arquivo para o modelo (en_core_web_sm-2.3.1) de um arquivo .zip. O código então usa o módulo spaCy para carregar o modelo a partir do arquivo.

Note que o código escreve o conteúdo do arquivo extraído no diretório /tmp criado para a consulta que chama essa função. O código usa bloqueios de arquivo para garantir que a extração seja sincronizada entre os processos de trabalhadores de Python; dessa forma, o conteúdo é descompactado apenas uma vez. Para obter mais informações sobre como escrever arquivos, consulte Como escrever arquivos.

Para saber mais sobre o módulo de arquivo zip, consulte a referência zipfile. Para saber mais sobre o módulo spaCy, consulte a documentação spaCy API.

Crie a UDF com um manipulador inline:

CREATE OR REPLACE FUNCTION py_spacy(str string)
RETURNS ARRAY
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'func'
PACKAGES = ('spacy')
IMPORTS = ('@spacy_stage/spacy_en_core_web_sm.zip')
AS
$$
import fcntl
import os
import spacy
import sys
import threading
import zipfile

 # File lock class for synchronizing write access to /tmp.
 class FileLock:
   def __enter__(self):
       self._lock = threading.Lock()
       self._lock.acquire()
       self._fd = open('/tmp/lockfile.LOCK', 'w+')
       fcntl.lockf(self._fd, fcntl.LOCK_EX)

    def __exit__(self, type, value, traceback):
       self._fd.close()
       self._lock.release()

 # Get the location of the import directory. Snowflake sets the import
 # directory location so code can retrieve the location via sys._xoptions.
 IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
 import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

 # Get the path to the ZIP file and set the location to extract to.
 zip_file_path = import_dir + "spacy_en_core_web_sm.zip"
 extracted = '/tmp/en_core_web_sm'

 # Extract the contents of the ZIP. This is done under the file lock
 # to ensure that only one worker process unzips the contents.
 with FileLock():
    if not os.path.isdir(extracted + '/en_core_web_sm/en_core_web_sm-2.3.1'):
       with zipfile.ZipFile(zip_file_path, 'r') as myzip:
          myzip.extractall(extracted)

 # Load the model from the extracted file.
 nlp = spacy.load(extracted + "/en_core_web_sm/en_core_web_sm-2.3.1")

 def func(text):
    doc = nlp(text)
    result = []

    for ent in doc.ents:
       result.append((ent.text, ent.start_char, ent.end_char, ent.label_))
    return result
 $$;
Copy

Tratamento de valores NULL

O código a seguir mostra como os valores NULL são tratados. Para obter mais informações, consulte Valores NULL.

Crie a UDF:

CREATE OR REPLACE FUNCTION py_udf_null(a variant)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'udf'
AS $$

def udf(a):
    if not a:
        return 'JSON null'
    elif getattr(a, "is_sql_null", False):
        return 'SQL null'
    else:
        return 'not null'
$$;
Copy

Chame a UDF:

SELECT py_udf_null(null);
SELECT py_udf_null(parse_json('null'));
SELECT py_udf_null(10);
+-------------------+
| PY_UDF_NULL(NULL) |
|-------------------|
| SQL null          |
+-------------------+

+---------------------------------+
| PY_UDF_NULL(PARSE_JSON('NULL')) |
|---------------------------------|
| JSON null                       |
+---------------------------------+

+-----------------+
| PY_UDF_NULL(10) |
|-----------------|
| not null        |
+-----------------+
Copy