Scala-Beispiele für mit SQL erstellt gespeicherte Prozeduren

Verwenden von Snowpark-APIs zur asynchrone Verarbeitung

Die folgenden Beispiele zeigen, wie Sie Snowpark-APIs verwenden können, um asynchrone untergeordnete Jobs zu starten, und wie sich diese Jobs unter verschiedenen Bedingungen verhalten.

Im folgenden Beispiel führt die Prozedur asyncWait einen asynchronen untergeordneten Job aus, der 10 Sekunden wartet.

CREATE OR REPLACE PROCEDURE asyncWait()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val df = session.sql("select system$wait(10)")
    val asyncJob = df.async.collect()
    while(!asyncJob.isDone()) {
      Thread.sleep(1000)
    }
    "Done"
  }
}
$$;

call asyncScalaTest();
Copy

Im folgenden Beispiel verwendet die Prozedur cancelJob SQL, um einen Job zu starten, der 10 Sekunden bis zur Fertigstellung benötigen würde. Der untergeordnete Job wird dann abgebrochen, bevor er beendet ist.

CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val df = session.sql("select system$wait(10)")
    val asyncJob = df.async.collect()
    asyncJob.cancel()
    "Done"
  }
}
$$;
Copy

Im folgenden Beispiel führt die Prozedur checkStatus einen asynchronen untergeordneten Job aus, der 10 Sekunden wartet. Die Prozedur prüft dann den Status des Jobs, bevor dieser beendet ist, sodass die Prüfung False zurückgibt.

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import java.sql.ResultSet
import net.snowflake.client.jdbc.{SnowflakeConnectionV1, SnowflakeResultSet, SnowflakeStatement}
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val connection = session.jdbcConnection
    val stmt = connection.createStatement()
    val rs = stmt.asInstanceOf[SnowflakeStatement].executeAsyncQuery("CALL SYSTEM$WAIT(10)")
    val status = rs.asInstanceOf[SnowflakeResultSet].getStatus.toString
    s"""status:    ${status}"""
  }
}
$$;
Copy