Snowpark(Scala)でのストアドプロシージャの記述

このプレビュー機能を使用すると、Scalaでストアドプロシージャを記述できます。ストアドプロシージャ内でSnowparkライブラリを使用して、Snowflakeのテーブルに対してクエリ、更新、およびその他の作業を実行できます。

このトピックでは、Scalaでストアドプロシージャを記述する方法について説明します。

このトピックの内容:

紹介

Snowparkストアドプロシージャを使用すると、Snowflakeウェアハウスをコンピューティングフレームワークとして使用して、Snowflake内でデータパイプラインをビルドおよび実行できます。データパイプラインのコードでは、ScalaのSnowpark API を使用してストアドプロシージャを記述します。これらのストアドプロシージャの実行をスケジュールするには、 タスク を使用します。

このプレビューでは、Snowparkストアドプロシージャに次の制限があります。

  • 同時実行は、ストアドプロシージャではサポートされていません。たとえば、ストアドプロシージャ内からは、複数のスレッドからクエリを送信することはできません。

  • タスクからストアドプロシージャを実行する場合は、タスクの作成時にウェアハウスを指定する必要があります。(Snowflakeが管理するコンピューティングリソースを使用してタスクを実行することはできません。)

  • ストアドプロシージャでSnowpark APIs の一部を使用することはできません。サポート外の APIs のリストについては、 ストアドプロシージャからSnowflakeへのデータのアクセス をご参照ください。

次のセクションでは、ScalaのSnowparkストアドプロシージャについて詳しく説明します。

前提条件

バージョン1.1.0または最新バージョンのSnowparkライブラリを使用する必要があります。

プリコンパイル済みストアドプロシージャを作成している場合は、Javaバージョン11.xで実行するようにクラスをコンパイルする必要があります。

Snowparkの開発環境の設定

次に、Snowparkライブラリを使用するように開発環境をセットアップします。 Snowpark Scalaの開発環境の設定 をご参照ください。

インラインまたはプリコンパイル済みストアドプロシージャ作成の選択

Java UDFs の場合と同様に、インラインストアドプロシージャまたはプリコンパイル済みストアドプロシージャのいずれかを作成できます。

  • インラインストアドプロシージャでは、 CREATE PROCEDURE ステートメントの AS 句にScalaコードを記述します。例:

    CREATE OR REPLACE PROCEDURE MYPROC(fromTable STRING, toTable STRING, count INT)
      RETURNS STRING
      LANGUAGE SCALA
      RUNTIME_VERSION = '2.12'
      PACKAGES = ('com.snowflake:snowpark:latest')
      HANDLER = 'MyScalaObject.run'
      AS
      $$
        object MyScalaObject {
          def run(session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int): String = {
            session.table(fromTable).limit(count).write.saveAsTable(toTable)
            return "SUCCESS"
          }
        }
      $$;
    

    注釈

    繰り返しの呼び出しでより高速に実行するために、Snowflakeがコンパイルされたクラスを保存する JAR ファイルの場所に TARGET_PATH を設定できます。 ストアドプロシージャの作成 をご参照ください。

  • プリコンパイル済みストアドプロシージャでは、Scalaコードを .scala ソースファイルに記述します。

    例:

    object MyScalaObject {
      def run(session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int): String = {
        session.table(fromTable).limit(count).write.saveAsTable(toTable)
        return "SUCCESS"
      }
    }
    

    次に、コードをコンパイルし、クラスを JAR ファイルにパッケージ化し、 JAR ファイルをステージにアップロードし、ステージ上の JAR ファイルをポイントして CREATE PROCEDURE コマンドを実行します。例:

    CREATE OR REPLACE PROCEDURE MYPROC(value INT, fromTable STRING, toTable STRING, count INT)
      RETURNS INT
      LANGUAGE SCALA
      RUNTIME_VERSION = '2.12'
      PACKAGES = ('com.snowflake:snowpark:latest')
      IMPORTS = ('@mystage/MyCompiledScalaCode.jar')
      HANDLER = 'MyScalaObject.run';
    

ストアドプロシージャのScalaコードの記述

ストアドプロシージャのコードについては、Scalaメソッドまたは関数を記述する必要があります。次のセクションでは、コードを記述するためのガイドラインを示します。

ストアドプロシージャの記述の計画

ストアドプロシージャのScalaコードには、Java UDF のコードと同じ制約があります。ストアドプロシージャの記述を計画するときは、これらの制約を考慮に入れる必要があります。

消費されるメモリの量を制限する

Java UDFs の場合と同様に、Snowflakeは、必要なメモリ量に関してメソッドまたは関数に制限を設けています。

メソッドまたは関数では、メモリを過度に消費しないようにする必要があります。

スレッドセーフなコードを記述する

Java UDFs の場合 と同様に、メソッドまたは関数がスレッドセーフであることを確認する必要があります。

セキュリティ制限を理解する

メソッドまたは関数は制限されたエンジン内で実行されるため、 Java UDFs のために文書化された ルールと同じルールに従う必要があります。

所有者の権限または呼び出し元の権限の使用を決定する

さらに、ストアドプロシージャの記述を計画するときは、ストアドプロシージャを 呼び出し元の権限または所有者の権限 で実行するかどうかを検討します。

クラスまたはオブジェクトの記述

定義するメソッドまたは関数は、クラスまたはオブジェクトの一部である必要があります。

クラスまたはオブジェクトを記述するときは、次の点に注意してください。

  • クラス(またはオブジェクト)とメソッドは、保護またはプライベートにしないでください。

  • メソッドが静的ではなく、コンストラクターを定義する場合は、クラスの引数がゼロのコンストラクターを定義します。Snowflakeは、初期化時にこの引数がゼロのコンストラクターを呼び出して、クラスのインスタンスを作成します。

  • 同じクラスまたはオブジェクト内の異なるストアドプロシージャに対して異なるメソッドを定義できます。

メソッドまたは関数の記述

ストアドプロシージャのメソッドまたは関数を記述するときは、次の点に注意してください。

  • メソッドまたは関数の最初の引数としてSnowpark Session オブジェクトを指定します。

    ストアドプロシージャを呼び出すと、Snowflakeは自動的に Session オブジェクトを作成し、ストアドプロシージャに渡します。(Session オブジェクトを自分で作成することはできません。)

  • 残りの引数と戻り値には、 Snowflakeデータ型 に対応する Scala型 を使用します。

  • メソッドまたは関数は値を返す必要があります。Scalaのストアドプロシージャの場合は、戻り値が必要です。

ストアドプロシージャからSnowflakeへのデータのアクセス

Snowflakeのデータにアクセスするには、Snowparkライブラリ APIs を使用します。

Scalaストアドプロシージャへの呼び出しを処理する場合、Snowflakeは、Snowpark Session オブジェクトを作成し、そのオブジェクトをストアドプロシージャのメソッドまたは関数に渡します。

他の言語におけるストアドプロシージャの場合と同様に、セッションのコンテキスト(例: 権限、現在のデータベースおよびスキーマ)は、ストアドプロシージャが呼び出し元の権限で実行されるか、所有者の権限で実行されるかによって決まります。詳細については、 セッション状態 をご参照ください。

この Session オブジェクトを使用して、 Snowparkライブラリ の APIs を呼び出すことができます。たとえば、 テーブルの DataFrame を作成 したり、 SQL ステートメントを実行したりできます。

詳細については、 Scala用Snowpark開発者ガイド をご参照ください。

注釈

次のSnowpark APIs は、ストアドプロシージャでは使用できません。

以下は、指定された数の行を1つのテーブルから別のテーブルにコピーするScalaメソッドの例です。このメソッドは次の引数を取ります。

  • Snowpark Session オブジェクト

  • 行をコピーするテーブルの名前

  • 行を保存するテーブルの名前

  • コピーする行の数。

この例のメソッドは文字列を返します。

object MyObject
{
  def myProcedure(session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int): String =
  {
    session.table(fromTable).limit(count).write.saveAsTable(toTable)
    return "Success"
  }
}

次の例では、メソッドではなく関数を定義しています。

object MyObject
{
  val myProcedure = (session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int): String =>
  {
    session.table(fromTable).limit(count).write.saveAsTable(toTable)
    "Success"
  }
}

他のクラスとリソースファイルへのアクセス

コードがストアドプロシージャの外部で定義されたクラス(例: 別の JAR ファイル内のクラス)またはリソースファイルに依存している場合は、 これらのファイルをステージにアップロード して、ストアドプロシージャが実行されるときに、ファイルが次の場合に使用できるようにする必要があります。

後で CREATE PROCEDURE ステートメントを実行 するときに、 IMPORTS 句を使用してこれらのファイルをポイントします。

プリコンパイル済みストアドプロシージャの準備

プリコンパイル済みストアドプロシージャ(インラインストアドプロシージャではなく)を作成する場合は、クラスをコンパイルして JAR ファイルにパッケージ化し、 JAR ファイルをステージにアップロードする必要があります。

Scalaコードのコンパイルとパッケージ化

ストアドプロシージャの設定を簡単にするために、ストアドプロシージャに必要なすべての依存関係を含む JAR ファイルをビルドします。後で、 JAR ファイルをステージにアップロードし、 CREATE PROCEDURE ステートメントから JAR ファイルをポイントする必要があります。アップロードしてポイントする JAR ファイルが少ない場合、このプロセスは簡単です。

次のセクションでは、すべての依存関係を含む JAR ファイルを作成するためのヒントをいくつか紹介します。

SBT を使用した、依存関係のある JAR ファイルのビルド

SBT を使用してコードをビルドおよびパッケージ化する場合は、 sbt-assemblyプラグイン を使用して、すべての依存関係を含む JAR ファイルを作成できます。

  1. build.sbt ファイルを含むディレクトリで、 project/ サブディレクトリに plugins.sbt という名前のファイルを作成します。

    たとえば、 build.sbt ファイルを含むディレクトリが hello-snowpark/ の場合は、ファイル hello-snowpark/project/plugins.sbt を作成します。

    hello-snowpark/
                 |__ build.sbt
                 |__ project/
                           |__plugins.sbt
    
  2. plugins.sbt ファイルに、次の行を追加します。

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.1.0")
    
  3. プロジェクトで同じライブラリの複数のバージョンが必要な場合(例: プロジェクトが、異なるバージョンの3番目のライブラリを必要とする、2つのライブラリに依存している場合)、依存関係を解決するために build.sbt ファイルでマージ戦略を定義します。詳細については、 マージ戦略 をご参照ください。

  4. build.sbt ファイルで、Snowparkライブラリのバージョンを少なくとも 必要な最小バージョン に更新します。

    libraryDependencies += "com.snowflake" % "snowpark" % "1.1.0" % "provided"
    

    さらに、依存関係が "provided" であることを指定して、 JAR ファイルからSnowparkライブラリを除外します(上記のとおり)。

  5. プロジェクトのディレクトリ(例: hello-snowpark)に移動し、次のコマンドを実行します。

    sbt assembly
    

    注釈

    エラー Not a valid command: assemblyNot a valid project ID: assembly、または Not a valid key: assembly が発生した場合は、 plugins.sbt ファイルが project/ という名前のサブディレクトリにあることを確認してください(ステップ1で説明のとおり)。

    このコマンドは、次の場所に JAR ファイルを作成します。

    target/scala-<version>/<project-name>-assembly-1.0.jar
    

Mavenを使用した、依存関係のある JAR ファイルのビルド

Mavenを使用してコードをビルドおよびパッケージ化する場合は、 Mavenアセンブリプラグイン を使用して、すべての依存関係を含む JAR ファイルを作成できます。

  1. プロジェクトのディレクトリ(例: hello-snowpark/)に、 assembly/ という名前のサブディレクトリを作成します。

  2. そのディレクトリに、 JAR ファイルに依存関係を含めることを指定する アセンブリ記述子ファイル を作成します。

    例については、 jar-with-dependencies をご参照ください。

  3. アセンブリ記述子で、 JAR ファイルからSnowparkライブラリを除外する <dependentencySet> 要素を追加します。

    例:

    <assembly xmlns="http://maven.apache.org/ASSEMBLY/2.1.0"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.1.0 http://maven.apache.org/xsd/assembly-2.1.0.xsd">
      <id>jar-with-dependencies</id>
      <formats>
         <format>jar</format>
      </formats>
      <includeBaseDirectory>false</includeBaseDirectory>
      <dependencySets>
        <dependencySet>
          <outputDirectory>/</outputDirectory>
          <useProjectArtifact>false</useProjectArtifact>
          <unpack>true</unpack>
          <scope>provided</scope>
          <excludes>
            <exclude>com.snowflake:snowpark</exclude>
          </excludes>
        </dependencySet>
      </dependencySets>
    </assembly>
    

    アセンブリ記述子の要素については、 アセンブリ記述子の形式 をご参照ください。

  4. pom.xml ファイルの <プロジェクト> » <ビルド> » <プラグイン> の下に、Mavenアセンブリプラグインの <プラグイン> 要素を追加します。

    さらに、 <構成> » <記述子> の下に、前のステップで作成したアセンブリ記述子ファイルをポイントする <記述子> を追加します。

    例:

    <project>
      [...]
      <build>
        [...]
        <plugins>
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
              <descriptors>
                <descriptor>src/assembly/jar-with-dependencies.xml</descriptor>
              </descriptors>
            </configuration>
            [...]
          </plugin>
          [...]
        </plugins>
        [...]
      </build>
      [...]
    </project>
    

他のツールを使用した、依存関係のある JAR ファイルのビルド

SBT またはMavenを使用していない場合は、すべての依存関係を含む JAR ファイルをビルドする手順について、ビルドツールのドキュメントをご参照ください。

たとえば、 IntelliJ IDEA プロジェクト(IntelliJ の SBT プロジェクトではない)を使用している場合は、 アーティファクト構成の設定に関する手順 をご参照ください。

ステージへのファイルのアップロード

次に、ストアドプロシージャに必要なファイルをステージにアップロードします。

  1. ファイルのステージを選択します。

    外部ステージまたは名前付き内部ステージを使用できます。 PUT コマンドを使用してファイルをアップロードする場合は、名前付きの内部ステージを使用します。(PUT コマンドは、外部ステージへのファイルのアップロードをサポートしていません。)

    既存のステージを使用することも、 CREATE STAGE を実行して新しいステージを作成することもできます。たとえば、次のコマンドは mystage という名前の新しい内部ステージを作成します。

    CREATE STAGE mystage;
    

    注釈

    ストアドプロシージャの所有者は、ステージに対する READ 権限 を持っている必要があります。

  2. PUT コマンドを使用して、次のファイルをそのステージにアップロードします。

    • コンパイルされたScalaコードを含む JAR ファイル。

    • ストアドプロシージャが依存するその他のファイル。

    例:

    PUT file:///Users/MyUserName/myjar.jar
            @mystage
            AUTO_COMPRESS = FALSE
            OVERWRITE = TRUE
            ;
    

    注釈

    AUTO_COMPRESS = FALSE を省略すると、 PUT コマンドはファイルを自動的に圧縮します。ステージ上の圧縮ファイルの名前は myjar.jar.gz になります。後で CREATE PROCEDURE コマンドを実行 するときに、 IMPORTS 句でこの .gz 拡張子を使用してファイル名を指定する必要があります。

コンパイルされたScalaコードを含む JAR ファイルを削除または名前変更すると、ストアドプロシージャを呼び出せなくなることに注意してください。

JAR ファイルを更新する必要がある場合は、次を実行します。

  • ストアドプロシージャの呼び出しができない間に更新します。

  • 句 OVERWRITE=TRUE を PUT コマンドに追加します。

ストアドプロシージャの作成

次に、 CREATE PROCEDURE ステートメントを実行して、メソッドまたは関数のストアドプロシージャを作成します。以下のテーブルにリストされているパラメーターを設定します。

パラメーター

説明

[ arg_name arg_data_type [, ... ] ]

  • Snowpark Session オブジェクトの引数を省略します。前述のように、これは CREATE PROCEDURE または CALL で指定する仮パラメーターではありません。ストアドプロシージャを呼び出すと、Snowflakeは Session オブジェクトを作成し、ストアドプロシージャに渡します。

  • 残りの引数のデータ型には、メソッドまたは関数の引数の Scala型 に対応する Snowflakeデータ型 を使用します。

RETURNS result_data_type

戻り値のSnowflakeデータ型で RETURNS を指定します。

LANGUAGE SCALA

ストアドプロシージャコードがScalaで記述されていることを示すには、これを指定する必要があります。

RUNTIME_VERSION = '2.12'

ストアドプロシージャがScala 2.12を使用することを示すには、これを指定する必要があります。

PACKAGES = ( 'package_name' )

このリストには、使用するバージョンのSnowparkライブラリのパッケージを含めます。

Snowparkライブラリの完全修飾パッケージ名を次の形式で指定します。

com.snowflake:snowpark:<version>

例:

  • Snowparkの最新バージョンを使用するには、次を使用します。

    PACKAGES = ('com.snowflake:snowpark:latest')
    
  • Snowparkの 1.5.0 バージョンを使用するには、次を使用します。

    PACKAGES = ('com.snowflake:snowpark:1.5.0')

注釈

新しいストアドプロシージャを作成するときは、バージョン1.1.0以降を指定します。

Snowflakeは、ストアドプロシージャでSnowparkバージョン0.9.0以降の使用をサポートしています。ただし、これらのバージョンには制限があることに注意してください。たとえば、1.1.0より前のバージョンは、ストアドプロシージャでトランザクションの使用をサポートしていません。

サポートされているパッケージとバージョンのリストについては、 INFORMATION_SCHEMA.PACKAGES ビューLANGUAGE = 'scala' の行をクエリします。例:

select * from information_schema.packages where language = 'scala';

IMPORTS = ( 'file' [, 'file' ... ] )

ストアドプロシージャが、 ステージの場所 にアップロードした JAR ファイルに依存している場合は、それらのファイルをこのリストに含めてください。

インラインストアドプロシージャを作成している場合は、コードがストアドプロシージャまたはリソースファイルの外部で定義されたクラスに依存している場合を除き、この句を省略できます。

プリコンパイル済みストアドプロシージャを作成する場合は、ストアドプロシージャの定義を含む JAR ファイルも含める必要があります。

HANDLER = 'method_name'

これをScalaメソッドまたは関数の完全修飾名に設定します。

TARGET_PATH = 'jar_file'

インラインストアドプロシージャを作成していて、呼び出しのたびにSnowflakeがコードを再コンパイルしないようにする場合は、Snowflakeがコンパイル済みコード用に作成する JAR ファイルでこれを設定できます。

これは、 WRITE 権限を持つステージ上のパスである必要があります。

EXECUTE AS CALLER

呼び出し元の権限 を使用するようにストアドプロシージャを設定する場合は、このパラメーターを追加します。

代わりに所有者の権限を使用する場合は、このパラメーターを省略します。

次の例では、Scalaでストアドプロシージャを作成します。

例1: インラインストアドプロシージャ:

CREATE OR REPLACE PROCEDURE myProc(fromTable STRING, toTable STRING, count INT)
RETURNS STRING
LANGUAGE SCALA
RUNTIME_VERSION = '2.12'
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'MyObject.myProcedure'
AS
$$
  object MyObject
  {
    def myProcedure(session: com.snowflake.snowpark.Session, fromTable: String, toTable: String, count: Int): String =
    {
      session.table(fromTable).limit(count).write.saveAsTable(toTable)
      return "Success"
    }
  }
$$;

例2: 内部ステージ mystage の JAR ファイル myjar.jar でコンパイルされたコードを使用する、プリコンパイル済みストアドプロシージャ:

CREATE OR REPLACE PROCEDURE myProc(fromTable STRING, toTable STRING, count INT)
RETURNS STRING
LANGUAGE SCALA
RUNTIME_VERSION = '2.12'
PACKAGES = ('com.snowflake:snowpark:latest')
IMPORTS = ('@mystage/myjar.jar')
HANDLER = 'MyObject.myProcedure';

ストアドプロシージャの呼び出し

ユーザーがストアドプロシージャを呼び出すには、ユーザーのロールに、ストアドプロシージャの USAGE 権限 が必要です。

ストアドプロシージャを呼び出す権限を取得すると、 CALL ステートメントを使用してストアドプロシージャを呼び出すことができます。例:

CALL myProc('table_a', 'table_b', 5);

付録: Scalaデータ型のSnowflakeデータ型へのマッピング

UDFs の引数と戻り値について、Snowflakeは、 パラメーターと戻り値型の SQL-Javaデータ型のマッピング にリストされているJava型に加えて、 さらに 次のScalaデータ型をサポートします。

SQL データ型

Scalaデータ型

メモ

NUMBER

次の型がサポートされています。

  • Int または Option[Int]

  • Long または Option[Long]

FLOAT

Float または Option[Float]

DOUBLE

Double または Option[Double]

VARCHAR

String

BOOLEAN

Boolean または Option[Boolean]

BINARY

Array[Byte]

VARIANT

String

表示されるタイプに応じて値をフォーマットします。 バリアントnull は、文字列「null」としてフォーマットされます。

ARRAY

Array[String]

OBJECT

Map[String, String]

DATETIMESTAMP には、 パラメーターと戻り値型の SQL-Javaデータ型のマッピング にリストされているJava型を使用します。

最上部に戻る