SQL で作成したストアドプロシージャの Scala サンプルです。¶
Snowpark APIs を使った非同期処理¶
以下の例では、Snowpark APIs を使用して非同期の子ジョブを開始する方法と、さまざまな条件下でのジョブの動作について説明します。
以下の例では、 asyncWait
プロシージャが10秒待機する非同期子ジョブを実行します。
CREATE OR REPLACE PROCEDURE asyncWait()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
def asyncBasic(session: com.snowflake.snowpark.Session): String = {
val df = session.sql("select system$wait(10)")
val asyncJob = df.async.collect()
while(!asyncJob.isDone()) {
Thread.sleep(1000)
}
"Done"
}
}
$$;
call asyncScalaTest();
次の例では、 cancelJob
プロシージャは、SQL を使用して、終了までに10秒かかるジョブを開始します。そして、終了する前に子ジョブをキャンセルします。
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
def asyncBasic(session: com.snowflake.snowpark.Session): String = {
val df = session.sql("select system$wait(10)")
val asyncJob = df.async.collect()
asyncJob.cancel()
"Done"
}
}
$$;
以下の例では、 checkStatus
プロシージャが10秒待機する非同期子ジョブを実行します。このプロシージャは、ジョブが終了する前にステータスをチェックします。したがって、チェックは False
を返します。
CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import java.sql.ResultSet
import net.snowflake.client.jdbc.{SnowflakeConnectionV1, SnowflakeResultSet, SnowflakeStatement}
object TestScalaSP {
def asyncBasic(session: com.snowflake.snowpark.Session): String = {
val connection = session.jdbcConnection
val stmt = connection.createStatement()
val rs = stmt.asInstanceOf[SnowflakeStatement].executeAsyncQuery("CALL SYSTEM$WAIT(10)")
val status = rs.asInstanceOf[SnowflakeResultSet].getStatus.toString
s"""status: ${status}"""
}
}
$$;