Exemples en Scala pour les procédures stockées créées avec SQL¶
Utilisation des APIs Snowpark pour le traitement asynchrone¶
Les exemples suivants illustrent la manière dont vous pouvez utiliser des APIs Snowpark pour lancer des tâches enfant asynchrones, ainsi que le comportement de ces tâches dans différentes conditions.
Dans l’exemple suivant, la procédure asyncWait
exécute une tâche enfant asynchrone qui attend 10 secondes.
CREATE OR REPLACE PROCEDURE asyncWait()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
def asyncBasic(session: com.snowflake.snowpark.Session): String = {
val df = session.sql("select system$wait(10)")
val asyncJob = df.async.collect()
while(!asyncJob.isDone()) {
Thread.sleep(1000)
}
"Done"
}
}
$$;
call asyncScalaTest();
Dans l’exemple suivant, la procédure cancelJob
utilise SQL pour démarrer une tâche qui prendrait 10 secondes pour se terminer. Elle annule ensuite la tâche enfant avant qu’elle ne soit terminée.
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
def asyncBasic(session: com.snowflake.snowpark.Session): String = {
val df = session.sql("select system$wait(10)")
val asyncJob = df.async.collect()
asyncJob.cancel()
"Done"
}
}
$$;
Dans l’exemple suivant, la procédure checkStatus
exécute une tâche enfant asynchrone qui attend 10 secondes. La procédure vérifie ensuite le statut de la tâche avant qu’elle ne soit terminée, de sorte que la vérification renvoie False
.
CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import java.sql.ResultSet
import net.snowflake.client.jdbc.{SnowflakeConnectionV1, SnowflakeResultSet, SnowflakeStatement}
object TestScalaSP {
def asyncBasic(session: com.snowflake.snowpark.Session): String = {
val connection = session.jdbcConnection
val stmt = connection.createStatement()
val rs = stmt.asInstanceOf[SnowflakeStatement].executeAsyncQuery("CALL SYSTEM$WAIT(10)")
val status = rs.asInstanceOf[SnowflakeResultSet].getStatus.toString
s"""status: ${status}"""
}
}
$$;