Pythonでのストアドプロシージャの記述¶
このトピックでは、Pythonでストアドプロシージャを記述する方法について説明します。ストアドプロシージャ内でSnowparkライブラリを使用して、Snowflakeのテーブルに対してクエリ、更新、およびその他の作業を実行できます。
このトピックの内容:
紹介¶
Snowparkストアドプロシージャを使用すると、Snowflakeウェアハウスをコンピューティングフレームワークとして使用して、Snowflake内でデータパイプラインをビルドおよび実行できます。Python用Snowpark API を使用してストアドプロシージャを記述し、データパイプラインを構築します。これらのストアドプロシージャの実行をスケジュールするには、 タスク を使用します。
機械学習モデルとSnowpark Pythonの詳細については、 Snowpark Pythonを使用した機械学習モデルのトレーニング をご参照ください。
Pythonワークシートを使用 するか、 ローカル開発環境を使用 して、Python用Snowparkストアドプロシージャを作成できます。
ハンドラーコードの実行時にログをキャプチャし、データをトレースできます。詳細については、 ログおよびトレースの概要 をご参照ください。
注釈
匿名プロシージャの作成と呼び出しの両方を実行するには、 CALL (匿名プロシージャの場合) を使用します。匿名プロシージャの作成と呼び出しには、 CREATE PROCEDURE スキーマ権限を持つロールは必要ありません。
ストアドプロシージャをローカルに記述するための前提条件¶
ローカル開発環境でPythonストアドプロシージャを作成するには、次の前提条件を満たす必要があります。
バージョン0.4.0または最新バージョンのSnowparkライブラリを使用する必要があります。
Snowpark Pythonが必要なサードパーティの依存関係を読み込めるように、Anacondaパッケージを有効にします。 Anacondaのサードパーティパッケージの使用 をご参照ください。
サポートされているPythonのバージョンは次のとおりです。
3.8
3.9
3.10
3.11
次に、Snowparkライブラリを使用するように開発環境をセットアップします。 Snowparkの開発環境の設定 をご参照ください。
ストアドプロシージャのPythonコードの記述¶
プロシージャのロジックの場合は、プロシージャが呼び出されたときに実行されるハンドラーコードを記述します。このセクションでは、ハンドラーの設計について説明します。
ハンドラーコードからストアドプロシージャを作成するには、いくつかの方法があります。
プロシージャを作成する SQL ステートメントにコードをインラインで含めます。 ハンドラーコードのインラインまたはステージ上での保持 をご参照ください。
コードをステージにコピーして、プロシージャの作成時に参照することができます。 ハンドラーコードのインラインまたはステージ上での保持 をご参照ください。
コードをPythonワークシートに記述し、ワークシートの内容をストアドプロシージャにデプロイします。 Pythonワークシートコードを自動化するPythonストアド プロシージャの作成 をご参照ください。
制限事項¶
Snowparkストアドプロシージャには、次の制限があります。
プロセスの作成は、ストアドプロシージャではサポートされていません。
クエリの同時実行は、ストアドプロシージャではサポートされていません。
PUT および GET (
Session.sql("PUT ...")
およびSession.sql("GET ...")
を含む)コマンドを実行する APIs は使用できません。session.file.get
を使用してステージからファイルをダウンロードする場合、パターンマッチングはサポートされていません。タスクからストアドプロシージャを実行する場合は、タスクの作成時にウェアハウスを指定する必要があります。サーバーレスコンピューティングリソースを使用してタスクを実行することはできません。
名前付き仮オブジェクトの作成は、所有者権限のストアドプロシージャではサポートされていません。所有者権限ストアドプロシージャは、ストアドプロシージャ所有者の権限で実行されるストアドプロシージャです。詳細については、 呼び出し元の権限または所有者の権限 をご参照ください。
ストアドプロシージャの記述の計画¶
ストアドプロシージャはSnowflake内で実行されるため、それを念頭に置いて記述するコードを計画する必要があります。
消費されるメモリの量を制限するSnowflakeは、必要なメモリ量に関してメソッドに制限を設けています。ガイダンスについては、 Snowflakeが課す制約内でのハンドラーの設計 をご参照ください。
ハンドラーメソッドまたは関数がスレッドセーフであることを確認してください。
ルールとセキュリティ制限に従ってください。 UDFs とプロシージャのセキュリティプラクティス をご参照ください。
ストアドプロシージャを 呼び出し元の権限または所有者の権限 で実行するかどうかを決定します。
ストアドプロシージャの実行に使用されるsnowflake-snowpark-pythonバージョンを検討してください。ストアドプロシージャのリリースプロセスには制限があるため、Pythonストアドプロシージャ環境で利用可能なsnowflake-snowpark-pythonライブラリは、通常、公開されているバージョンより1つ遅れています。次の SQL を使用して、利用可能な最新バージョンを見つけます。
select * from information_schema.packages where package_name = 'snowflake-snowpark-python' order by version desc;
メソッドまたは関数の記述¶
ストアドプロシージャのメソッドまたは関数を記述するときは、次の点に注意してください。
メソッドまたは関数の最初の引数としてSnowpark
Session
オブジェクトを指定します。ストアドプロシージャを呼び出すと、Snowflakeは自動的にSession
オブジェクトを作成し、ストアドプロシージャに渡します。(Session
オブジェクトを自分で作成することはできません。)残りの引数と戻り値には、 Snowflakeデータ型 に対応するPython型を使用します。Snowflakeは、 パラメーターと戻り値のSQL-Pythonデータ型マッピング にリストされているPythonデータ型をサポートしています
エラーの処理¶
通常のPython例外処理手法を使用して、プロシージャのエラーをキャッチできます。
メソッド内でキャッチされない例外が発生した場合、Snowflakeは例外のスタックトレースを含むエラーを発生させます。 未処理の例外のログ を有効にすると、Snowflakeは未処理の例外に関するデータをイベントテーブルに記録します。
コードで依存関係を利用できるようにする方法¶
ハンドラーコードが、ハンドラー自体の外部で定義されたコード(モジュールで定義されたコードなど)またはリソースファイルに依存している場合は、それらの依存関係をステージにアップロードすると、コードでそれらの依存関係を利用できるようになります。 コードで依存関係を利用できるようにする方法 を参照するか、Pythonワークシートの場合は ステージからワークシートにPythonファイルを追加する をご参照ください。
SQL を使用してストアドプロシージャを作成する場合は、 CREATE PROCEDURE ステートメント を記述するときに IMPORTS 句を使用して、依存ファイルを指定します。
ストアドプロシージャからSnowflakeへのデータのアクセス¶
Snowflakeのデータにアクセスするには、Snowparkライブラリ APIs を使用します。
Pythonストアドプロシージャへの呼び出しを処理する場合、Snowflakeは、Snowpark Session
オブジェクトを作成し、そのオブジェクトをストアドプロシージャのメソッドまたは関数に渡します。
他の言語におけるストアドプロシージャの場合と同様に、セッションのコンテキスト(例: 権限、現在のデータベースおよびスキーマ)は、ストアドプロシージャが呼び出し元の権限で実行されるか、所有者の権限で実行されるかによって決まります。詳細については、 Accessing and Setting the Session State をご参照ください。
この Session
オブジェクトを使用して、 Snowparkライブラリ の APIs を呼び出すことができます。たとえば、 テーブルの DataFrame を作成 したり、 SQL ステートメントを実行したりできます。
詳細については、 Snowpark開発者ガイド をご参照ください。
データアクセスの例¶
以下は、指定された数の行を1つのテーブルから別のテーブルにコピーするPythonメソッドの例です。このメソッドは次の引数を取ります。
Snowpark
Session
オブジェクト行をコピーするテーブルの名前
行を保存するテーブルの名前
コピーする行の数。
この例のメソッドは文字列を返します。この例を Pythonワークシート で実行する場合は、 ワークシートの戻り値の型を変更 して String にします。
def run(session, from_table, to_table, count):
session.table(from_table).limit(count).write.save_as_table(to_table)
return "SUCCESS"
ファイルの読み取り¶
Snowpark snowflake.snowpark.files
モジュールの SnowflakeFile
クラスを使用すると、Pythonハンドラーでステージからファイルを動的に読み取ることができます。
Snowflakeは、ストアドプロシージャとユーザー定義関数の両方で、 SnowflakeFile
を使用したファイルの読み取りをサポートしています。ハンドラーコードでのファイルの読み取りに関する詳細とその他の例については、 Python UDF ハンドラーを使用したファイルの読み取り をご参照ください。
この例では、 SnowflakeFile
クラスを使用して、ファイルを読み取る所有権ストアドプロシージャを作成し、呼び出す方法を示しています。
ストアドプロシージャをインラインハンドラーで作成し、 mode
の引数に rb
を渡して入力モードをバイナリに指定します。
CREATE OR REPLACE PROCEDURE calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile
def run(ignored_session, file_path):
with SnowflakeFile.open(file_path, 'rb') as f:
return imagehash.average_hash(Image.open(f))
$$;
ストアドプロシージャを呼び出します。
CALL calc_phash(build_scoped_file_url(@my_files, 'my_image.jpg'));
Anacondaのサードパーティパッケージの使用¶
Pythonストアドプロシージャを作成するときにインストールするAnacondaパッケージを指定できます。Anacondaのサードパーティパッケージのリストを表示するには、 Anaconda Snowflakeチャネル をご参照ください。これらのサードパーティパッケージは、Anacondaによって構築および提供されています。Snowflake condaチャネルは、Anacondaの利用規約の補足組み込みソフトウェア条項に基づいて、無料でローカルテストおよび開発に使用できます。
制限については、 制限事項 をご参照ください。
はじめるにあたり¶
Snowflake内でAnacondaが提供するパッケージの使用を開始する前に、 外部製品規約 に同意する必要があります。
注釈
規約に同意するには、組織管理者(ORGADMIN ロールを使用)である必要があります。Snowflake アカウントで規約に同意する必要があるのは一度だけです。 アカウントの ORGADMIN ロールの有効化 をご参照ください。
Snowsight にサインインします。
Admin » Billing & Terms を選択します。
Anaconda セクションで、 Enable を選択します。
Anaconda Packages ダイアログで、リンクをクリックして 外部製品規約ページ を確認します。
規約に同意する場合は、 Acknowledge & Continue を選択します。
利用規約に同意しようとしたときにエラーが表示された場合は、ユーザープロファイルに名、姓、または電子メールアドレスが含まれていない可能性があります。管理者ロールを持っている場合は、 ユーザープロファイルにユーザー詳細を追加する を参照して、 Snowsight を使用してプロファイルを更新してください。または、アカウント管理者に連絡して アカウントを更新してください。
注釈
上記のSnowflakeサードパーティ規約に同意しない場合でも、ストアドプロシージャを使用できますが、次の制限があります。
Anacondaのサードパーティパッケージを使用することはできません。
ストアドプロシージャのパッケージとしてSnowpark Pythonを指定することはできますが、特定のバージョンを指定することはできません。
DataFrameオブジェクトを操作する場合、
to_pandas
メソッドを使用することはできません。
パッケージの表示と使用¶
Information Schemaの PACKAGES ビューをクエリすると、使用可能なすべてのパッケージとそのバージョン情報を表示できます。
select * from information_schema.packages where language = 'python';
詳細については、Snowflake Python UDFドキュメントの サードパーティパッケージの使用 をご参照ください。
ストアドプロシージャの作成¶
Pythonワークシートから、または SQL を使用して、ストアドプロシージャを作成できます。
SQL を使用してストアドプロシージャを作成するには、 ストアドプロシージャの作成 をご参照ください。
Pythonワークシートからストアドプロシージャを作成するには、 Pythonワークシートコードを自動化するPythonストアド プロシージャの作成 をご参照ください。
Pythonワークシートコードを自動化するPythonストアド プロシージャの作成¶
PythonワークシートからPythonストアドプロシージャを作成して、コードを自動化します。Pythonワークシートの作成の詳細については、 PythonワークシートでSnowparkコードを記述する をご参照ください。
前提条件¶
ロールには、Pythonワークシートを実行してストアドプロシージャとして展開するデータベーススキーマに対する OWNERSHIP または CREATE PROCEDURE 権限が必要です。
Pythonワークシートをストアドプロシージャとしてデプロイする¶
Pythonワークシートのコードを自動化するPythonストアドプロシージャを作成するには、次の手順を実行します。
Snowsight にサインインします。
Projects » Worksheets を開きます。
ストアドプロシージャとしてデプロイするPythonワークシートを開きます。
Deploy を選択します。
ストアドプロシージャの名前を入力します。
(オプション)ストアドプロシージャに関する詳細を含むコメントを入力します。
(オプション) Replace if exists を選択して、既存のストアドプロシージャを同じ名前に置き換えます。
Handler の場合は、ストアドプロシージャのハンドラー関数を選択します。たとえば、
main
です。ハンドラー関数で使用される引数を確認し、必要であれば型付き引数の SQL データ型マッピングを上書きします。Pythonの型が SQL の型にマップされる方法の詳細については、 SQL-Pythonデータ型マッピング をご参照ください。
(オプション) Open in Worksheets を選択して、ストアドプロシージャの定義を SQL ワークシートで開きます。
Deploy を選択肢て、ストアドプロシージャを作成します。
ストアドプロシージャが作成されたら、プロシージャの詳細に移動するか、 Done を選択できます。
1つのPythonワークシートから複数のストアドプロシージャを作成できます。
ストアドプロシージャを作成したら、それをタスクの一部として自動化できます。 タスクを使用したスケジュールでの SQL ステートメントの実行 をご参照ください。
表形式のデータを返す¶
表形式フォームでデータを返すプロシージャを書くことができます。表形式データを返すプロシージャを作成するには、次の手順を実行します。
CREATE PROCEDURE ステートメントで、プロシージャの戻り値の型として
TABLE(...)
を指定します。TABLE パラメータとして、返されたデータの列名と 型 がわかっている場合はそれらを指定できます。実行時に指定された場合など、プロシージャを定義するときに返される列がわからない場合は、TABLE パラメータを省略できます。これを実行すると、プロシージャの戻り値の列は、そのハンドラーによって返された DataFrame の列から変換されます。列のデータ型は、 SQL-Pythonデータ型マッピング で指定されたマッピングに従って SQL に変換されます。
Snowpark DataFrame で表形式の結果を返すようにハンドラーを記述します。
データフレームの詳細については、 Snowpark PythonでのDataFramesの使用 をご参照ください。
例¶
このセクションの例は、列が文字列と一致する行をフィルターするプロシージャーから表形式の値を返すことを示しています。
データの定義¶
次の例のコードは、従業員のテーブルを作成します。
CREATE OR REPLACE TABLE employees(id NUMBER, name VARCHAR, role VARCHAR);
INSERT INTO employees (id, name, role) VALUES (1, 'Alice', 'op'), (2, 'Bob', 'dev'), (3, 'Cindy', 'dev');
戻り列の名前と型の指定¶
この例では、 RETURNS TABLE()
ステートメントで列の名前と型を指定しています。
CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col
def filter_by_role(session, table_name, role):
df = session.table(table_name)
return df.filter(col("role") == role)
$$;
戻り列の名前と型の省略¶
次の例のコードは、戻り値の列名と型をハンドラーの戻り値の列から挿入できるようにするプロシージャを宣言します。 RETURNS TABLE()
ステートメントから列名と型が省略されています。
CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col
def filter_by_role(session, table_name, role):
df = session.table(table_name)
return df.filter(col("role") == role)
$$;
プロシージャの呼び出し¶
次の例では、ストアドプロシージャを呼び出します。
CALL filterByRole('employees', 'dev');
プロシージャの呼び出しにより、次の出力が生成されます。
+----+-------+------+
| ID | NAME | ROLE |
+----+-------+------+
| 2 | Bob | dev |
| 3 | Cindy | dev |
+----+-------+------+
ストアドプロシージャの呼び出し¶
ストアドプロシージャを作成したら、SQL から、またはスケジュールされたタスクの一部として呼び出すことができます。
SQL からストアドプロシージャを呼び出す方法については、 ストアドプロシージャの呼び出し をご参照ください。
スケジュールされたタスクの一部としてストアドプロシージャを呼び出す方法については、 タスクを使用したスケジュールでの SQL ステートメントの実行 をご参照ください。
例¶
ワーカープロセスによる並行タスクの実行¶
Pythonワーカープロセスを使用して、並行タスクを実行することができます。これは、ウェアハウスノードの複数の CPU コアを活用した並列タスクを実行する必要がある場合に役立つ可能性があります。
注釈
Snowflakeは、組み込みのPythonのマルチプロセスモジュールは使用しないことを推奨しています。
Pythonグローバルインタープリターロック によって、マルチタスクのアプローチが CPU のすべてのコアに渡ってスケーリングできない場合に対処するために、スレッドではなく、別々のワーカープロセスを使って並行タスクを実行することができます。
Snowflakeウェアハウスでは、次の例のように、 joblib
ライブラリの Parallel
クラスを使用してこれを実行できます。
CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'joblib_multiprocessing'
PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt
def joblib_multiprocessing(session, i):
result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
return str(result)
$$;
注釈
joblib.Parallel
に使用されるデフォルトのバックエンドは、Snowflake標準のウェアハウスとSnowparkに最適されたウェアハウスで異なります。
標準ウェアハウスのデフォルト:
threading
Snowparkに最適化されたウェアハウスのデフォルト:
loky
(マルチプロセス)
次の例のように、 joblib.parallel_backend
関数を呼び出すと、デフォルトのバックエンド設定を上書きできます。
import joblib
joblib.parallel_backend('loky')