ストアドプロシージャの Python ハンドラーの例¶
ワーカープロセスによる並行タスクの実行¶
Pythonワーカープロセスを使用して、並行タスクを実行することができます。これは、ウェアハウスノードの複数の CPU コアを活用した並列タスクを実行する必要がある場合に役立つ可能性があります。
注釈
Snowflakeは、組み込みのPythonのマルチプロセスモジュールは使用しないことを推奨しています。
Pythonグローバルインタープリターロック によって、マルチタスクのアプローチが CPU のすべてのコアに渡ってスケーリングできない場合に対処するために、スレッドではなく、別々のワーカープロセスを使って並行タスクを実行することができます。
Snowflakeウェアハウスでは、次の例のように、 joblib ライブラリの Parallel クラスを使用してこれを実行できます。
注釈
joblib.Parallel に使用されるデフォルトのバックエンドは、Snowflake標準のウェアハウスとSnowparkに最適されたウェアハウスで異なります。
標準ウェアハウスのデフォルト:
threadingSnowparkに最適化されたウェアハウスのデフォルト:
loky(マルチプロセス)
次の例のように、 joblib.parallel_backend 関数を呼び出すと、デフォルトのバックエンド設定を上書きできます。
Snowpark APIs を使った非同期処理¶
以下の例では、Snowpark APIs を使用して非同期の子ジョブを開始する方法と、さまざまな条件下でのジョブの動作について説明します。
非同期子ジョブのステータスチェック¶
以下の例では、 checkStatus プロシージャが60秒待機する非同期子ジョブを実行します。このプロシージャは、ジョブが終了する前にそのステータスをチェックするため、 False を返します。
次のコードは、このプロシージャを呼び出します。
非同期の子ジョブのキャンセル¶
以下の例では、 cancelJob プロシージャは、SQL を使用して test_tb テーブルにデータを挿入し、終了までに10秒かかる非同期子ジョブを実行します。そして、子ジョブが終了してデータが挿入される前に、子ジョブをキャンセルします。
次のコードは test_tb テーブルにクエリしますが、データが挿入されていないため結果は返されません。
非同期子ジョブの実行中の待機とブロック¶
次の例では、 blockUntilDone プロシージャが終了までに5秒かかる非同期の子ジョブを実行します。snowflake.snowpark.AsyncJob.result メソッドを使用すると、プロシージャは待機し、ジョブが終了すると戻ります。
次のコードでは、 blockUntilDone プロシージャを呼び出し、5秒待って返します。
未完了の非同期子ジョブの結果をリクエストした後にエラーを返す¶
以下の例では、 earlyReturn プロシージャが、終了までに60秒かかる非同期子ジョブを実行します。このプロシージャは、ジョブが終了する前に、ジョブの結果から DataFrame を返そうとします。結果はエラーです。
次のコードは、 earlyReturn プロシージャを呼び出し、エラーを返します。
子ジョブが終了する前に親ジョブが終了すると、子ジョブがキャンセルされる¶
以下の例では、 earlyCancelJob プロシージャがテーブルにデータを挿入する非同期子ジョブを実行し、終了までに10秒かかります。しかし、親ジョブ --- async_handler --- は子ジョブが終了する前に戻り、子ジョブはキャンセルされます。
次のコードは earlyCancelJob プロシージャを呼び出します。その後、 test_tb テーブルをクエリしますが、キャンセルされた子ジョブによってデータが挿入されていないため、結果は返されません。
ファイルおよびアセットの読み取り¶
IMPORTS を使用した静的に指定されたファイルの読み取り¶
IMPORTS コマンドの CREATE PROCEDURE 句にファイル名とステージ名を指定することで、ファイルを読み込めます。
IMPORTS句でファイルを指定すると、Snowflakeはそのファイルをステージからストアドプロシージャの*ホームディレクトリ*(別称*インポートディレクトリ*)にコピーします。ストアドプロシージャは、ホームディレクトリからファイルを読み取ります。
Snowflakeは、インポートしたファイルを単一のディレクトリにコピーします。そのディレクトリにあるすべてのファイルは一意な名前でなければならないため、 IMPORTS 句の各ファイルも個別の名前にする必要があります。これは、ステージングされたファイルが異なるステージや、ステージ内の異なるサブディレクトリで開始された場合でも同様です。
次の例では、 file.txt という名前のステージから my_stage と呼ばれるファイルを読み取るインラインPythonハンドラーを使用しています。ハンドラーは、Python:code:`sys._xoptions`メソッドと``snowflake_import_directory``システムオプションを使用して、ストアドプロシージャのホームディレクトリの場所を取得できます。
Snowflakeは、ストアドプロシージャの作成中に一度だけファイルを読み取ります。ファイルの読み取りがターゲットハンドラーの外部で発生した場合は、ストアドプロシージャの実行中にファイルを再度読み取ることはありません。
インラインハンドラーを使用してストアドプロシージャを作成する例。
IMPORTS を使用したディレクトリのインポート¶
CREATE PROCEDURE コマンドの IMPORTS 句を使用して、ディレクトリをインポートできます。
注釈
ディレクトリのインポートパスは、末尾のスラッシュ(
/)で終了する必要があります。例:IMPORTS = ('@my_stage/my_dir/')。インポート時にディレクトリの名前を変更するには、ステージパスに
/=custom_name/を追加します。カスタム名は、パスではなく単一のディレクトリ名である必要があります。例:IMPORTS = ('@my_stage/my_dir/=custom_name/')。ディレクトリのインポートはネイティブアプリではサポートされていません。
次の例では、:code:`my_stage`という名前のステージから:code:`my_dir`というディレクトリをインポートし、それに含まれるファイルをリストします。
