Scalaでのストアドプロシージャの記述

Scalaでストアドプロシージャを記述できます。ストアドプロシージャ内でSnowparkライブラリを使用して、Snowflakeのテーブルに対してクエリ、更新、およびその他の作業を実行できます。

このトピックでは、ストアドプロシージャのロジックを記述する方法について説明します。ロジックを作成した後、 SQL を使用してプロシージャを作成して呼び出すことができます。詳細については、 ストアドプロシージャの作成 および ストアドプロシージャの呼び出し をご参照ください。

このトピックの内容:

概要

Snowflakeウェアハウスをコンピューティングフレームワークとして使用して、Snowflake内でデータパイプラインをビルドおよび実行できます。データパイプラインのコードでは、ScalaのSnowpark API を使用してストアドプロシージャを記述します。これらのストアドプロシージャの実行をスケジュールするには、 タスク を使用します。

注釈

匿名プロシージャの作成と呼び出しの両方を実行するには、 CALL (匿名プロシージャの場合) を使用します。匿名プロシージャの作成と呼び出しには、 CREATE PROCEDURE スキーマ権限を持つロールは必要ありません。

前提条件

バージョン1.1.0または最新バージョンのSnowparkライブラリを使用する必要があります。

ハンドラーコードがステージにコピーされるストアドプロシージャを記述している場合は、クラスをコンパイルしてJavaバージョン11.xで実行する必要があります。

Snowparkの開発環境の設定

Snowparkライブラリを使用するように開発環境を設定します。 Snowpark Scalaの開発環境の設定 をご参照ください。

ハンドラーコードの構造化とビルド

プロシージャを作成する SQL とインラインでハンドラーソースコードを保持するか、別の場所にハンドラーのコンパイル結果を保持して SQL から参照することができます。詳細については、 ハンドラーコードのインラインまたはステージ上での保持 をご参照ください。

プロシージャで使用するハンドラーソースコードのビルドの詳細については、 ハンドラーコードのパッケージ化 をご参照ください。

制限事項

Snowparkストアドプロシージャには、次の制限があります。

  • 同時実行はサポートされていません。たとえば、コード内からは、複数のスレッドからクエリを送信することはできません。複数のクエリを同時に発行するコードはエラーを生成します。

  • タスクからストアドプロシージャを実行する場合は、タスクの作成時にウェアハウスを指定する必要があります。(Snowflakeが管理するコンピューティングリソースを使用してタスクを実行することはできません。)

  • Scalaによるストアドプロシージャハンドラーコードからのファイルの読み取りと書き込みは、まだ完全にはサポートされていません。ファイルを読み書きすると、動作が不安定になる場合があります。これには、引数として InputStream を受け取り、 putget などの FileOperation クラス(通常は Session.file メソッドを介してアクセス)から利用可能なメソッドを使用することが含まれます。

  • ストアドプロシージャでSnowpark APIs を使用する場合は、次の制限に注意してください。

    • PUT および GET コマンドを実行する APIsSession.sql("PUT ...") および Session.sql("GET ...") を含む)を使用する場合は、プロシージャを呼び出すクエリ用に提供されたメモリバックアップファイルシステムの/tmpディレクトリにのみ書き込むことができます。

    • 非同期アクション用 APIs は使用しないでください。

    • 新しいセッションを作成する APIs (例: Session.builder().configs(...).create())は使用しないでください。

    • session.jdbcConnection (およびそこから返される接続)の使用は、安全ではない動作を引き起こす可能性があるため、サポートされていません。

  • 名前付き仮オブジェクトの作成は、所有者権限のストアドプロシージャではサポートされていません。所有者権限ストアドプロシージャは、ストアドプロシージャ所有者の権限で実行されるストアドプロシージャです。詳細については、 呼び出し元の権限または所有者の権限 をご参照ください。

ストアドプロシージャのScalaコードの記述

プロシージャのロジックの場合は、プロシージャが呼び出されたときに実行されるハンドラーコードを記述します。このセクションでは、ハンドラーの設計について説明します。

プロシージャを作成する SQL ステートメントにこのコードをインラインで含めるか、コードをステージにコピーして、プロシージャの作成時に参照することができます。詳細については、 ハンドラーコードのインラインまたはステージ上での保持 をご参照ください。

ストアドプロシージャの記述の計画

消費されるメモリの量を制限する

Snowflakeは、必要なメモリ量に関してメソッドに制限を設けています。過度の消費を回避する方法の詳細については、 Snowflakeが課す制約内でのハンドラーの設計 をご参照ください。

スレッドセーフなコードを記述する

ハンドラーメソッドまたは関数がスレッドセーフであることを確認してください。

セキュリティ制限を理解する

ハンドラーコードは制限されたエンジン内で実行されるため、 UDFs とプロシージャのセキュリティプラクティス で説明されているルールに従ってください。

所有者の権限または呼び出し元の権限の使用を決定する

さらに、ストアドプロシージャの記述を計画するときは、ストアドプロシージャを 呼び出し元の権限または所有者の権限 で実行するかどうかを検討します。

ストアドプロシージャのタイムアウト動作に注意してください

コードのアクティビティによってタイマーがリセットされない限り、ストアドプロシージャの実行はタイムアウトになります。特に、タイムアウトタイマーは、ファイル操作、クエリ、結果セットの反復など、コードとデータの相互作用によってリセットされます。

クラスまたはオブジェクトの記述

定義するメソッドまたは関数は、クラスまたはオブジェクトの一部である必要があります。

クラスまたはオブジェクトを記述するときは、次の点に注意してください。

  • クラス(またはオブジェクト)とメソッドは、保護またはプライベートにしないでください。

  • メソッドが静的ではなく、コンストラクターを定義する場合は、クラスの引数がゼロのコンストラクターを定義します。Snowflakeは、初期化時にこの引数がゼロのコンストラクターを呼び出して、クラスのインスタンスを作成します。

  • 同じクラスまたはオブジェクト内の異なるストアドプロシージャに対して異なるメソッドを定義できます。

メソッドまたは関数の記述

ストアドプロシージャのメソッドまたは関数を記述するときは、次の点に注意してください。

  • メソッドまたは関数の最初の引数としてSnowpark Session オブジェクトを指定します。

    ストアドプロシージャを呼び出すと、Snowflakeは自動的に Session オブジェクトを作成し、ストアドプロシージャに渡します。(Session オブジェクトを自分で作成することはできません。)

  • 残りの引数と戻り値には、 Snowflakeデータ型 に対応する Scala型 を使用します。

  • メソッドまたは関数は値を返す必要があります。Scalaのストアドプロシージャの場合は、戻り値が必要です。

  • コードのアクティビティによってタイマーがリセットされない限り、ストアドプロシージャの実行はタイムアウトになります。特に、タイムアウトタイマーは、ファイル操作、クエリ、結果セットの反復など、コードとデータの相互作用によってリセットされます。

コードで依存関係を利用できるようにする方法

ハンドラーコードがハンドラー自体の外部で定義されたコード(JAR ファイル内のクラスなど)またはリソースファイルに依存している場合は、それらの依存関係をステージにアップロードすると、それらの依存関係をコードで使用できるようにすることができます。 プロシージャを作成する ときに、 IMPORTS 句を使用してこれらの依存関係を参照できます。

詳細については、 コードで依存関係を利用できるようにする方法 をご参照ください。

ストアドプロシージャからSnowflakeへのデータのアクセス

Snowflakeのデータにアクセスするには、Snowparkライブラリ APIs を使用します。

Scalaストアドプロシージャへの呼び出しを処理する場合、Snowflakeは、Snowpark Session オブジェクトを作成し、そのオブジェクトをストアドプロシージャのメソッドまたは関数に渡します。

他の言語におけるストアドプロシージャの場合と同様に、セッションのコンテキスト(例: 権限、現在のデータベースおよびスキーマ)は、ストアドプロシージャが呼び出し元の権限で実行されるか、所有者の権限で実行されるかによって決まります。詳細については、 セッション状態へのアクセスおよび設定 をご参照ください。

この Session オブジェクトを使用して、 Snowparkライブラリ の APIs を呼び出すことができます。たとえば、 テーブルの DataFrame を作成 したり、 SQL ステートメントを実行したりできます。

詳細については、 Scala用Snowpark開発者ガイド をご参照ください。

注釈

データへのアクセスの制限など、制限については、 制限事項 をご参照ください。

データアクセスの例

以下は、指定された数の行を1つのテーブルから別のテーブルにコピーするScalaメソッドの例です。このメソッドは次の引数を取ります。

  • Snowpark Session オブジェクト

  • 行をコピーするテーブルの名前

  • 行を保存するテーブルの名前

  • コピーする行の数。

この例のメソッドは文字列を返します。

object MyObject
{
  def myProcedure(session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int): String =
  {
    session.table(fromTable).limit(count).write.saveAsTable(toTable)
    return "Success"
  }
}
Copy

次の例では、メソッドではなく関数を定義しています。

object MyObject
{
  val myProcedure = (session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int): String =>
  {
    session.table(fromTable).limit(count).write.saveAsTable(toTable)
    "Success"
  }
}
Copy

他のクラスとリソースファイルへのアクセス

コードがストアドプロシージャの外部で定義されたクラス(例: 別の JAR ファイル内のクラス)またはリソースファイルに依存している場合、それらの依存関係をステージにアップロードすると、ハンドラーはそれらの依存関係を利用できるようになります。詳細については、 コードで依存関係を利用できるようにする方法 をご参照ください。

後で CREATE PROCEDURE ステートメント を実行するときに、 IMPORTS 句を使用してこれらのファイルをポイントします。

ステージングされたハンドラーを使用したストアドプロシージャの準備

ハンドラーをコンパイルしてステージにコピーするストアドプロシージャを作成する(ソースとしてインラインに保持するのではなく)場合は、クラスをコンパイルして JAR ファイルにパッケージ化し、 JAR ファイルをステージにアップロードする必要があります。

Scalaコードのコンパイルとパッケージ化

ストアドプロシージャの設定を簡単にするために、ストアドプロシージャに必要なすべての依存関係を含む JAR ファイルをビルドします。後で、 JAR ファイルをステージにアップロードし、 CREATE PROCEDURE ステートメントから JAR ファイルをポイントする必要があります。アップロードしてポイントする JAR ファイルが少ない場合、このプロセスは簡単です。

SBT を使用した、依存関係のある JAR ファイルのビルド

SBT を使用してコードをビルドおよびパッケージ化する場合は、 sbt-assemblyプラグイン を使用して、すべての依存関係を含む JAR ファイルを作成できます。詳細については、 SBT を使用したScalaハンドラーコードのパッケージ化 をご参照ください。

Mavenを使用した、依存関係のある JAR ファイルのビルド

Mavenを使用してコードをビルドおよびパッケージ化する場合は、 Mavenアセンブリプラグイン を使用して、すべての依存関係を含む JAR ファイルを作成できます。詳細については、 Mavenを使用したJavaまたはScalaハンドラーコードのパッケージ化 をご参照ください。

他のツールを使用した、依存関係のある JAR ファイルのビルド

SBT またはMavenを使用していない場合は、すべての依存関係を含む JAR ファイルをビルドする手順について、ビルドツールのドキュメントをご参照ください。

たとえば、 IntelliJ IDEA プロジェクト(IntelliJ の SBT プロジェクトではない)を使用している場合は、 アーティファクト構成の設定に関する手順 をご参照ください。

ステージへのファイルのアップロード

プロシージャーのロジック(およびその他の依存関係がある場合)をプロシージャーで使用できるようにするには、ステージに必要なファイルをアップロードする必要があります。詳細については、 コードで依存関係を利用できるようにする方法 をご参照ください。

ストアドプロシージャの作成

SQL を使用してストアドプロシージャを作成する方法については、 ストアドプロシージャの作成 をご参照ください。

表形式のデータを返す

表形式フォームでデータを返すプロシージャを書くことができます。表形式データを返すプロシージャを作成するには、次の手順を実行します。

  • CREATE PROCEDURE ステートメントで、プロシージャの戻り値の型として TABLE(...) を指定します。

    TABLE パラメータとして、返されたデータの列名と がわかっている場合はそれらを指定できます。実行時に指定された場合など、プロシージャを定義するときに返される列がわからない場合は、TABLE パラメータを省略できます。これを行うと、プロシージャの戻り値の列は、そのハンドラーによって返されたデータフレームの列から変換されます。列のデータ型は、 SQL-Scalaデータ型マッピング で指定されたマッピングに従って SQL に変換されます。

  • Snowparkデータフレームで表形式の結果を返すようにハンドラーを記述します。

    データフレームの詳細については、 Snowpark Scalaでの DataFrames の操作 をご参照ください。

注釈

次のいずれかに該当する場合、プロシージャは実行時にエラーを生成します。

  • 戻り値の型として TABLE を宣言していますが、そのハンドラーはデータフレームを返していない。

  • そのハンドラーはデータフレームを返していますが、プロシージャは TABLE を戻り値の型として宣言していない。

このセクションの例は、列が文字列と一致する行をフィルターするプロシージャーから表形式の値を返すことを示しています。

データの定義

次の例のコードは、従業員のテーブルを作成します。

CREATE OR REPLACE TABLE employees(id NUMBER, name VARCHAR, role VARCHAR);
INSERT INTO employees (id, name, role) VALUES (1, 'Alice', 'op'), (2, 'Bob', 'dev'), (3, 'Cindy', 'dev');
Copy

行をフィルターするプロシージャの宣言

次の2つの例のコードは、テーブル名とロールを引数として受け取るストアドプロシージャを作成し、ロール列の値が引数として指定されたロールと一致するテーブル内の行を返します。

戻り列の名前と型の指定

この例では、 RETURNS TABLE() ステートメントで列の名前と型を指定しています。

CREATE OR REPLACE PROCEDURE filter_by_role(table_name VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE SCALA
RUNTIME_VERSION = '2.12'
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'Filter.filterByRole'
AS
$$
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark._

object Filter {
    def filterByRole(session: Session, tableName: String, role: String): DataFrame = {
        val table = session.table(tableName)
        val filteredRows = table.filter(col("role") === role)
        return filteredRows
    }
}
$$;
Copy

注釈

現在、 CREATE PROCEDURERETURNS TABLE(...) 句では、列型として GEOGRAPHY を指定できません。

CREATE OR REPLACE PROCEDURE test_return_geography_table_1()
RETURNS TABLE(g GEOGRAPHY)
...
Copy

指定した場合にストアドプロシージャを呼び出すと、次のエラーが発生します。

CALL test_return_geography_table_1();
Copy
Stored procedure execution error: data type of returned table does not match expected returned table type
Copy

これを回避するには、 RETURNS TABLE() の列の引数と型を省略できます。

CREATE OR REPLACE PROCEDURE test_return_geography_table_1()
RETURNS TABLE()
...
Copy
戻り列の名前と型の省略

次の例のコードは、戻り値の列名と型をハンドラーの戻り値の列から挿入できるようにするプロシージャを宣言します。 RETURNS TABLE() ステートメントから列名と型が省略されています。

CREATE OR REPLACE PROCEDURE filter_by_role(table_name VARCHAR, role VARCHAR)
RETURNS TABLE()
LANGUAGE SCALA
RUNTIME_VERSION = '2.12'
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'Filter.filterByRole'
AS
$$
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark._

object Filter {
    def filterByRole(session: Session, tableName: String, role: String): DataFrame = {
        val table = session.table(tableName)
        val filteredRows = table.filter(col("role") === role)
        return filteredRows
    }
}
$$;
Copy

プロシージャの呼び出し

次の例では、ストアドプロシージャを呼び出します。

CALL filter_by_role('employees', 'dev');
Copy

プロシージャの呼び出しにより、次の出力が生成されます。

+----+-------+------+
| ID | NAME  | ROLE |
+----+-------+------+
| 2  | Bob   | dev  |
| 3  | Cindy | dev  |
+----+-------+------+
Copy

ストアドプロシージャの呼び出し

SQL からストアドプロシージャを呼び出す方法については、 ストアドプロシージャの呼び出し をご参照ください。