Scala-Beispiele für mit SQL erstellt gespeicherte Prozeduren¶
Verwenden von Snowpark-APIs zur asynchrone Verarbeitung¶
Die folgenden Beispiele zeigen, wie Sie Snowpark-APIs verwenden können, um asynchrone untergeordnete Jobs zu starten, und wie sich diese Jobs unter verschiedenen Bedingungen verhalten.
Im folgenden Beispiel führt die Prozedur asyncWait
einen asynchronen untergeordneten Job aus, der 10 Sekunden wartet.
CREATE OR REPLACE PROCEDURE asyncWait()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
def asyncBasic(session: com.snowflake.snowpark.Session): String = {
val df = session.sql("select system$wait(10)")
val asyncJob = df.async.collect()
while(!asyncJob.isDone()) {
Thread.sleep(1000)
}
"Done"
}
}
$$;
call asyncScalaTest();
Im folgenden Beispiel verwendet die Prozedur cancelJob
SQL, um einen Job zu starten, der 10 Sekunden bis zur Fertigstellung benötigen würde. Der untergeordnete Job wird dann abgebrochen, bevor er beendet ist.
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
def asyncBasic(session: com.snowflake.snowpark.Session): String = {
val df = session.sql("select system$wait(10)")
val asyncJob = df.async.collect()
asyncJob.cancel()
"Done"
}
}
$$;
Im folgenden Beispiel führt die Prozedur checkStatus
einen asynchronen untergeordneten Job aus, der 10 Sekunden wartet. Die Prozedur prüft dann den Status des Jobs, bevor dieser beendet ist, sodass die Prüfung False
zurückgibt.
CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import java.sql.ResultSet
import net.snowflake.client.jdbc.{SnowflakeConnectionV1, SnowflakeResultSet, SnowflakeStatement}
object TestScalaSP {
def asyncBasic(session: com.snowflake.snowpark.Session): String = {
val connection = session.jdbcConnection
val stmt = connection.createStatement()
val rs = stmt.asInstanceOf[SnowflakeStatement].executeAsyncQuery("CALL SYSTEM$WAIT(10)")
val status = rs.asInstanceOf[SnowflakeResultSet].getStatus.toString
s"""status: ${status}"""
}
}
$$;