Criação de procedimentos armazenados para DataFrames em Python¶
A API do Snowpark fornece métodos que você pode usar para criar um procedimento armazenado em Python. Este tópico explica como criar procedimentos armazenados.
Neste tópico:
Introdução¶
Com o Snowpark, você pode criar procedimentos armazenados para suas lambdas e funções personalizadas, e pode chamar esses procedimentos armazenados para processar os dados em seu DataFrame.
Você pode criar procedimentos armazenados que só existem dentro da sessão atual (procedimentos armazenados temporários) assim como procedimentos armazenados que você pode usar em outras sessões (procedimentos armazenados permanentes).
Como usar pacotes de terceiros do Anaconda em um procedimento armazenado¶
Você pode especificar pacotes do Anaconda a serem instalados quando você cria procedimentos armazenados de Python. Ao chamar o procedimento armazenado de Python são executadas dentro de um warehouse do Snowflake, pacotes do Anaconda são instalados sem problemas e armazenados em cache no warehouse virtual em seu nome. Para obter mais informações sobre práticas recomendadas, como visualizar os pacotes disponíveis e como criar um ambiente de desenvolvimento local, consulte Como usar pacotes de terceiros.
Use session.add_packages
para adicionar pacotes no nível da sessão.
Este exemplo de código mostra como importar pacotes e retornar suas versões.
>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc
>>> session.add_packages("snowflake-snowpark-python", "pandas", "xgboost==1.5.0")
>>> @sproc
... def compute(session: snowflake.snowpark.Session) -> list:
... return [pd.__version__, xgb.__version__]
Você também pode usar session.add_requirements
para especificar pacotes com um arquivo requirements.
>>> session.add_requirements("mydir/requirements.txt")
Você pode adicionar os pacotes no nível do procedimento armazenado para substituir os pacotes no nível da sessão que você possa ter adicionado anteriormente.
>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc
>>> @sproc(packages=["snowflake-snowpark-python", "pandas", "xgboost==1.5.0"])
... def compute(session: snowflake.snowpark.Session) -> list:
... return [pd.__version__, xgb.__version__]
Importante
Se você não especificar uma versão de pacote, o Snowflake utilizará a versão mais recente ao resolver as dependências. No entanto, ao implementar o procedimento armazenado para produção, você pode querer garantir que seu código sempre utilize as mesmas versões de dependência. Você pode fazer isso tanto para procedimentos armazenados permanentes como para temporários.
Quando você cria um procedimento armazenado permanente, o procedimento armazenado é criado e registrado apenas uma vez. Isso resolve as dependências uma vez e a versão selecionada é usada para cargas de trabalho de produção. Quando o procedimento armazenado for executado, ele sempre usará as mesmas versões de dependência.
Quando você criar um procedimento armazenado temporário, especifique as versões de dependência como parte da especificação da versão. Dessa forma, quando o procedimento armazenado for registrado, a resolução do pacote usará a versão especificada. Se você não especificar a versão, a dependência poderá ser atualizada quando uma nova versão estiver disponível.
Como criar um procedimento armazenado anônimo¶
Para criar um procedimento armazenado anônimo, você pode
Chamar a função
sproc
no módulosnowflake.snowpark.functions
passando a definição da função anônima.Chamar o método
register
na classeStoredProcedureRegistration
passando a definição da função anônima. Para acessar um atributo ou método da classeStoredProcedureRegistration
, chame a propriedadesproc
da classeSession
.
Aqui está um exemplo de um procedimento armazenado anônimo:
>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType
>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0], return_type=IntegerType(), input_types=[IntegerType()], packages=["snowflake-snowpark-python"])
Nota
Ao escrever um código que possa ser executado em várias sessões, use o método register
para registrar procedimentos armazenados em vez de usar a função sproc
. Isso pode evitar erros nos quais o objeto Session
padrão do Snowflake não pode ser encontrado.
Criação e registro de um procedimento armazenado nomeado¶
Se você quiser chamar um procedimento armazenado pelo nome (por exemplo, usando a função call
no objeto Session
), você pode criar e registrar um procedimento armazenado nomeado. Para fazer isso, você pode:
Chamar a função
sproc
no módulosnowflake.snowpark.functions
passando o argumentoname
e a definição da função anônima.Chamar o método
register
na classeStoredProcedureRegistration
passando o argumentoname
e a definição da função anônima. Para acessar um atributo ou método da classeStoredProcedureRegistration
, chame a propriedadesproc
da classeSession
.
Chamar register
ou sproc
criará um procedimento armazenado temporário que você pode usar na sessão atual.
Para criar um procedimento armazenado permanente, chame o método register
ou a função sproc
e defina o argumento is_permanent
como True
. Quando você cria um procedimento armazenado permanente, você também deve definir o argumento stage_location
como o local do estágio onde o conector de Python usado pelo Snowpark carrega o arquivo de Python para o procedimento armazenado e suas dependências.
Aqui está um exemplo de como registrar um procedimento armazenado temporário nomeado:
>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType
>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0],
return_type=IntegerType(), input_types=[IntegerType()], name="my_sproc", replace=True,
packages=["snowflake-snowpark-python"])
Aqui está um exemplo de como registrar um procedimento armazenado nomeado permanente definindo o argumento is_permanent
como True
:
>>> import snowflake.snowpark
>>> from snowflake.snowpark.functions import sproc
>>> @sproc(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True, packages=["snowflake-snowpark-python"])
... def minus_one(session: snowflake.snowpark.Session, x: int) -> int:
... return session.sql(f"select {x} - 1").collect()[0][0]
Aqui está um exemplo de como esses procedimentos armazenados são chamados:
>>> add_one(1)
2
>>> session.call("minus_one", 1)
0
>>> session.sql("call minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Como ler arquivos com um procedimento armazenado¶
Para ler o conteúdo de um arquivo com um procedimento armazenado, você pode:
Ler um arquivo especificado estaticamente importando um arquivo e, em seguida, lendo-o do diretório inicial do procedimento armazenado.
Ler um arquivo especificado dinamicamente com SnowflakeFile. Você pode fazer isso se precisar acessar um arquivo durante a computação.
Como ler arquivos especificados estaticamente¶
Especifique que o arquivo é uma dependência que faz o upload do arquivo para o servidor. Isso é feito da mesma forma que para UDFs. Para obter mais informações, consulte Especificação de dependências para uma UDF.
Por exemplo:
>>> # Import a file from your local machine as a dependency. >>> session.add_import("/<path>/my_file.txt") >>> # Or import a file that you uploaded to a stage as a dependency. >>> session.add_import("@my_stage/<path>/my_file.txt")
No procedimento armazenado, leia o arquivo.
>>> def read_file(name: str) -> str: ... import sys ... IMPORT_DIRECTORY_NAME = "snowflake_import_directory" ... import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME] ... ... with open(import_dir + 'my_file.txt', 'r') as file: ... return file.read()
Como ler arquivos especificados dinamicamente com SnowflakeFile
¶
Você pode ler um arquivo de um estágio usando a classe SnowflakeFile
no módulo do Snowpark snowflake.snowpark.files
. A classe SnowflakeFile
fornece acesso dinâmico a arquivos, o que permite transmitir arquivos de qualquer tamanho. O acesso dinâmico a arquivos também é útil quando você deseja iterar vários arquivos. Para exemplos, consulte Processamento de vários arquivos.
Para obter mais informações e exemplos de leitura de arquivos usando SnowflakeFile
, consulte Como ler um arquivo usando a classe SnowflakeFile em um manipulador de UDF Pythin.
O exemplo a seguir cria um procedimento armazenado permanente que lê um arquivo de um estágio usando SnowflakeFile
e retorna o tamanho do arquivo.
Crie o procedimento armazenado:
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType
@sproc(name="calc_size", is_permanent=True, stage_location="@my_procedures", replace=True, packages=["snowflake-snowpark-python"])
def calc_size(ignored_session: snowpark.Session, file_path: str) -> int:
with SnowflakeFile.open(file_path) as f:
s = f.read()
return len(s);
Chame o procedimento armazenado:
file_size = session.sql("call calc_size(build_scoped_file_url('@my_stage', 'my_file.csv'))")