Criação de procedimentos armazenados para DataFrames em Python

A API do Snowpark fornece métodos que você pode usar para criar um procedimento armazenado em Python. Este tópico explica como criar procedimentos armazenados.

Neste tópico:

Introdução

Com o Snowpark, você pode criar procedimentos armazenados para suas lambdas e funções personalizadas, e pode chamar esses procedimentos armazenados para processar os dados em seu DataFrame.

Você pode criar procedimentos armazenados que só existem dentro da sessão atual (procedimentos armazenados temporários) assim como procedimentos armazenados que você pode usar em outras sessões (procedimentos armazenados permanentes).

Como usar pacotes de terceiros do Anaconda em um procedimento armazenado

Você pode especificar pacotes do Anaconda a serem instalados quando você cria procedimentos armazenados de Python. Ao chamar o procedimento armazenado de Python são executadas dentro de um warehouse do Snowflake, pacotes do Anaconda são instalados sem problemas e armazenados em cache no warehouse virtual em seu nome. Para obter mais informações sobre práticas recomendadas, como visualizar os pacotes disponíveis e como criar um ambiente de desenvolvimento local, consulte Como usar pacotes de terceiros.

Use session.add_packages para adicionar pacotes no nível da sessão.

Este exemplo de código mostra como importar pacotes e retornar suas versões.

>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc

>>> session.add_packages("snowflake-snowpark-python", "pandas", "xgboost==1.5.0")

>>> @sproc
... def compute(session: snowflake.snowpark.Session) -> list:
...   return [pd.__version__, xgb.__version__]
Copy

Você também pode usar session.add_requirements para especificar pacotes com um arquivo requirements.

>>> session.add_requirements("mydir/requirements.txt")
Copy

Você pode adicionar os pacotes no nível do procedimento armazenado para substituir os pacotes no nível da sessão que você possa ter adicionado anteriormente.

>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc

>>> @sproc(packages=["snowflake-snowpark-python", "pandas", "xgboost==1.5.0"])
... def compute(session: snowflake.snowpark.Session) -> list:
...    return [pd.__version__, xgb.__version__]
Copy

Importante

Se você não especificar uma versão de pacote, o Snowflake utilizará a versão mais recente ao resolver as dependências. No entanto, ao implementar o procedimento armazenado para produção, você pode querer garantir que seu código sempre utilize as mesmas versões de dependência. Você pode fazer isso tanto para procedimentos armazenados permanentes como para temporários.

  • Quando você cria um procedimento armazenado permanente, o procedimento armazenado é criado e registrado apenas uma vez. Isso resolve as dependências uma vez e a versão selecionada é usada para cargas de trabalho de produção. Quando o procedimento armazenado for executado, ele sempre usará as mesmas versões de dependência.

  • Quando você criar um procedimento armazenado temporário, especifique as versões de dependência como parte da especificação da versão. Dessa forma, quando o procedimento armazenado for registrado, a resolução do pacote usará a versão especificada. Se você não especificar a versão, a dependência poderá ser atualizada quando uma nova versão estiver disponível.

Como criar um procedimento armazenado anônimo

Para criar um procedimento armazenado anônimo, você pode

  • Chamar a função sproc no módulo snowflake.snowpark.functions passando a definição da função anônima.

  • Chamar o método register na classe StoredProcedureRegistration passando a definição da função anônima. Para acessar um atributo ou método da classe StoredProcedureRegistration, chame a propriedade sproc da classe Session.

Aqui está um exemplo de um procedimento armazenado anônimo:

>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType

>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0], return_type=IntegerType(), input_types=[IntegerType()], packages=["snowflake-snowpark-python"])
Copy

Nota

Ao escrever um código que possa ser executado em várias sessões, use o método register para registrar procedimentos armazenados em vez de usar a função sproc. Isso pode evitar erros nos quais o objeto Session padrão do Snowflake não pode ser encontrado.

Criação e registro de um procedimento armazenado nomeado

Se você quiser chamar um procedimento armazenado pelo nome (por exemplo, usando a função call no objeto Session), você pode criar e registrar um procedimento armazenado nomeado. Para fazer isso, você pode:

  • Chamar a função sproc no módulo snowflake.snowpark.functions passando o argumento name e a definição da função anônima.

  • Chamar o método register na classe StoredProcedureRegistration passando o argumento name e a definição da função anônima. Para acessar um atributo ou método da classe StoredProcedureRegistration, chame a propriedade sproc da classe Session.

Chamar register ou sproc criará um procedimento armazenado temporário que você pode usar na sessão atual.

Para criar um procedimento armazenado permanente, chame o método register ou a função sproc e defina o argumento is_permanent como True. Quando você cria um procedimento armazenado permanente, você também deve definir o argumento stage_location como o local do estágio onde o conector de Python usado pelo Snowpark carrega o arquivo de Python para o procedimento armazenado e suas dependências.

Aqui está um exemplo de como registrar um procedimento armazenado temporário nomeado:

>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType

>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0],
return_type=IntegerType(), input_types=[IntegerType()], name="my_sproc", replace=True,
packages=["snowflake-snowpark-python"])
Copy

Aqui está um exemplo de como registrar um procedimento armazenado nomeado permanente definindo o argumento is_permanent como True:

>>> import snowflake.snowpark
>>> from snowflake.snowpark.functions import sproc

>>> @sproc(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True, packages=["snowflake-snowpark-python"])
... def minus_one(session: snowflake.snowpark.Session, x: int) -> int:
...  return session.sql(f"select {x} - 1").collect()[0][0]
Copy

Aqui está um exemplo de como esses procedimentos armazenados são chamados:

>>> add_one(1)
2
>>> session.call("minus_one", 1)
0
>>> session.sql("call minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Copy

Como ler arquivos com um procedimento armazenado

Para ler o conteúdo de um arquivo com um procedimento armazenado, você pode:

Como ler arquivos especificados estaticamente

  1. Especifique que o arquivo é uma dependência que faz o upload do arquivo para o servidor. Isso é feito da mesma forma que para UDFs. Para obter mais informações, consulte Especificação de dependências para uma UDF.

    Por exemplo:

    >>> # Import a file from your local machine as a dependency.
    >>> session.add_import("/<path>/my_file.txt")
    
    >>> # Or import a file that you uploaded to a stage as a dependency.
    >>> session.add_import("@my_stage/<path>/my_file.txt")
    
    Copy
  2. No procedimento armazenado, leia o arquivo.

    >>> def read_file(name: str) -> str:
    ...    import sys
    ...    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    ...    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    ...
    ...    with open(import_dir + 'my_file.txt', 'r') as file:
    ...        return file.read()
    
    Copy

Como ler arquivos especificados dinamicamente com SnowflakeFile

Você pode ler um arquivo de um estágio usando a classe SnowflakeFile no módulo do Snowpark snowflake.snowpark.files. A classe SnowflakeFile fornece acesso dinâmico a arquivos, o que permite transmitir arquivos de qualquer tamanho. O acesso dinâmico a arquivos também é útil quando você deseja iterar vários arquivos. Para exemplos, consulte Processamento de vários arquivos.

Para obter mais informações e exemplos de leitura de arquivos usando SnowflakeFile, consulte Como ler um arquivo usando a classe SnowflakeFile em um manipulador de UDF Pythin.

O exemplo a seguir cria um procedimento armazenado permanente que lê um arquivo de um estágio usando SnowflakeFile e retorna o tamanho do arquivo.

Crie o procedimento armazenado:

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType

@sproc(name="calc_size", is_permanent=True, stage_location="@my_procedures", replace=True, packages=["snowflake-snowpark-python"])
def calc_size(ignored_session: snowpark.Session, file_path: str) -> int:
  with SnowflakeFile.open(file_path) as f:
    s = f.read()
  return len(s);
Copy

Chame o procedimento armazenado:

file_size = session.sql("call calc_size(build_scoped_file_url('@my_stage', 'my_file.csv'))")
Copy