Criação de funções definidas pelo usuário (UDFs) para DataFrames em Python

A API do Snowpark fornece métodos que você pode usar para criar uma função definida pelo usuário a partir de uma lambda ou função em Python. Este tópico explica como criar esses tipos de funções.

Neste tópico:

Introdução

Com o Snowpark, você pode criar funções definidas pelo usuário (UDFs) para suas lambdas e funções personalizadas, e pode chamar essas UDFs para processar os dados em seu DataFrame.

Quando você usa a API do Snowpark para criar uma UDF, a biblioteca do Snowpark carrega o código de sua função para um estágio interno. Quando você chama a UDF, a biblioteca do Snowpark executa sua função no servidor, onde os dados estão. Como resultado, os dados não precisam ser transferidos para o cliente para que a função possa processar os dados.

Em seu código personalizado, você também pode importar módulos de arquivos Python ou pacotes de terceiros.

Você pode criar uma UDF para seu código personalizado de duas maneiras:

  • Você pode criar uma UDF anônima e atribuir a função a uma variável. Desde que essa variável esteja no escopo, você pode usar a variável para chamar a UDF.

  • Você pode criar uma UDF nomeada e chamar a UDF pelo nome. Você pode usar isso se, por exemplo, precisar chamar uma UDF pelo nome ou usar a UDF em uma sessão posterior.

As próximas seções explicam como criar estas UDFs usando um ambiente de desenvolvimento local ou usando uma planilha Python.

Observe que se você definiu uma UDF executando o comando CREATE FUNCTION, você pode chamar essa UDF no Snowpark. Para obter mais detalhes, consulte Como chamar funções definidas pelo usuário (UDFs).

Nota

As UDFs vetorizadas de Python permitem que você defina funções Python que recebem lotes de linhas de entrada como DataFrames do Pandas. Isso resulta em um desempenho muito melhor com cenários de inferência de machine learning. Para obter mais informações, consulte Uso de UDFs vetorizadas.

Nota

Se você estiver trabalhando com uma planilha Python, use esses exemplos dentro da função do manipulador:

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col

def main(session: snowpark.Session):
   df_table = session.table("sample_product_data")
Copy

Se os exemplos retornarem algo diferente de um DataFrame, como um objeto list de Row, altere o tipo de retorno para corresponder ao tipo de retorno do exemplo.

Depois de executar um exemplo de código, use a guia Results para visualizar qualquer saída retornada. Consulte Execução das planilhas Python para obter mais detalhes.

Especificação de dependências para uma UDF

Para definir uma UDF usando a API do Snowpark, você deve importar os arquivos que contenham quaisquer módulos dos quais sua UDF dependa, tais como arquivos Python, arquivos zip, arquivos de recursos etc.

Você também pode especificar um diretório, e a biblioteca do Snowpark o comprimirá automaticamente e o carregará como um arquivo zip. (Para obter mais detalhes sobre recursos de leitura de uma UDF, consulte Como ler arquivos com uma UDF).

Quando você chama Session.add_import(), a biblioteca do Snowpark carrega os arquivos especificados em um estágio interno e importa os arquivos ao executar sua UDF.

O exemplo a seguir demonstra como adicionar um arquivo zip em um estágio como uma dependência em relação ao seu código:

# Add a zip file that you uploaded to a stage.
session.add_import("@my_stage/<path>/my_library.zip")
Copy

Os exemplos a seguir demonstram como adicionar um arquivo de Python a partir de sua máquina local:

# Import a Python file from your local machine.
session.add_import("/<path>/my_module.py")

# Import a Python file from your local machine and specify a relative Python import path.
session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")
Copy

Os exemplos a seguir demonstram como adicionar outros tipos de dependências:

# Add a directory of resource files.
session.add_import("/<path>/my-resource-dir/")

# Add a resource file.
session.add_import("/<path>/my-resource.xml")
Copy

Nota

A biblioteca Python Snowpark não é carregada automaticamente.

Você não precisa especificar as seguintes dependências:

  • Suas bibliotecas incluídas no Python.

    Essas bibliotecas já estão disponíveis no ambiente de runtime no servidor onde suas UDFs são executadas.

Como usar pacotes de terceiros do Anaconda em uma UDF

Você pode usar pacotes de terceiros do canal Snowflake Anaconda em uma UDF.

  • Se você criar uma UDF Python em uma planilha Python, os pacotes Anaconda já estarão disponíveis em sua planilha. Consulte Como adicionar um arquivo Python de um estágio a uma planilha.

  • Se você criar uma UDF Python em seu ambiente de desenvolvimento local, você poderá especificar quais pacotes Anaconda serão instalados.

Quando consultas que chamam UDFs de Python são executadas dentro de um warehouse do Snowflake, os pacotes do Anaconda são instalados sem problemas e armazenados em cache no warehouse virtual em seu nome.

Para obter mais informações sobre práticas recomendadas, como visualizar os pacotes disponíveis e como criar um ambiente de desenvolvimento local, consulte Como usar pacotes de terceiros.

Se você escrever uma UDF Python em seu ambiente de desenvolvimento local, use session.add_packages para adicionar pacotes em nível de sessão.

Este exemplo de código mostra como importar pacotes e retornar suas versões.

import numpy as np
import pandas as pd
import xgboost as xgb
from snowflake.snowpark.functions import udf

session.add_packages("numpy", "pandas", "xgboost==1.5.0")

@udf
def compute() -> list:
   return [np.__version__, pd.__version__, xgb.__version__]
Copy

Você também pode usar session.add_requirements para especificar pacotes com um arquivo requirements.

session.add_requirements("mydir/requirements.txt")
Copy

Você pode adicionar os pacotes no nível da UDF para substituir os pacotes no nível da sessão que você possa ter adicionado anteriormente.

import numpy as np
import pandas as pd
import xgboost as xgb
from snowflake.snowpark.functions import udf

@udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
   def compute() -> list:
   return [np.__version__, pd.__version__, xgb.__version__]
Copy

Importante

Se você não especificar uma versão de pacote, o Snowflake usará a versão mais recente ao resolver as dependências. Ao implementar a UDF para produção, você pode querer garantir que seu código sempre utilize as mesmas versões de dependência. Você pode fazer isso tanto para UDFs permanentes como para temporárias.

  • Quando você cria uma UDF permanente, a UDF é criada e registrada apenas uma vez. Isso resolve as dependências uma vez e a versão selecionada é usada para cargas de trabalho de produção. Quando a UDF é executada, ela sempre usará as mesmas versões de dependência.

  • Quando você criar uma UDF temporária, especifique as versões de dependência como parte da especificação da versão. Desta forma, quando a UDF for registrada, a resolução do pacote usará a versão especificada. Se você não especificar a versão, a dependência poderá ser atualizada quando uma nova versão estiver disponível.

Como criar uma UDF anônima

Para criar uma UDF anônima, você pode:

  • Chamar a função udf no módulo snowflake.snowpark.functions passando a definição da função anônima.

  • Chamar o método register na classe UDFRegistration passando a definição da função anônima.

Aqui está um exemplo de uma UDF anônima:

from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udf

add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
Copy

Nota

Ao escrever um código que possa ser executado em várias sessões, use o método register para registrar UDFs em vez de usar a função udf. Isso pode evitar erros nos quais o objeto Session padrão do Snowflake não pode ser encontrado.

Criação e registro de uma UDF nomeada

Se você quiser chamar uma UDF pelo nome (por exemplo, usando a função call_udf no módulo functions), você pode criar e registrar uma UDF nomeada. Para fazer isso, use uma das seguintes opções:

  • O método register, na classe UDFRegistration, com o argumento name.

  • A função udf, no módulo snowflake.snowpark.functions, com o argumento name.

Para acessar um atributo ou método da classe UDFRegistration, chame a propriedade udf da classe Session.

Chamar register ou udf criará uma UDF temporária que você pode usar na sessão atual.

Para criar uma UDF permanente, chame o método register ou a função udf e defina o argumento is_permanent como True. Quando você cria uma UDF permanente, você também deve definir o argumento stage_location como o local do estágio onde o arquivo de Python para a UDF e suas dependências são carregados.

Aqui está um exemplo de como registrar uma UDF nomeada temporária:

from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udf

add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
Copy

Aqui está um exemplo de como registrar uma UDF nomeada permanente definindo o argumento is_permanent como True:

@udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
def minus_one(x: int) -> int:
   return x-1
Copy

Aqui está um exemplo de como essas UDFs são chamadas:

df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
df.select(add_one("a"), minus_one("b")).collect()
Copy
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]

Você também pode chamar a UDF usando SQL:

session.sql("select minus_one(1)").collect()
Copy
[Row(MINUS_ONE(1)=0)]

Criação de um UDF a partir de um arquivo de origem Python

Se você criar sua UDF em seu ambiente de desenvolvimento local, você também poderá definir seu manipulador da UDF em um arquivo Python e depois usar o método register_from_file na classe UDFRegistration para criar uma UDF.

Nota

Você não pode usar este método em uma planilha Python.

Aqui estão exemplos da utilização de register_from_file.

Suponha que você tenha um arquivo Python test_udf_file.py que contenha:

def mod5(x: int) -> int:
    return x % 5
Copy

Depois você pode criar uma UDF a partir desta função do arquivo test_udf_file.py.

# mod5() in that file has type hints
mod5_udf = session.udf.register_from_file(
      file_path="tests/resources/test_udf_dir/test_udf_file.py",
      func_name="mod5",
   )
   session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
Copy
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]

Você também pode carregar o arquivo em um local preparado, depois usá-lo para criar a UDF.

from snowflake.snowpark.types import IntegerType
# suppose you have uploaded test_udf_file.py to stage location @mystage.
mod5_udf = session.udf.register_from_file(
   file_path="@mystage/test_udf_file.py",
   func_name="mod5",
   return_type=IntegerType(),
   input_types=[IntegerType()],
)
session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
Copy
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]

Como ler arquivos com uma UDF

Para ler o conteúdo de um arquivo, seu código Python pode:

Como ler arquivos especificados estaticamente

A biblioteca Snowpark carrega e executa UDFs no servidor. Se sua UDF precisa ler dados de um arquivo, você deve assegurar-se de que o arquivo seja carregado com a UDF.

Nota

Se você escrever sua UDF em uma planilha Python, a UDF só poderá ler arquivos de um estágio.

Para configurar uma UDF para ler um arquivo:

  1. Especifique que o arquivo é uma dependência que faz o upload do arquivo para o servidor. Para obter mais informações, consulte Especificação de dependências para uma UDF.

    Por exemplo:

     # Import a file from your local machine as a dependency.
     session.add_import("/<path>/my_file.txt")
    
    # Or import a file that you uploaded to a stage as a dependency.
     session.add_import("@my_stage/<path>/my_file.txt")
    
    Copy
  2. Na UDF, leia o arquivo. No exemplo a seguir, o arquivo só será lido uma vez durante a criação do UDF, e não será lido novamente durante a execução do UDF. Isso é possível com uma biblioteca de terceiros cachetools.

    import sys
    import os
    import cachetools
    from snowflake.snowpark.types import StringType
    @cachetools.cached(cache={})
    def read_file(filename):
       import_dir = sys._xoptions.get("snowflake_import_directory")
          if import_dir:
             with open(os.path.join(import_dir, filename), "r") as f:
                return f.read()
    
       # create a temporary text file for test
    temp_file_name = "/tmp/temp.txt"
    with open(temp_file_name, "w") as t:
       _ = t.write("snowpark")
    session.add_import(temp_file_name)
    session.add_packages("cachetools")
    
    def add_suffix(s):
       return f"{read_file(os.path.basename(temp_file_name))}-{s}"
    
    concat_file_content_with_str_udf = session.udf.register(
          add_suffix,
          return_type=StringType(),
          input_types=[StringType()]
       )
    
    df = session.create_dataframe(["snowflake", "python"], schema=["a"])
    df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
    
    Copy
    [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
    
    os.remove(temp_file_name)
    session.clear_imports()
    
    Copy

Como ler arquivos especificados dinamicamente com SnowflakeFile

Você pode ler um arquivo de um estágio usando a classe SnowflakeFile no módulo do Snowpark snowflake.snowpark.files. A classe SnowflakeFile fornece acesso dinâmico a arquivos, o que permite transmitir arquivos de qualquer tamanho. O acesso dinâmico a arquivos também é útil quando você deseja iterar vários arquivos. Para exemplos, consulte Processamento de vários arquivos.

Para obter mais informações e exemplos de leitura de arquivos usando SnowflakeFile, consulte Como ler um arquivo usando a classe SnowflakeFile em um manipulador de UDF Pythin.

O exemplo a seguir registra uma UDF temporária que lê um arquivo de texto de um estágio usando SnowflakeFile e retorna o tamanho do arquivo.

Registre a UDF:

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType

@udf(name="get_file_length", replace=True, input_types=[StringType()], return_type=IntegerType(), packages=['snowflake-snowpark-python'])
def get_file_length(file_path):
  with SnowflakeFile.open(file_path) as f:
    s = f.read()
  return len(s);
Copy

Chame a UDF:

session.sql("select get_file_length(build_scoped_file_url(@my_stage, 'example-file.txt'));")
Copy

Gravação de arquivos de UDFs e UDTFs do Snowpark Python

Com o Snowpark Python, você pode gravar arquivos em estágios com funções definidas pelo usuário (UDFs), UDFs vetorizadas, funções de tabela definidas pelo usuário (UDTFs) e UDTFs vetorizadas. No manipulador de funções, você usa a SnowflakeFile API para abrir e gravar arquivos. Quando você retorna o arquivo da função, o arquivo é gravado junto com os resultados de consulta.

Uma UDF simples para gravar um arquivo pode ter a seguinte aparência:

CREATE OR REPLACE FUNCTION write_file()
RETURNS STRING
LANGUAGE PYTHON
VOLATILE
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'write_file'
AS
$$
from snowflake.snowpark.files import SnowflakeFile

def write_file():
  file = SnowflakeFile.open_new_result("w") # Open a new result file
  file.write("Hello world")                 # Write data
  return file               # File must be returned
$$;
Copy

Ao executar essa UDF, você receberá um URL com escopo que faz referência ao arquivo de resultados.

Acesso aos arquivos de resultados

Um manipulador de arquivos é retornado como um URL com escopo para a consulta que chama a UDF. Você pode usar esse URL de escopo específico para acessar arquivos de dentro do Snowflake (por meio de outra UDF ou do comando COPY FILES), mas não de fora do Snowflake como um URL pré-assinado. O URL do escopo é válido por 24 horas.

Depois que um arquivo é retornado por uma UDF, você pode acessá-lo usando qualquer uma das seguintes ferramentas de armazenamento, dependendo do seu caso de uso:

  • COPY FILES: copiar o arquivo para outro local do estágio. Depois que o arquivo for copiado, você poderá usá-lo como um arquivo preparado típico, por exemplo, usando as seguintes ferramentas:

    • Tabelas de diretório: consultar uma lista de arquivos em um estágio usando uma cláusula WHERE para filtrar, se necessário.

    • GET_PRESIGNED_URL: gerar um URL para o estágio/arquivo.

    • Estágios externos: acessar o arquivo fora do Snowflake por meio de APIs do provedor de nuvem.

  • UDF: ler o arquivo em outra consulta.

Para obter mais informações, consulte COPY FILES.

Limitações

  • Esse recurso não está disponível para Java ou Scala.

  • Os procedimentos armazenados também suportam gravações em arquivos, mas não podem ser facilmente encadeados com um comando COPY FILES. Portanto, para gravações de arquivos usando procedimentos armazenados, recomendamos o uso do comando PUT de preparação de arquivo.

Exemplos

Esta seção inclui exemplos de código que mostram como gravar arquivos em estágios para diferentes casos de uso.

Transformação de arquivo

A seguir, um exemplo do manipulador de UDF que transforma um arquivo. Você pode modificar esse exemplo para fazer diferentes tipos de transformação de arquivos, como, por exemplo, a transformação de um arquivo em outro:

  • Converter de um formato de arquivo para outro formato.

  • Redimensionar uma imagem.

  • Transformar os arquivos em um «estado dourado» em uma pasta de formato com registro de data e hora no mesmo bucket ou em outro.

CREATE OR REPLACE FUNCTION convert_to_foo(filename string)
RETURNS STRING
LANGUAGE PYTHON
VOLATILE
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'convert_to_foo'
AS
$$
from snowflake.snowpark.files import SnowflakeFile

def convert_to_foo(filename):
  input_file = SnowflakeFile.open(filename, "r")
  converted_file = SnowflakeFile.open_new_result("w")

  # Foo-type is just adding foo at the end of every line
  for line in input_file.readlines():
    converted_file.write(line[:-1] + 'foo' + '\n')
  return converted_file
$$;
Copy

Você pode chamar essa UDF em uma consulta e, em seguida, acessar o arquivo de resultados converted_file gravado pela UDF.

Os exemplos a seguir do SQL mostram o que você pode fazer com os arquivos de resultados retornados pelas UDFs, como copiá-los para um estágio ou consumi-los em uma consulta subsequente ou em outra UDF. Esses padrões básicos do SQL são aplicáveis a todos os exemplos de gravação de arquivos da UDF incluídos neste tópico. Por exemplo, você pode usar o URL de consulta pré-assinada para qualquer um dos seguintes exemplos da UDF, usando-a no lugar de outra declaração SQL.

Exemplo 1: Converter um único arquivo e copiá-lo para um estágio final
COPY FILES INTO @output_stage FROM
  (SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, 'in.txt')), 'out.foo.txt');
Copy
Exemplo 2: Converter uma tabela de arquivos e copiá-los para um estágio final
CREATE TABLE files_to_convert(file string);
-- Populate files_to_convert with input files:
INSERT INTO files_to_convert VALUES ('file1.txt');
INSERT INTO files_to_convert VALUES ('file2.txt');

COPY FILES INTO @output_stage FROM
  (SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file)),
      REPLACE(file, '.txt', '.foo.txt') FROM files_to_convert);
Copy
Exemplo 3: Converter todos os arquivos em um estágio e copiá-los para um estágio final
COPY FILES INTO @output_stage FROM
  (SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, RELATIVE_PATH)),
      REPLACE(RELATIVE_PATH, 'format1', 'format2') FROM DIRECTORY(@input_stage));
Copy
Exemplo 4: Converter todos os arquivos de uma tabela e lê-los sem copiar
-- A basic UDF to read a file:
CREATE OR REPLACE FUNCTION read_udf(filename string)
RETURNS STRING
LANGUAGE PYTHON
VOLATILE
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'read'
AS
'
from snowflake.snowpark.files import SnowflakeFile

def read(filename):
  return SnowflakeFile.open(filename, "r").read()
';
Copy
-- Create files_to_convert as in Example 2.

SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file)) as new_file
  FROM files_to_convert;
-- The following query must be run within 24 hours from the prior one
SELECT read_udf(new_file) FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
Copy
Exemplo 5: Converter todos os arquivos de uma tabela e lê-los imediatamente por meio de uma UDF
-- Set up files_to_convert as in Example 2.
-- Set up read_udf as in Example 4.

SELECT read_udf(
    convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file))) FROM files_to_convert;
Copy
Exemplo 6: Ler usando URLs pré-assinados

Este exemplo é apenas para estágios com criptografia no lado do servidor. Os estágios internos têm criptografia no lado do cliente por padrão.

COPY FILES INTO @output_stage FROM
  (SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file)) FROM files_to_convert);

-- Refresh the directory to get new files in output_stage.
ALTER STAGE output_stage REFRESH;
Copy

Criação de um PDF a partir de uma partição de dados da tabela e copie-o para um local final

O exemplo de manipulador de UDF a seguir particiona os dados de entrada e grava um relatório em PDF para cada partição dos dados. Este exemplo divide os relatórios pela cadeia de caracteres location.

Você também pode modificar esse exemplo para gravar outros tipos de arquivos, como modelos ML e outros formatos personalizados para cada partição.

-- Create a stage that includes the font (for PDF creation)

CREATE OR REPLACE STAGE fonts
URL = 's3://sfquickstarts/misc/';

-- UDF to write the data
CREATE OR REPLACE FUNCTION create_report_pdf(data string)
RETURNS TABLE (file string)
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER='CreateReport'
PACKAGES = ('snowflake-snowpark-python', 'fpdf')
IMPORTS  = ('@fonts/DejaVuSans.ttf')
AS $$
from snowflake.snowpark.files import SnowflakeFile
from fpdf import FPDF
import shutil
import sys
import uuid
import_dir = sys._xoptions["snowflake_import_directory"]

class CreateReport:
  def __init__(self):
      self.pdf = None

  def process(self, data):
      if self.pdf == None:
        # PDF library edits this file, make sure it's unique.
        font_file = f'/tmp/DejaVuSans-{uuid.uuid4()}.ttf'
        shutil.copy(f'{import_dir}/DejaVuSans.ttf', font_file)
        self.pdf = FPDF()
        self.pdf.add_page()
        self.pdf.add_font('DejaVu', '', font_file, uni=True)
        self.pdf.set_font('DejaVu', '', 14)
      self.pdf.write(8, data)
      self.pdf.ln(8)

  def end_partition(self):
      f = SnowflakeFile.open_new_result("wb")
      f.write(self.pdf.output(dest='S').encode('latin-1'))
      yield f,
$$;
Copy

O exemplo de SQL a seguir configura a tabela reportData com dados fictícios e cria o estágio output_stage. Em seguida, ele chama a UDF create_report_pdf para criar um arquivo PDF usando os dados que ele consulta na tabela reportData. Na mesma instrução SQL, o comando COPY FILES copia o arquivo de resultado da UDF para output_stage.

Nota

Usamos um estágio de saída criptografado no lado do servidor (SSE) porque o URL pré-assinado para um arquivo em um estágio SSE fará o download de um arquivo não criptografado. Em geral, recomendamos a criptografia de estágio padrão nos estágios, pois o arquivo é criptografado no lado do cliente e é mais seguro. Os arquivos em estágios normais ainda podem ser lidos por meio de UDFs ou GET/PUT, mas não por meio de URLs pré-assinados. Certifique-se de compreender as implicações de segurança antes de usar um estágio SSE em um ambiente de produção.

 -- Fictitious data
 CREATE OR REPLACE TABLE reportData(location string, item string);
 INSERT INTO reportData VALUES ('SanMateo' ,'Item A');
 INSERT INTO reportData VALUES ('SanMateo' ,'Item Z');
 INSERT INTO reportData VALUES ('SanMateo' ,'Item X');
 INSERT INTO reportData VALUES ('Bellevue' ,'Item B');
 INSERT INTO reportData VALUES ('Bellevue' ,'Item Q');

 -- Presigned URLs only work with SSE stages, see note above.
 CREATE OR REPLACE STAGE output_stage ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');

 COPY FILES INTO @output_stage
   FROM (SELECT reports.file, location || '.pdf'
           FROM reportData, TABLE(create_report_pdf(item)
           OVER (partition BY location)) AS reports);

 -- Check the results
LIST @output_stage;
SELECT GET_PRESIGNED_URL(@output_stage, 'SanMateo.pdf');
Copy

Divisão de arquivos e descarga em várias tabelas

O exemplo de manipulador de UDF a seguir divide um arquivo CSV por linha com base no primeiro caractere de cada linha. Em seguida, a UDF descarrega os arquivos divididos em várias tabelas.

CREATE OR REPLACE FUNCTION split_file(path string)
RETURNS TABLE(file string, name string)
LANGUAGE PYTHON
VOLATILE
PACKAGES = ('snowflake-snowpark-python')
RUNTIME_VERSION = 3.9
HANDLER = 'SplitCsvFile'
AS $$
import csv
from snowflake.snowpark.files import SnowflakeFile

class SplitCsvFile:
    def process(self, file):
      toTable1 = SnowflakeFile.open_new_result("w")
      toTable1Csv = csv.writer(toTable1)
      toTable2 = SnowflakeFile.open_new_result("w")
      toTable2Csv = csv.writer(toTable2)
      toTable3 = SnowflakeFile.open_new_result("w")
      toTable3Csv = csv.writer(toTable3)
      with SnowflakeFile.open(file, 'r') as file:
        # File is of the format 1:itemA \n 2:itemB \n [...]
        for line in file.readlines():
          forTable = line[0]
          if (forTable == "1"):
            toTable1Csv.writerow([line[2:-1]])
          if (forTable == "2"):
            toTable2Csv.writerow([line[2:-1]])
          if (forTable == "3"):
            toTable3Csv.writerow([line[2:-1]])
      yield toTable1, 'table1.csv'
      yield toTable2, 'table2.csv'
      yield toTable3, 'table3.csv'
$$;
-- Create a stage with access to an import file.
CREATE OR REPLACE STAGE staged_files url="s3://sfquickstarts/misc/";

-- Add the files to be split into a table - we just add one.
CREATE OR REPLACE TABLE filesToSplit(path string);
INSERT INTO filesToSplit VALUES ( 'items.txt');

-- Create output tables
CREATE OR REPLACE TABLE table1(item string);
CREATE OR REPLACE TABLE table2(item string);
CREATE OR REPLACE TABLE table3(item string);

-- Create output stage
CREATE OR REPLACE stage output_stage;

-- Creates files named path-tableX.csv
COPY FILES INTO @output_stage FROM
  (SELECT file, path || '-' || name FROM filesToSplit, TABLE(split_file(build_scoped_file_url(@staged_files, path))));

-- We use pattern and COPY INTO (not COPY FILES INTO) to upload to a final table.
COPY INTO table1 FROM @output_stage PATTERN = '.*.table1.csv';
COPY INTO table2 FROM @output_stage PATTERN = '.*.table2.csv';
COPY INTO table3 FROM @output_stage PATTERN = '.*.table3.csv';

-- See results
SELECT * from table1;
SELECT * from table2;
SELECT * from table3;
Copy

Uso de UDFs vetorizadas

As UDFs de Python vetorizadas permitem que você defina funções Python que recebem lotes de linhas de entrada como DataFrames do Pandas e retornam lotes de resultados como matrizes ou séries do Pandas. A coluna no dataframe do Snowpark será vetorizada como uma Series do Pandas dentro da UDF.

Aqui está um exemplo de como usar a interface de lote:

from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)

@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
    # The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
    df.columns = ["col1", "col2", "col3", "col4"]
    return model.predict(df)
Copy

Você chama UDFs vetorizadas de Python da mesma forma que você chama outras UDFs de Python. Para obter mais informações, consulte UDFs vetorizadas de Python, que explica como criar um UDF vetorizado usando uma instrução SQL. Por exemplo, você pode usar o decorador vectorized quando especificar o código Python na instrução SQL. Ao usar a API Snowpark Python descrita neste documento, você não usará uma instrução SQL para criar uma UDF vetorizada. Então você não usa o decorador vectorized.

É possível limitar o número de linhas por lote. Para obter mais informações, consulte Como definir um tamanho alvo de lote.

Para obter mais explicações e exemplos de como usar a API Snowpark Python para criar UDFs vetorizados, consulte a seção de UDFs da referência de API Snowpark.