SQL で作成したストアドプロシージャの Scala サンプルです。

Snowpark APIs を使った非同期処理

以下の例では、Snowpark APIs を使用して非同期の子ジョブを開始する方法と、さまざまな条件下でのジョブの動作について説明します。

以下の例では、 asyncWait プロシージャが10秒待機する非同期子ジョブを実行します。

CREATE OR REPLACE PROCEDURE asyncWait()
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = 2.12
  PACKAGES = ('com.snowflake:snowpark_2.12:latest')
  HANDLER = 'TestScalaSP.asyncBasic'
  AS
  $$
  import com.snowflake.snowpark._
  object TestScalaSP {
    def asyncBasic(session: com.snowflake.snowpark.Session): String = {
      val df = session.sql("select system$wait(10)")
      val asyncJob = df.async.collect()
      while(!asyncJob.isDone()) {
        Thread.sleep(1000)
      }
      "Done"
    }
  }
  $$;
Copy
CALL asyncWait();
Copy

次の例では、 cancelJob プロシージャは、SQL を使用して、終了までに10秒かかるジョブを開始します。そして、終了する前に子ジョブをキャンセルします。

CREATE OR REPLACE PROCEDURE cancelJob()
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = 2.12
  PACKAGES = ('com.snowflake:snowpark_2.12:latest')
  HANDLER = 'TestScalaSP.asyncBasic'
  AS
  $$
  import com.snowflake.snowpark._
  object TestScalaSP {
    def asyncBasic(session: com.snowflake.snowpark.Session): String = {
      val df = session.sql("select system$wait(10)")
      val asyncJob = df.async.collect()
      asyncJob.cancel()
      "Done"
    }
  }
  $$;
Copy

以下の例では、 checkStatus プロシージャが10秒待機する非同期子ジョブを実行します。このプロシージャは、ジョブが終了する前にステータスをチェックします。したがって、チェックは False を返します。

CREATE OR REPLACE PROCEDURE checkStatus()
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = 2.12
  PACKAGES = ('com.snowflake:snowpark_2.12:latest')
  HANDLER = 'TestScalaSP.asyncBasic'
  AS
  $$
  import java.sql.ResultSet
  import net.snowflake.client.jdbc.{SnowflakeConnectionV1, SnowflakeResultSet, SnowflakeStatement}
  object TestScalaSP {
    def asyncBasic(session: com.snowflake.snowpark.Session): String = {
      val connection = session.jdbcConnection
      val stmt = connection.createStatement()
      val rs = stmt.asInstanceOf[SnowflakeStatement].executeAsyncQuery("CALL SYSTEM$WAIT(10)")
      val status = rs.asInstanceOf[SnowflakeResultSet].getStatus.toString
      s"""status:    ${status}"""
    }
  }
  $$;
Copy