Scalaでのストアドプロシージャの記述¶
Scalaでストアドプロシージャを記述できます。ストアドプロシージャ内でSnowparkライブラリを使用して、Snowflakeのテーブルに対してクエリ、更新、およびその他の作業を実行できます。
このトピックでは、ストアドプロシージャのロジックを記述する方法について説明します。ロジックを作成した後、 SQL を使用してプロシージャを作成して呼び出すことができます。詳細については、 ストアドプロシージャの作成 および ストアドプロシージャの呼び出し をご参照ください。
このトピックの内容:
紹介¶
Snowflakeウェアハウスをコンピューティングフレームワークとして使用して、Snowflake内でデータパイプラインをビルドおよび実行できます。データパイプラインのコードでは、ScalaのSnowpark API を使用してストアドプロシージャを記述します。これらのストアドプロシージャの実行をスケジュールするには、 タスク を使用します。
ハンドラーコードの実行時にログをキャプチャし、データをトレースできます。詳細については、 ログおよびトレースの概要 をご参照ください。
注釈
匿名プロシージャの作成と呼び出しの両方を実行するには、 CALL (匿名プロシージャの場合) を使用します。匿名プロシージャの作成と呼び出しには、 CREATE PROCEDURE スキーマ権限を持つロールは必要ありません。
前提条件¶
バージョン1.1.0または最新バージョンのSnowparkライブラリを使用する必要があります。
ハンドラーコードがステージにコピーされるストアドプロシージャを記述している場合は、クラスをコンパイルしてJavaバージョン11.xで実行する必要があります。
Snowparkの開発環境の設定¶
Snowparkライブラリを使用するように開発環境を設定します。 Snowpark Scalaの開発環境の設定 をご参照ください。
ハンドラーコードの構造化とビルド¶
プロシージャを作成する SQL とインラインでハンドラーソースコードを保持するか、別の場所にハンドラーのコンパイル結果を保持して SQL から参照することができます。詳細については、 ハンドラーコードのインラインまたはステージ上での保持 をご参照ください。
プロシージャで使用するハンドラーソースコードのビルドの詳細については、 ハンドラーコードのパッケージ化 をご参照ください。
プロシージャの作成と呼び出し¶
プロシージャのハンドラーを記述すると、 SQL を使ってそのハンドラーを作成し、呼び出すことができます。
ストアドプロシージャの作成
SQL を使用してストアドプロシージャを作成する方法については、 ストアドプロシージャの作成 をご参照ください。
SQL を使った匿名プロシージャの作成については、 CALL (匿名プロシージャの場合) をご参照ください。
ストアドプロシージャの呼び出し
SQL からストアドプロシージャを呼び出す方法については、 ストアドプロシージャの呼び出し をご参照ください。
制限事項¶
Snowparkストアドプロシージャには、次の制限があります。
同時実行はサポートされていません。たとえば、コード内からは、複数のスレッドからクエリを送信することはできません。複数のクエリを同時に発行するコードはエラーを生成します。
タスクからストアドプロシージャを実行する場合は、タスクの作成時にウェアハウスを指定する必要があります。(サーバーレスコンピューティングリソースを使用してタスクを実行することはできません。)
ストアドプロシージャでSnowpark APIs を使用する場合は、次の制限に注意してください。
PUT および GET コマンドを実行する APIs (
Session.sql("PUT ...")
およびSession.sql("GET ...")
を含む)を使用する場合は、プロシージャを呼び出すクエリ用に提供されたメモリバックアップファイルシステムの/tmpディレクトリにのみ書き込むことができます。非同期アクション用 APIs は使用しないでください。
新しいセッションを作成する APIs (例:
Session.builder().configs(...).create()
)は使用しないでください。session.jdbcConnection
(およびそこから返される接続)の使用は、安全ではない動作を引き起こす可能性があるため、サポートされていません。
名前付き仮オブジェクトの作成は、所有者権限のストアドプロシージャではサポートされていません。所有者権限ストアドプロシージャは、ストアドプロシージャ所有者の権限で実行されるストアドプロシージャです。詳細については、 呼び出し元の権限または所有者の権限 をご参照ください。
ストアドプロシージャのハンドラーコードの記述¶
プロシージャのロジックの場合は、プロシージャが呼び出されたときに実行されるハンドラーコードを記述します。このセクションでは、ハンドラーの設計について説明します。
プロシージャを作成する SQL ステートメントにこのコードをインラインで含めるか、コードをステージにコピーして、プロシージャの作成時に参照することができます。詳細については、 ハンドラーコードのインラインまたはステージ上での保持 をご参照ください。
ストアドプロシージャの記述の計画¶
消費されるメモリの量を制限する
Snowflakeは、必要なメモリ量に関してメソッドに制限を設けています。過度の消費を回避する方法の詳細については、 Snowflakeが課す制約内でのハンドラーの設計 をご参照ください。
スレッドセーフなコードを記述する。
ハンドラーメソッドまたは関数がスレッドセーフであることを確認してください。
セキュリティ制限を理解する。
ハンドラーコードは制限されたエンジン内で実行されるため、 UDFs とプロシージャのセキュリティプラクティス で説明されているルールに従ってください。
所有者の権限または呼び出し元の権限の使用を決定する。
ストアドプロシージャの記述を計画するときは、ストアドプロシージャを 呼び出し元の権限または所有者の権限 で実行するかどうかを検討します。
ストアドプロシージャのタイムアウト動作に注意してください。
コードのアクティビティによってタイマーがリセットされない限り、ストアドプロシージャの実行はタイムアウトになります。特に、タイムアウトタイマーは、ファイル操作、クエリ、結果セットの反復など、コードとデータの相互作用によってリセットされます。
クラスまたはオブジェクトの記述¶
定義するメソッドまたは関数は、クラスまたはオブジェクトの一部である必要があります。
クラスまたはオブジェクトを記述するときは、次の点に注意してください。
クラス(またはオブジェクト)とメソッドは、保護またはプライベートにしないでください。
メソッドが静的ではなく、コンストラクターを定義する場合は、クラスの引数がゼロのコンストラクターを定義します。Snowflakeは、初期化時にこの引数がゼロのコンストラクターを呼び出して、クラスのインスタンスを作成します。
同じクラスまたはオブジェクト内の異なるストアドプロシージャに対して異なるメソッドを定義できます。
メソッドまたは関数の記述¶
ストアドプロシージャのメソッドまたは関数を記述するときは、次の点に注意してください。
メソッドまたは関数の最初の引数としてSnowpark
Session
オブジェクトを指定します。ストアドプロシージャを呼び出すと、Snowflakeは自動的に
Session
オブジェクトを作成し、ストアドプロシージャに渡します。(Session
オブジェクトを自分で作成することはできません。)残りの引数と戻り値には、 Snowflakeデータ型 に対応する Scala型 を使用します。
メソッドまたは関数は値を返す必要があります。Scalaのストアドプロシージャの場合は、戻り値が必要です。
コードのアクティビティによってタイマーがリセットされない限り、ストアドプロシージャの実行はタイムアウトになります。特に、タイムアウトタイマーは、ファイル操作、クエリ、結果セットの反復など、コードとデータの相互作用によってリセットされます。
コードで依存関係を利用できるようにする方法¶
ハンドラーコードがハンドラー自体の外部で定義されたコード(JAR ファイル内のクラスなど)またはリソースファイルに依存している場合は、それらの依存関係をステージにアップロードすると、それらの依存関係をコードで使用できるようにすることができます。 プロシージャを作成する ときに、 IMPORTS 句を使用してこれらの依存関係を参照できます。
詳細については、 コードで依存関係を利用できるようにする方法 をご参照ください。
Snowflakeプロシージャ内のデータへのアクセス¶
Snowflakeのデータにアクセスするには、Snowparkライブラリ APIs を使用します。
Scalaストアドプロシージャへの呼び出しを処理する場合、Snowflakeは、Snowpark Session
オブジェクトを作成し、そのオブジェクトをストアドプロシージャのメソッドまたは関数に渡します。
他の言語におけるストアドプロシージャの場合と同様に、セッションのコンテキスト(例: 権限、現在のデータベースおよびスキーマ)は、ストアドプロシージャが呼び出し元の権限で実行されるか、所有者の権限で実行されるかによって決まります。詳細については、 Accessing and Setting the Session State をご参照ください。
この Session
オブジェクトを使用して、 Snowparkライブラリ の APIs を呼び出すことができます。たとえば、 テーブルの DataFrame を作成 したり、 SQL ステートメントを実行したりできます。
詳細については、 Scala用Snowpark開発者ガイド をご参照ください。
注釈
データへのアクセスの制限など、制限については、 制限事項 をご参照ください。
データアクセスの例¶
以下は、指定された数の行を1つのテーブルから別のテーブルにコピーするScalaメソッドの例です。このメソッドは次の引数を取ります。
Snowpark
Session
オブジェクト行をコピーするテーブルの名前
行を保存するテーブルの名前
コピーする行の数。
この例のメソッドは文字列を返します。
object MyObject
{
def myProcedure(session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int): String =
{
session.table(fromTable).limit(count).write.saveAsTable(toTable)
return "Success"
}
}
次の例では、メソッドではなく関数を定義しています。
object MyObject
{
val myProcedure = (session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int): String =>
{
session.table(fromTable).limit(count).write.saveAsTable(toTable)
"Success"
}
}
Scalaプロシージャを使用したファイルの読み取り¶
ハンドラーコードを使用してファイルの内容を読み取ることができます。ファイルは、ハンドラーが使用できるSnowflakeのステージである必要があります。たとえば、ファイルを読み取って、ハンドラーで非構造化データを処理する場合があります。
ステージングされたファイルのコンテンツを読み取るには、 SnowflakeFile
クラスまたは InputStream
クラスのいずれかでメソッドを呼び出すことができます。コンピューティング中、ファイルに動的にアクセスする必要があるときは、これを実行する場合があります。詳細については、このトピック内の SnowflakeFile を使用した動的に指定されたファイルの読み取り または InputStream を使用した動的に指定されたファイルの読み取り をご参照ください。
SnowflakeFile
は、次のテーブルで説明されているように、 InputStream
では利用できない機能を提供します。
クラス |
入力 |
メモ |
---|---|---|
|
URL フォーマット:
ファイルは、名前付き内部ステージまたは外部ステージに配置されている必要があります。 |
ファイルサイズなど、追加のファイル属性に簡単にアクセスできます。 |
|
URL フォーマット:
ファイルは、名前付き内部ステージまたは外部ステージに配置されている必要があります。 |
注釈
所有者権限のストアドプロシージャの場合は、プロシージャの所有者にスコープ付き URLs 以外のファイルへのアクセス権が必要があります。呼び出し元権限のプロシージャの場合は、呼び出し元にスコープが付いていない URLs のファイルへのアクセス権が必要です。いずれの場合も、新しい requireScopedUrl
パラメーターに boolean
の値を指定して SnowflakeFile.newInstance
メソッドを呼び出すハンドラーコードにより、ステージングされたファイルを読み取ることができます。
次の例では、スコープ付き URL が不要であることを指定し、 SnowflakeFile.newInstance
を使用します。
var filename = "@my_stage/filename.txt"
var sfFile = SnowflakeFile.newInstance(filename, false)
SnowflakeFile
を使用した動的に指定されたファイルの読み取り¶
次の例のコードには、 String
を受け取り、ファイルの内容を含む String
を返す、ハンドラー関数 execute
があります。実行時に、Snowflakeはプロシージャの input
変数にある受信ファイルパスからのハンドラーの fileName
変数を初期化します。ハンドラーコードは、 SnowflakeFile
インスタンスを使用してファイルを読み取ります。
CREATE OR REPLACE PROCEDURE file_reader_scala_proc_snowflakefile(input VARCHAR)
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER = 'FileReader.execute'
PACKAGES=('com.snowflake:snowpark:latest')
AS $$
import java.io.InputStream
import java.nio.charset.StandardCharsets
import com.snowflake.snowpark_java.types.SnowflakeFile
import com.snowflake.snowpark_java.Session
object FileReader {
def execute(session: Session, fileName: String): String = {
var input: InputStream = SnowflakeFile.newInstance(fileName).getInputStream()
return new String(input.readAllBytes(), StandardCharsets.UTF_8)
}
}
$$;
次の CALL の例にあるコードは、ファイルをポイントするスコープ付きファイル URL を作成します。これは、ステージ自体に権限を付与することなく、ステージングされたファイルへの仮アクセスを許可するエンコードされた URL です。
CALL file_reader_scala_proc_snowflakefile(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));
InputStream
を使用した動的に指定されたファイルの読み取り¶
次の例のコードには、 InputStream
を受け取り、ファイルの内容を含む String
を返す、ハンドラー関数 execute
があります。実行時に、Snowflakeはプロシージャの input
変数にある受信ファイルパスからのハンドラーの stream
変数を初期化します。ハンドラーコードは、 InputStream
を使用してファイルを読み取ります。
CREATE OR REPLACE PROCEDURE file_reader_scala_proc_input(input VARCHAR)
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER = 'FileReader.execute'
PACKAGES=('com.snowflake:snowpark:latest')
AS $$
import java.io.InputStream
import java.nio.charset.StandardCharsets
import com.snowflake.snowpark_java.Session
object FileReader {
def execute(session: Session, stream: InputStream): String = {
val contents = new String(stream.readAllBytes(), StandardCharsets.UTF_8)
return contents
}
}
$$;
次の CALL の例にあるコードは、ファイルをポイントするスコープ付きファイル URL を作成します。これは、ステージ自体に権限を付与することなく、ステージングされたファイルへの仮アクセスを許可するエンコードされた URL です。
CALL file_reader_scala_proc_input(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));
表形式のデータを返す¶
表形式フォームでデータを返すプロシージャを書くことができます。表形式データを返すプロシージャを作成するには、次の手順を実行します。
CREATE PROCEDURE ステートメントで、プロシージャの戻り値の型として
TABLE(...)
を指定します。TABLE パラメータとして、返されたデータの列名と 型 がわかっている場合はそれらを指定できます。実行時に指定された場合など、プロシージャを定義するときに返される列がわからない場合は、TABLE パラメータを省略できます。これを行うと、プロシージャの戻り値の列は、そのハンドラーによって返されたデータフレームの列から変換されます。列のデータ型は、 SQL-Scalaデータ型マッピング で指定されたマッピングに従って SQL に変換されます。
Snowparkデータフレームで表形式の結果を返すようにハンドラーを記述します。
データフレームの詳細については、 Snowpark Scalaでの DataFrames の操作 をご参照ください。
注釈
次のいずれかに該当する場合、プロシージャは実行時にエラーを生成します。
戻り値の型として TABLE を宣言していますが、そのハンドラーはデータフレームを返していない。
そのハンドラーはデータフレームを返していますが、プロシージャは TABLE を戻り値の型として宣言していない。
例¶
このセクションの例は、列が文字列と一致する行をフィルターするプロシージャーから表形式の値を返すことを示しています。
データの定義¶
次の例のコードは、従業員のテーブルを作成します。
CREATE OR REPLACE TABLE employees(id NUMBER, name VARCHAR, role VARCHAR);
INSERT INTO employees (id, name, role) VALUES (1, 'Alice', 'op'), (2, 'Bob', 'dev'), (3, 'Cindy', 'dev');
行をフィルターするプロシージャの宣言¶
次の2つの例のコードは、テーブル名とロールを引数として受け取るストアドプロシージャを作成し、ロール列の値が引数として指定されたロールと一致するテーブル内の行を返します。
戻り列の名前と型の指定¶
この例では、 RETURNS TABLE()
ステートメントで列の名前と型を指定しています。
CREATE OR REPLACE PROCEDURE filter_by_role(table_name VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE SCALA
RUNTIME_VERSION = '2.12'
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'Filter.filterByRole'
AS
$$
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark._
object Filter {
def filterByRole(session: Session, tableName: String, role: String): DataFrame = {
val table = session.table(tableName)
val filteredRows = table.filter(col("role") === role)
return filteredRows
}
}
$$;
注釈
現在、 RETURNS TABLE(...)
句では、列型として GEOGRAPHY を指定できません。これは、ストアドプロシージャと匿名プロシージャのいずれを作成する場合にも適用されます。
CREATE OR REPLACE PROCEDURE test_return_geography_table_1()
RETURNS TABLE(g GEOGRAPHY)
...
WITH test_return_geography_table_1() AS PROCEDURE
RETURNS TABLE(g GEOGRAPHY)
...
CALL test_return_geography_table_1();
列タイプとして GEOGRAPHY を指定しようとすると、ストアドプロシージャの呼び出しはエラーになります。
Stored procedure execution error: data type of returned table does not match expected returned table type
これを回避するには、 RETURNS TABLE()
の列の引数と型を省略できます。
CREATE OR REPLACE PROCEDURE test_return_geography_table_1()
RETURNS TABLE()
...
WITH test_return_geography_table_1() AS PROCEDURE
RETURNS TABLE()
...
CALL test_return_geography_table_1();
戻り列の名前と型の省略¶
次の例のコードは、戻り値の列名と型をハンドラーの戻り値の列から挿入できるようにするプロシージャを宣言します。 RETURNS TABLE()
ステートメントから列名と型が省略されています。
CREATE OR REPLACE PROCEDURE filter_by_role(table_name VARCHAR, role VARCHAR)
RETURNS TABLE()
LANGUAGE SCALA
RUNTIME_VERSION = '2.12'
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'Filter.filterByRole'
AS
$$
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark._
object Filter {
def filterByRole(session: Session, tableName: String, role: String): DataFrame = {
val table = session.table(tableName)
val filteredRows = table.filter(col("role") === role)
return filteredRows
}
}
$$;
プロシージャの呼び出し¶
次の例では、ストアドプロシージャを呼び出します。
CALL filter_by_role('employees', 'dev');
プロシージャの呼び出しにより、次の出力が生成されます。
+----+-------+------+
| ID | NAME | ROLE |
+----+-------+------+
| 2 | Bob | dev |
| 3 | Cindy | dev |
+----+-------+------+
ステージングされたハンドラーを使用したストアドプロシージャの準備¶
ハンドラーをコンパイルしてステージにコピーするストアドプロシージャを作成する(ソースとしてインラインに保持するのではなく)場合は、クラスをコンパイルして JAR ファイルにパッケージ化し、 JAR ファイルをステージにアップロードする必要があります。
ハンドラーコードをコンパイルし、パッケージ化する
ストアドプロシージャの設定を簡単にするために、ストアドプロシージャに必要なすべての依存関係を含む JAR ファイルをビルドします。後で、 JAR ファイルをステージにアップロードし、 CREATE PROCEDURE ステートメントから JAR ファイルをポイントする必要があります。アップロードしてポイントする JAR ファイルが少ない場合、このプロセスは簡単です。
sbtを使用して、依存関係のある JAR ファイルをビルドする。
SBT を使用してコードをビルドおよびパッケージ化する場合は、 sbt-assemblyプラグイン を使用して、すべての依存関係を含む JAR ファイルを作成できます。詳細については、 sbtを使用したScalaハンドラーコードのパッケージ化 をご参照ください。
Mavenを使用して、依存関係のある JAR ファイルをビルドする。
Mavenを使用してコードをビルドおよびパッケージ化する場合は、 Mavenアセンブリプラグイン を使用して、すべての依存関係を含む JAR ファイルを作成できます。詳細については、 Mavenを使用したJavaまたはScalaハンドラーコードのパッケージ化 をご参照ください。
他のツールを使用して、依存関係のある JAR ファイルをビルドする。
SBT またはMavenを使用していない場合は、すべての依存関係を含む JAR ファイルをビルドする手順について、ビルドツールのドキュメントをご参照ください。
たとえば、 IntelliJ IDEA プロジェクト(IntelliJ の SBT プロジェクトではない)を使用している場合は、 アーティファクト構成の設定に関する手順 をご参照ください。
ステージにファイルをアップロードする
プロシージャーのロジック(およびその他の依存関係がある場合)をプロシージャーで使用できるようにするには、ステージに必要なファイルをアップロードする必要があります。詳細については、 コードで依存関係を利用できるようにする方法 をご参照ください。