Pythonでのストアドプロシージャの記述

このトピックでは、Pythonでストアドプロシージャを記述する方法について説明します。ストアドプロシージャ内でSnowparkライブラリを使用して、Snowflakeのテーブルに対してクエリ、更新、およびその他の作業を実行できます。

このトピックの内容:

紹介

Snowparkストアドプロシージャを使用すると、Snowflakeウェアハウスをコンピューティングフレームワークとして使用して、Snowflake内でデータパイプラインをビルドおよび実行できます。Python用Snowpark API を使用してストアドプロシージャを記述し、データパイプラインを構築します。これらのストアドプロシージャの実行をスケジュールするには、 タスク を使用します。

機械学習モデルとSnowpark Pythonの詳細については、 Snowpark Pythonを使用した機械学習モデルのトレーニング をご参照ください。

Pythonワークシートを使用 するか、 ローカル開発環境を使用 して、Python用Snowparkストアドプロシージャを作成できます。

ハンドラーコードの実行時にログをキャプチャし、データをトレースできます。詳細については、 ログおよびトレースの概要 をご参照ください。

注釈

匿名プロシージャの作成と呼び出しの両方を実行するには、 CALL (匿名プロシージャの場合) を使用します。匿名プロシージャの作成と呼び出しには、 CREATE PROCEDURE スキーマ権限を持つロールは必要ありません。

ストアドプロシージャをローカルに記述するための前提条件

ローカル開発環境でPythonストアドプロシージャを作成するには、次の前提条件を満たす必要があります。

  • バージョン0.4.0または最新バージョンのSnowparkライブラリを使用する必要があります。

  • Snowpark Pythonが必要なサードパーティの依存関係を読み込めるように、Anacondaパッケージを有効にします。 Anacondaのサードパーティパッケージの使用 をご参照ください。

  • サポートされているPythonのバージョンは次のとおりです。

    • 3.8

    • 3.9

    • 3.10

    • 3.11

次に、Snowparkライブラリを使用するように開発環境をセットアップします。 Snowparkの開発環境の設定 をご参照ください。

ストアドプロシージャのPythonコードの記述

プロシージャのロジックの場合は、プロシージャが呼び出されたときに実行されるハンドラーコードを記述します。このセクションでは、ハンドラーの設計について説明します。

ハンドラーコードからストアドプロシージャを作成するには、いくつかの方法があります。

制限事項

Snowparkストアドプロシージャには、次の制限があります。

  • プロセスの作成は、ストアドプロシージャではサポートされていません。

  • クエリの同時実行は、ストアドプロシージャではサポートされていません。

  • PUT および GET (Session.sql("PUT ...") および Session.sql("GET ...") を含む)コマンドを実行する APIs は使用できません。

  • session.file.get を使用してステージからファイルをダウンロードする場合、パターンマッチングはサポートされていません。

  • タスクからストアドプロシージャを実行する場合は、タスクの作成時にウェアハウスを指定する必要があります。Snowflakeが管理するコンピューティングリソースを使用してタスクを実行することはできません。

  • 名前付き仮オブジェクトの作成は、所有者権限のストアドプロシージャではサポートされていません。所有者権限ストアドプロシージャは、ストアドプロシージャ所有者の権限で実行されるストアドプロシージャです。詳細については、 呼び出し元の権限または所有者の権限 をご参照ください。

ストアドプロシージャの記述の計画

ストアドプロシージャはSnowflake内で実行されるため、それを念頭に置いて記述するコードを計画する必要があります。

  • 消費されるメモリの量を制限するSnowflakeは、必要なメモリ量に関してメソッドに制限を設けています。ガイダンスについては、 Snowflakeが課す制約内でのハンドラーの設計 をご参照ください。

  • ハンドラーメソッドまたは関数がスレッドセーフであることを確認してください。

  • ルールとセキュリティ制限に従ってください。 UDFs とプロシージャのセキュリティプラクティス をご参照ください。

  • ストアドプロシージャを 呼び出し元の権限または所有者の権限 で実行するかどうかを決定します。

  • ストアドプロシージャの実行に使用されるsnowflake-snowpark-pythonバージョンを検討してください。ストアドプロシージャのリリースプロセスには制限があるため、Pythonストアドプロシージャ環境で利用可能なsnowflake-snowpark-pythonライブラリは、通常、公開されているバージョンより1つ遅れています。次の SQL を使用して、利用可能な最新バージョンを見つけます。

    select * from information_schema.packages where package_name = 'snowflake-snowpark-python' order by version desc;
    
    Copy

メソッドまたは関数の記述

ストアドプロシージャのメソッドまたは関数を記述するときは、次の点に注意してください。

  • メソッドまたは関数の最初の引数としてSnowpark Session オブジェクトを指定します。ストアドプロシージャを呼び出すと、Snowflakeは自動的に Session オブジェクトを作成し、ストアドプロシージャに渡します。(Session オブジェクトを自分で作成することはできません。)

  • 残りの引数と戻り値には、 Snowflakeデータ型 に対応するPython型を使用します。Snowflakeは、 パラメーターと戻り値のSQL-Pythonデータ型マッピング にリストされているPythonデータ型をサポートしています

エラーの処理

通常のPython例外処理手法を使用して、プロシージャのエラーをキャッチできます。

メソッド内でキャッチされない例外が発生した場合、Snowflakeは例外のスタックトレースを含むエラーを発生させます。 未処理の例外のログ を有効にすると、Snowflakeは未処理の例外に関するデータをイベントテーブルに記録します。

コードで依存関係を利用できるようにする方法

ハンドラーコードが、ハンドラー自体の外部で定義されたコード(モジュールで定義されたコードなど)またはリソースファイルに依存している場合は、それらの依存関係をステージにアップロードすると、コードでそれらの依存関係を利用できるようになります。 コードで依存関係を利用できるようにする方法 を参照するか、Pythonワークシートの場合は ステージからワークシートにPythonファイルを追加する をご参照ください。

SQL を使用してストアドプロシージャを作成する場合は、 CREATE PROCEDURE ステートメント を記述するときに IMPORTS 句を使用して、依存ファイルを指定します。

ストアドプロシージャからSnowflakeへのデータのアクセス

Snowflakeのデータにアクセスするには、Snowparkライブラリ APIs を使用します。

Pythonストアドプロシージャへの呼び出しを処理する場合、Snowflakeは、Snowpark Session オブジェクトを作成し、そのオブジェクトをストアドプロシージャのメソッドまたは関数に渡します。

他の言語におけるストアドプロシージャの場合と同様に、セッションのコンテキスト(例: 権限、現在のデータベースおよびスキーマ)は、ストアドプロシージャが呼び出し元の権限で実行されるか、所有者の権限で実行されるかによって決まります。詳細については、 Accessing and Setting the Session State をご参照ください。

この Session オブジェクトを使用して、 Snowparkライブラリ の APIs を呼び出すことができます。たとえば、 テーブルの DataFrame を作成 したり、 SQL ステートメントを実行したりできます。

詳細については、 Snowpark開発者ガイド をご参照ください。

データアクセスの例

以下は、指定された数の行を1つのテーブルから別のテーブルにコピーするPythonメソッドの例です。このメソッドは次の引数を取ります。

  • Snowpark Session オブジェクト

  • 行をコピーするテーブルの名前

  • 行を保存するテーブルの名前

  • コピーする行の数。

この例のメソッドは文字列を返します。この例を Pythonワークシート で実行する場合は、 ワークシートの戻り値の型を変更 して String にします。

def run(session, from_table, to_table, count):

  session.table(from_table).limit(count).write.save_as_table(to_table)

  return "SUCCESS"
Copy

ファイルの読み取り

Snowpark snowflake.snowpark.files モジュールの SnowflakeFile クラスを使用すると、Pythonハンドラーでステージからファイルを動的に読み取ることができます。

Snowflakeは、ストアドプロシージャとユーザー定義関数の両方で、 SnowflakeFile を使用したファイルの読み取りをサポートしています。ハンドラーコードでのファイルの読み取りに関する詳細とその他の例については、 Python UDF ハンドラーを使用したファイルの読み取り をご参照ください。

この例では、 SnowflakeFile クラスを使用して、ファイルを読み取る所有権ストアドプロシージャを作成し、呼び出す方法を示しています。

ストアドプロシージャをインラインハンドラーで作成し、 mode の引数に rb を渡して入力モードをバイナリに指定します。

CREATE OR REPLACE PROCEDURE calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile

def run(ignored_session, file_path):
    with SnowflakeFile.open(file_path, 'rb') as f:
        return imagehash.average_hash(Image.open(f))
$$;
Copy

ストアドプロシージャを呼び出します。

CALL calc_phash(build_scoped_file_url(@my_files, 'my_image.jpg'));
Copy

Anacondaのサードパーティパッケージの使用

Pythonストアドプロシージャを作成するときにインストールするAnacondaパッケージを指定できます。Anacondaのサードパーティパッケージのリストを表示するには、 Anaconda Snowflakeチャネル をご参照ください。これらのサードパーティパッケージは、Anacondaによって構築および提供されています。Snowflake condaチャネルは、Anacondaの利用規約の補足組み込みソフトウェア条項に基づいて、無料でローカルテストおよび開発に使用できます。

制限については、 制限事項 をご参照ください。

はじめるにあたり

Snowflake内でAnacondaが提供するパッケージの使用を開始する前に、 Snowflakeサードパーティ規約 に同意する必要があります。

注釈

規約に同意するには、組織管理者(ORGADMIN ロールを使用)である必要があります。Snowflake アカウントで規約に同意する必要があるのは一度だけです。 アカウントの ORGADMIN ロールの有効化 をご参照ください。

  1. Snowsight にサインインします。

  2. Admin » Billing & Terms を選択します。

  3. Anaconda セクションで、 Enable を選択します。

  4. Anaconda Packages ダイアログで、リンクをクリックして Snowflakeサードパーティ規約ページ を確認します。

  5. 規約に同意する場合は、 Acknowledge & Continue を選択します。

利用規約に同意しようとしたときにエラーが表示された場合は、ユーザープロファイルに名、姓、または電子メールアドレスが含まれていない可能性があります。管理者ロールを持っている場合は、 ユーザープロファイルにユーザー詳細を追加する を参照して、 Snowsight を使用してプロファイルを更新してください。または、アカウント管理者に連絡して アカウントを更新してください

注釈

上記のSnowflakeサードパーティ規約に同意しない場合でも、ストアドプロシージャを使用できますが、次の制限があります。

  • Anacondaのサードパーティパッケージを使用することはできません。

  • ストアドプロシージャのパッケージとしてSnowpark Pythonを指定することはできますが、特定のバージョンを指定することはできません。

  • DataFrameオブジェクトを操作する場合、 to_pandas メソッドを使用することはできません。

パッケージの表示と使用

Information Schemaの PACKAGES ビューをクエリすると、使用可能なすべてのパッケージとそのバージョン情報を表示できます。

select * from information_schema.packages where language = 'python';
Copy

詳細については、Snowflake Python UDFドキュメントの サードパーティパッケージの使用 をご参照ください。

ストアドプロシージャの作成

Pythonワークシートから、または SQL を使用して、ストアドプロシージャを作成できます。

Pythonワークシートコードを自動化するPythonストアド プロシージャの作成

PythonワークシートからPythonストアドプロシージャを作成して、コードを自動化します。Pythonワークシートの作成の詳細については、 PythonワークシートでSnowparkコードを記述する をご参照ください。

前提条件

ロールには、Pythonワークシートを実行してストアドプロシージャとして展開するデータベーススキーマに対する OWNERSHIP または CREATE PROCEDURE 権限が必要です。

Pythonワークシートをストアドプロシージャとしてデプロイする

Pythonワークシートのコードを自動化するPythonストアドプロシージャを作成するには、次の手順を実行します。

  1. Snowsight にサインインします。

  2. Worksheets を開きます。

  3. ストアドプロシージャとしてデプロイするPythonワークシートを開きます。

  4. Deploy を選択します。

  5. ストアドプロシージャの名前を入力します。

  6. (オプション)ストアドプロシージャに関する詳細を含むコメントを入力します。

  7. (オプション) Replace if exists を選択して、既存のストアドプロシージャを同じ名前に置き換えます。

  8. Handler の場合は、ストアドプロシージャのハンドラー関数を選択します。たとえば、 main です。

  9. ハンドラー関数で使用される引数を確認し、必要であれば型付き引数の SQL データ型マッピングを上書きします。Pythonの型が SQL の型にマップされる方法の詳細については、 SQL-Pythonデータ型マッピング をご参照ください。

  10. (オプション) Open in Worksheets を選択して、ストアドプロシージャの定義を SQL ワークシートで開きます。

  11. Deploy を選択肢て、ストアドプロシージャを作成します。

  12. ストアドプロシージャが作成されたら、プロシージャの詳細に移動するか、 Done を選択できます。

1つのPythonワークシートから複数のストアドプロシージャを作成できます。

ストアドプロシージャを作成したら、それをタスクの一部として自動化できます。 タスクを使用したスケジュールでの SQL ステートメントの実行 をご参照ください。

表形式のデータを返す

表形式フォームでデータを返すプロシージャを書くことができます。表形式データを返すプロシージャを作成するには、次の手順を実行します。

  • CREATE PROCEDURE ステートメントで、プロシージャの戻り値の型として TABLE(...) を指定します。

    TABLE パラメータとして、返されたデータの列名と がわかっている場合はそれらを指定できます。実行時に指定された場合など、プロシージャを定義するときに返される列がわからない場合は、TABLE パラメータを省略できます。これを実行すると、プロシージャの戻り値の列は、そのハンドラーによって返された DataFrame の列から変換されます。列のデータ型は、 SQL-Pythonデータ型マッピング で指定されたマッピングに従って SQL に変換されます。

  • Snowpark DataFrame で表形式の結果を返すようにハンドラーを記述します。

    データフレームの詳細については、 Snowpark PythonでのDataFramesの使用 をご参照ください。

このセクションの例は、列が文字列と一致する行をフィルターするプロシージャーから表形式の値を返すことを示しています。

データの定義

次の例のコードは、従業員のテーブルを作成します。

CREATE OR REPLACE TABLE employees(id NUMBER, name VARCHAR, role VARCHAR);
INSERT INTO employees (id, name, role) VALUES (1, 'Alice', 'op'), (2, 'Bob', 'dev'), (3, 'Cindy', 'dev');
Copy

戻り列の名前と型の指定

この例では、 RETURNS TABLE() ステートメントで列の名前と型を指定しています。

CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col

def filter_by_role(session, table_name, role):
   df = session.table(table_name)
   return df.filter(col("role") == role)
$$;
Copy

戻り列の名前と型の省略

次の例のコードは、戻り値の列名と型をハンドラーの戻り値の列から挿入できるようにするプロシージャを宣言します。 RETURNS TABLE() ステートメントから列名と型が省略されています。

CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col

def filter_by_role(session, table_name, role):
  df = session.table(table_name)
  return df.filter(col("role") == role)
$$;
Copy

プロシージャの呼び出し

次の例では、ストアドプロシージャを呼び出します。

CALL filterByRole('employees', 'dev');
Copy

プロシージャの呼び出しにより、次の出力が生成されます。

+----+-------+------+
| ID | NAME  | ROLE |
+----+-------+------+
| 2  | Bob   | dev  |
| 3  | Cindy | dev  |
+----+-------+------+

ストアドプロシージャの呼び出し

ストアドプロシージャを作成したら、SQL から、またはスケジュールされたタスクの一部として呼び出すことができます。

ワーカープロセスによる並行タスクの実行

Pythonワーカープロセスを使用して、並行タスクを実行することができます。これは、ウェアハウスノードの複数の CPU コアを活用した並列タスクを実行する必要がある場合に役立つ可能性があります。

注釈

Snowflakeは、組み込みのPythonのマルチプロセスモジュールは使用しないことを推奨しています。

Pythonグローバルインタープリターロック によって、マルチタスクのアプローチが CPU のすべてのコアに渡ってスケーリングできない場合に対処するために、スレッドではなく、別々のワーカープロセスを使って並行タスクを実行することができます。

Snowflakeウェアハウスでは、次の例のように、 joblib ライブラリの Parallel クラスを使用してこれを実行できます。

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.8
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;
Copy

注釈

joblib.Parallel に使用されるデフォルトのバックエンドは、Snowflake標準のウェアハウスとSnowparkに最適されたウェアハウスで異なります。

  • 標準ウェアハウスのデフォルト: threading

  • Snowparkに最適化されたウェアハウスのデフォルト: loky (マルチプロセス)

次の例のように、 joblib.parallel_backend 関数を呼び出すと、デフォルトのバックエンド設定を上書きできます。

import joblib
joblib.parallel_backend('loky')
Copy