SQL で作成したストアドプロシージャの Scala サンプルです。

Snowpark APIs を使った非同期処理

以下の例では、Snowpark APIs を使用して非同期の子ジョブを開始する方法と、さまざまな条件下でのジョブの動作について説明します。

以下の例では、 asyncWait プロシージャが10秒待機する非同期子ジョブを実行します。

CREATE OR REPLACE PROCEDURE asyncWait()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val df = session.sql("select system$wait(10)")
    val asyncJob = df.async.collect()
    while(!asyncJob.isDone()) {
      Thread.sleep(1000)
    }
    "Done"
  }
}
$$;

call asyncScalaTest();
Copy

次の例では、 cancelJob プロシージャは、SQL を使用して、終了までに10秒かかるジョブを開始します。そして、終了する前に子ジョブをキャンセルします。

CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val df = session.sql("select system$wait(10)")
    val asyncJob = df.async.collect()
    asyncJob.cancel()
    "Done"
  }
}
$$;
Copy

以下の例では、 checkStatus プロシージャが10秒待機する非同期子ジョブを実行します。このプロシージャは、ジョブが終了する前にステータスをチェックします。したがって、チェックは False を返します。

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import java.sql.ResultSet
import net.snowflake.client.jdbc.{SnowflakeConnectionV1, SnowflakeResultSet, SnowflakeStatement}
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val connection = session.jdbcConnection
    val stmt = connection.createStatement()
    val rs = stmt.asInstanceOf[SnowflakeStatement].executeAsyncQuery("CALL SYSTEM$WAIT(10)")
    val status = rs.asInstanceOf[SnowflakeResultSet].getStatus.toString
    s"""status:    ${status}"""
  }
}
$$;
Copy