Scala-Beispiele für mit SQL erstellt gespeicherte Prozeduren

Verwenden von Snowpark-APIs zur asynchrone Verarbeitung

Die folgenden Beispiele zeigen, wie Sie Snowpark-APIs verwenden können, um asynchrone untergeordnete Jobs zu starten, und wie sich diese Jobs unter verschiedenen Bedingungen verhalten.

Im folgenden Beispiel führt die Prozedur asyncWait einen asynchronen untergeordneten Job aus, der 10 Sekunden wartet.

CREATE OR REPLACE PROCEDURE asyncWait()
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = 2.12
  PACKAGES = ('com.snowflake:snowpark_2.12:latest')
  HANDLER = 'TestScalaSP.asyncBasic'
  AS
  $$
  import com.snowflake.snowpark._
  object TestScalaSP {
    def asyncBasic(session: com.snowflake.snowpark.Session): String = {
      val df = session.sql("select system$wait(10)")
      val asyncJob = df.async.collect()
      while(!asyncJob.isDone()) {
        Thread.sleep(1000)
      }
      "Done"
    }
  }
  $$;
Copy
CALL asyncWait();
Copy

Im folgenden Beispiel verwendet die Prozedur cancelJob SQL, um einen Job zu starten, der 10 Sekunden bis zur Fertigstellung benötigen würde. Der untergeordnete Job wird dann abgebrochen, bevor er beendet ist.

CREATE OR REPLACE PROCEDURE cancelJob()
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = 2.12
  PACKAGES = ('com.snowflake:snowpark_2.12:latest')
  HANDLER = 'TestScalaSP.asyncBasic'
  AS
  $$
  import com.snowflake.snowpark._
  object TestScalaSP {
    def asyncBasic(session: com.snowflake.snowpark.Session): String = {
      val df = session.sql("select system$wait(10)")
      val asyncJob = df.async.collect()
      asyncJob.cancel()
      "Done"
    }
  }
  $$;
Copy

Im folgenden Beispiel führt die Prozedur checkStatus einen asynchronen untergeordneten Job aus, der 10 Sekunden wartet. Die Prozedur prüft dann den Status des Jobs, bevor dieser beendet ist, sodass die Prüfung False zurückgibt.

CREATE OR REPLACE PROCEDURE checkStatus()
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = 2.12
  PACKAGES = ('com.snowflake:snowpark_2.12:latest')
  HANDLER = 'TestScalaSP.asyncBasic'
  AS
  $$
  import java.sql.ResultSet
  import net.snowflake.client.jdbc.{SnowflakeConnectionV1, SnowflakeResultSet, SnowflakeStatement}
  object TestScalaSP {
    def asyncBasic(session: com.snowflake.snowpark.Session): String = {
      val connection = session.jdbcConnection
      val stmt = connection.createStatement()
      val rs = stmt.asInstanceOf[SnowflakeStatement].executeAsyncQuery("CALL SYSTEM$WAIT(10)")
      val status = rs.asInstanceOf[SnowflakeResultSet].getStatus.toString
      s"""status:    ${status}"""
    }
  }
  $$;
Copy