Como escrever procedimentos armazenados em Python

Este tópico explica como escrever um procedimento armazenado em Python. Você pode usar a biblioteca do Snowpark dentro do seu procedimento armazenado para realizar consultas, atualizações e outros trabalhos em tabelas no Snowflake.

Neste tópico:

Introdução

Com os Procedimentos Armazenados do Snowpark, você pode construir e executar seu pipeline de dados dentro do Snowflake, usando um warehouse do Snowflake como estrutura computacional. Crie seu pipeline de dados usando a API Snowpark para Python para escrever procedimentos armazenados. Para programar a execução desses procedimentos armazenados, você usa tarefas.

Para obter mais informações sobre modelos de machine learning e Snowpark Python, consulte Treinamento dos modelos de machine learning com Snowpark Python.

Você pode escrever procedimentos armazenados Snowpark para Python usando uma planilha Python ou usando um ambiente de desenvolvimento local.

É possível capturar dados de registro e rastreamento à medida que o código do manipulador é executado. Para obter mais informações, consulte Visão geral do registro e do rastreamento.

Nota

Para criar e chamar um procedimento anônimo, use CALL (com procedimento anônimo). Criar e chamar um procedimento anônimo não exige uma função com privilégios de esquema CREATE PROCEDURE.

Pré-requisitos para escrever de procedimentos armazenados localmente

Para escrever procedimentos armazenados em Python em seu ambiente de desenvolvimento local, é preciso cumprir os seguintes pré-requisitos:

  • Você deve usar a versão 0.4.0 ou uma versão mais recente da biblioteca do Snowpark.

  • Habilite os Pacotes Anaconda para que o Snowpark Python possa carregar as dependências de terceiros necessárias. Consulte Como usar pacotes de terceiros do Anaconda.

  • As versões suportadas do Python são:

    • 3.8

    • 3.9

    • 3,10

    • 3,11

Configure seu ambiente de desenvolvimento para usar a biblioteca do Snowpark. Consulte Configuração do ambiente de desenvolvimento para o Snowpark.

Como escrever o código Python para o procedimento armazenado

Para sua lógica de procedimento, você escreve o código do manipulador que executa quando o procedimento é chamado. Esta seção descreve o projeto de um manipulador.

Você pode criar um procedimento armazenado a partir do código do manipulador de várias maneiras:

Limitações

Os Procedimentos Armazenados Snowpark têm as seguintes limitações:

  • Não há suporte para a criação de processos em procedimentos armazenados.

  • Não há suporte para a execução de consultas simultâneas em procedimentos armazenados.

  • Você não pode usar APIs que executam comandos PUT e GET, incluindo Session.sql("PUT ...") e Session.sql("GET ...").

  • Quando você baixa arquivos de um estágio usando session.file.get, a correspondência de padrões não é aceita.

  • Se você executar seu procedimento armazenado a partir de uma tarefa, você deverá especificar um warehouse ao criar a tarefa. Você não pode usar recursos de computação sem servidor para executar a tarefa.

  • A criação de objetos temporários nomeados não é suportada em um procedimento armazenado de direitos do proprietário. Um procedimento de armazenamento de direitos do proprietário é um procedimento armazenado que funciona com os privilégios do proprietário do procedimento armazenado. Para obter mais informações, consulte direitos do chamador ou direitos do proprietário.

Planejamento para escrever seu procedimento armazenado

Os procedimentos armazenados funcionam dentro do Snowflake e, portanto, é preciso planejar o código que se escreve com isso em mente.

  • Limite a quantidade de memória consumida O Snowflake impõe limites em um método em termos da quantidade de memória necessária. Para orientação, consulte Criação de manipuladores que ficam dentro das restrições impostas pelo Snowflake.

  • Certifique-se de que o método ou função do seu manipulador seja thread-safe.

  • Siga as regras e restrições de segurança. Consulte Práticas de segurança para UDFs e procedimentos.

  • Decida se você quer que o procedimento armazenado seja executado com direitos do chamador ou direitos do proprietário.

  • Considere a versão snowflake-snowpark-python usada para executar procedimentos armazenados. Devido a limitações no processo de liberação dos procedimentos armazenados, a biblioteca snowflake-snowpark-python disponível no ambiente do procedimento armazenado Python é geralmente uma versão anterior à versão lançada publicamente. Use o seguinte SQL para descobrir a última versão disponível:

    select * from information_schema.packages where package_name = 'snowflake-snowpark-python' order by version desc;
    
    Copy

Escrita do método ou função

Ao escrever o método ou função para o procedimento armazenado, observe o seguinte:

  • Especifique o objeto Session Snowpark como o primeiro argumento de seu método ou função. Quando você chama seu procedimento armazenado, o Snowflake cria automaticamente um objeto Session e o passa para seu procedimento armazenado. (Você, por si só, não pode criar o objeto Session).

  • Para o restante dos argumentos e para o valor de retorno, use os tipos Python que correspondem aos tipos de dados Snowflake. O Snowflake oferece suporte aos tipos de dados Python listados em Mapeamentos de tipos de dados SQL-Python para parâmetros e tipos de retorno

Tratamento de erros

Você pode usar as técnicas normais de tratamento de exceções do Python para capturar erros dentro do procedimento.

Se ocorrer uma exceção não capturada dentro do método, o Snowflake gera um erro que inclui o rastreamento de pilha da exceção. Quando o registro de exceções não tratadas está ativado, o Snowflake registra dados sobre exceções não tratadas em uma tabela de eventos.

Disponibilização das dependências para seus códigos

Se seu código de manipulador depender do código definido fora do próprio manipulador (como o código definido em um módulo) ou de arquivos de recursos, você pode disponibilizar essas dependências para seu código, carregando-os em um estágio. Consulte Disponibilização das dependências para seus códigos, ou para planilhas Python, consulte Como adicionar um arquivo Python de um estágio a uma planilha.

Se você criar seu procedimento armazenado usando SQL, use a cláusula IMPORTS ao escrever a instrução CREATE PROCEDURE, para apontar para os arquivos de dependência.

Acesso aos dados no Snowflake a partir do seu procedimento armazenado

Para acessar dados no Snowflake, use as APIs da biblioteca do Snowpark.

Ao tratar uma chamada para seu procedimento armazenado em Python, o Snowflake cria um objeto Session Snowpark e passa o objeto para o método ou função do seu procedimento armazenado.

Como no caso de procedimentos armazenados em outras linguagens, o contexto da sessão (por exemplo, os privilégios, a base de dados e o esquema atual etc.) é determinado pela condição de o procedimento armazenado ser executado com os direitos do chamador ou com os direitos do proprietário. Para obter mais detalhes, consulte Accessing and Setting the Session State.

Você pode usar este objeto Session para chamar APIs na Biblioteca do Snowpark. Por exemplo, você pode criar um DataFrame para uma tabela ou executar uma instrução SQL.

Consulte o Guia do Desenvolvedor do Snowpark para obter mais informações.

Exemplo de acesso aos dados

A seguir, um exemplo de um método Python que copia um número especificado de linhas de uma tabela para outra tabela. O método utiliza os seguintes argumentos:

  • Um objeto Session do Snowpark

  • O nome da tabela para copiar as linhas de

  • O nome da tabela para salvar as linhas para

  • O número de linhas a copiar

O método neste exemplo retorna uma cadeia de caracteres. Se você executar este exemplo em uma planilha Python, mude o tipo de retorno para a planilha para um String

def run(session, from_table, to_table, count):

  session.table(from_table).limit(count).write.save_as_table(to_table)

  return "SUCCESS"
Copy

Como ler arquivos

Você pode ler dinamicamente um arquivo de um estágio em seu manipulador Python usando a classe SnowflakeFile no módulo snowflake.snowpark.files do Snowpark.

O Snowflake oferece suporte à leitura de arquivos com SnowflakeFile para procedimentos armazenados e funções definidas pelo usuário. Para obter mais informações sobre a leitura de arquivos no código do manipulador, bem como mais exemplos, consulte Como ler um arquivo com um manipulador de UDF Python.

Este exemplo demonstra como criar e chamar um procedimento armazenado de direitos do proprietário que lê um arquivo usando a classe SnowflakeFile.

Crie o procedimento armazenado com um manipulador em linha, especificando o modo de entrada como binário passando rb para o argumento mode:

CREATE OR REPLACE PROCEDURE calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile

def run(ignored_session, file_path):
    with SnowflakeFile.open(file_path, 'rb') as f:
        return imagehash.average_hash(Image.open(f))
$$;
Copy

Chame o procedimento armazenado:

CALL calc_phash(build_scoped_file_url(@my_files, 'my_image.jpg'));
Copy

Como usar pacotes de terceiros do Anaconda

Você pode especificar pacotes do Anaconda a serem instalados quando você cria procedimentos armazenados de Python. Para ver a lista de pacotes de terceiros do Anaconda, consulte o canal Anaconda do Snowflake. Esses pacotes de terceiros são construídos e fornecidos pelo Anaconda. Você pode usar o canal Snowflake do Conda para testes e desenvolvimento local sem custo sob os Termos Suplementares de Software Incorporado dos Termos de Serviço do Anaconda.

Para limitações, consulte Limitações.

Introdução

Antes de começar a usar os pacotes fornecidos pelo Anaconda dentro do Snowflake, você deve reconhecer os Termos de ofertas externas.

Nota

Você deve ser o administrador da organização (use a função ORGADMIN) para aceitar os termos. Você só precisa aceitar os termos uma única vez para sua conta Snowflake. Consulte Ativação da função ORGADMIN em uma conta.

  1. Entre em Snowsight.

  2. Selecione Admin » Billing & Terms.

  3. Na seção Anaconda, selecione Enable.

  4. Na caixa de diálogo Anaconda Packages, clique no link para rever a página Termos de ofertas externas.

  5. Se você concordar com os termos, selecione Acknowledge & Continue.

Se você vir um erro ao tentar aceitar os termos de serviço, seu perfil de usuário pode estar sem um nome, sobrenome ou endereço de e-mail. Se você tiver uma função de administrador, consulte Adição dos detalhes do usuário ao seu perfil de usuário para atualizar seu perfil usando Snowsight. Caso contrário, entre em contato com um administrador para atualizar sua conta.

Nota

Se você não reconhecer os Termos de Terceiros do Snowflake, como descrito acima, você ainda pode usar procedimentos armazenados, mas com estas limitações:

  • Você não pode usar pacotes de terceiros do Anaconda.

  • Você ainda pode especificar o Snowpark Python como um pacote em um procedimento armazenado, mas não pode especificar uma versão específica.

  • Você não pode usar o método to_pandas ao interagir com um objeto DataFrame.

Exibição e utilização de pacotes

Você pode exibir todos os pacotes disponíveis e suas informações de versão consultando a exibição PACKAGES no Information Schema:

select * from information_schema.packages where language = 'python';
Copy

Para obter mais informações, consulte Como usar pacotes de terceiros na documentação de UDF do Snowflake Python.

Criação do procedimento armazenado

Você pode criar um procedimento armazenado a partir de uma planilha Python, ou usando SQL.

Criação de um procedimento armazenado Python para automatizar sua planilha Python

Crie um procedimento armazenado Python a partir de sua planilha Python para automatizar seu código. Para obter mais detalhes sobre como escrever planilhas Python, consulte Como escrever o código Snowpark nas planilhas Python.

Pré-requisitos

Sua função deve ter privilégios OWNERSHIP ou CREATE PROCEDURE no esquema do banco de dados no qual você executa sua planilha Python para implantá-la como um procedimento armazenado.

Como implantar uma planilha Python como um procedimento armazenado

Para criar um procedimento armazenado Python para automatizar o código em sua planilha Python, faça o seguinte:

  1. Entre em Snowsight.

  2. Abra Projects » Worksheets.

  3. Abra a planilha Python que você deseja implantar como um procedimento armazenado.

  4. Selecione Deploy.

  5. Digite um nome do procedimento armazenado.

  6. (Opcional) Digite um comentário com detalhes sobre o procedimento armazenado.

  7. (Opcional) Selecione Replace if exists para substituir um procedimento armazenado existente com o mesmo nome.

  8. Para Handler, selecione a função do manipulador para seu procedimento armazenado. Por exemplo, main.

  9. Revise os argumentos usados pela função do manipulador e, se necessário, substitua o mapeamento de tipo de dados SQL para um argumento digitado. Para obter detalhes sobre como os tipos Python são mapeados para tipos SQL, consulte Mapeamentos de tipos de dados SQL-Python.

  10. (Opcional) Selecione Open in Worksheets para abrir a definição do procedimento armazenado em uma planilha SQL.

  11. Selecione Deploy para criar o procedimento armazenado:

  12. Depois que o procedimento armazenado for criado, você pode ir para os detalhes do procedimento ou selecionar Done.

Você pode criar múltiplos procedimentos armazenados a partir de uma planilha Python.

Após criar um procedimento armazenado, você pode automatizá-lo como parte de uma tarefa. Consulte Execução de instruções SQL em um cronograma utilizando tarefas.

Retorno de dados tabulares

Você pode escrever um procedimento que retorne os dados em forma de tabela. Para escrever um procedimento que retorne dados tabulares, faça o seguinte:

  • Especifique TABLE(...) como tipo de retorno do procedimento em sua instrução CREATE PROCEDURE.

    Como parâmetros TABLE, você pode especificar os nomes das colunas e tipos de dados retornados se você os conhecer. Se você não conhecer as colunas retornadas ao definir o procedimento – como quando elas são especificadas em tempo de execução – você pode deixar de fora os parâmetros TABLE. Quando você fizer isso, as colunas de valor de retorno do procedimento serão convertidas a partir das colunas no DataFrame retornado por seu manipulador. Os tipos de dados das colunas serão convertidos em SQL de acordo com o mapeamento especificado em Mapeamentos de tipos de dados SQL-Python.

  • Escreva o manipulador de modo que ele retorne o resultado tabular em um DataFrame do Snowpark.

    Para obter mais informações sobre dataframes, consulte Como trabalhar com DataFrames no Snowpark Python.

Exemplo

Os exemplos nesta seção ilustram o retorno de valores tabulares de um procedimento que filtra por linhas onde uma coluna corresponde a uma cadeia de caracteres.

Definição dos dados

O código no exemplo a seguir cria uma tabela de funcionários.

CREATE OR REPLACE TABLE employees(id NUMBER, name VARCHAR, role VARCHAR);
INSERT INTO employees (id, name, role) VALUES (1, 'Alice', 'op'), (2, 'Bob', 'dev'), (3, 'Cindy', 'dev');
Copy

Especificação de nomes e tipos de colunas de retorno

Este exemplo especifica os nomes e tipos de colunas na instrução RETURNS TABLE().

CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col

def filter_by_role(session, table_name, role):
   df = session.table(table_name)
   return df.filter(col("role") == role)
$$;
Copy

Omissão de nomes e tipos de colunas de retorno

O código no exemplo a seguir declara um procedimento que permite extrapolar nomes e tipos de colunas de valor de retorno a partir de colunas no valor de retorno do manipulador. Ele omite os nomes e tipos de coluna da instrução RETURNS TABLE().

CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col

def filter_by_role(session, table_name, role):
  df = session.table(table_name)
  return df.filter(col("role") == role)
$$;
Copy

Como chamar o procedimento

O exemplo a seguir chama o procedimento armazenado:

CALL filterByRole('employees', 'dev');
Copy

A chamada do procedimento produz os seguintes resultados:

+----+-------+------+
| ID | NAME  | ROLE |
+----+-------+------+
| 2  | Bob   | dev  |
| 3  | Cindy | dev  |
+----+-------+------+

Chamada do seu procedimento armazenado

Depois de criar um procedimento armazenado, você pode chamá-lo do SQL ou como parte de uma tarefa programada.

Exemplos

Como executar tarefas simultâneas com processos de trabalho

Você pode executar tarefas simultâneas usando processos de trabalho do Python. Você pode achar isso útil quando precisar executar tarefas paralelas que aproveitam vários núcleos de CPU em nós do warehouse.

Nota

A Snowflake recomenda que você não use o módulo de multiprocessamento integrado do Python.

Para contornar os casos em que o Bloqueio de intérprete global do Python impede que uma abordagem multitarefa se espalhe por todos os núcleos da CPU, você pode executar tarefas simultâneas usando processos de trabalho separados, em vez de threads.

Você pode fazer isso nos warehouses Snowflake usando a classe joblib da biblioteca Parallel, como no exemplo a seguir.

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.8
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;
Copy

Nota

O back-end padrão usado para joblib.Parallel difere entre os warehouses padrão Snowflake e otimizados para Snowpark.

  • Padrão do warehouse: threading

  • Padrão do warehouse otimizado para Snowpark: loky (multiprocessamento)

Você pode substituir a configuração de back-end padrão chamando a função joblib.parallel_backend, como no exemplo a seguir.

import joblib
joblib.parallel_backend('loky')
Copy