Using Snowpark APIs for asynchronous processing
The following examples show how to use the Snowpark APIs to start asynchronous child jobs and how those jobs behave under different conditions.
In the following example, the asyncWait procedure runs an asynchronous child job that waits 10 seconds. The handler polls isDone() once per second until the job finishes, then returns "Done".
Scala 2.12
CREATE OR REPLACE PROCEDURE asyncWait()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark_2.12:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val df = session.sql("select system$wait(10)")
    // Start the query as an asynchronous child job.
    val asyncJob = df.async.collect()
    // Poll once per second until the child job finishes.
    while (!asyncJob.isDone()) {
      Thread.sleep(1000)
    }
    "Done"
  }
}
$$;
Scala 2.13 (Preview)
CREATE OR REPLACE PROCEDURE asyncWait()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.13
PACKAGES = ('com.snowflake:snowpark_2.13:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val df = session.sql("select system$wait(10)")
    // Start the query as an asynchronous child job.
    val asyncJob = df.async.collect()
    // Poll once per second until the child job finishes.
    while (!asyncJob.isDone()) {
      Thread.sleep(1000)
    }
    "Done"
  }
}
$$;
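The polling loop in asyncWait runs for as long as the child job does. A minimal sketch of a bounded variant follows, assuming you want to stop waiting after a fixed time. The names PollingSketch and waitUntilDone are hypothetical helpers and not part of Snowpark; the isDone argument stands in for a call such as asyncJob.isDone().

```scala
object PollingSketch {
  /** Polls `isDone` every `pollMillis` milliseconds until it returns true
    * or `timeoutMillis` has elapsed. Returns true if the job finished in
    * time and false if the wait timed out.
    * Hypothetical helper: `isDone` stands in for `asyncJob.isDone()`. */
  def waitUntilDone(isDone: () => Boolean, pollMillis: Long, timeoutMillis: Long): Boolean = {
    val deadline = System.nanoTime() + timeoutMillis * 1000000L
    var done = isDone()
    while (!done && System.nanoTime() < deadline) {
      Thread.sleep(pollMillis)
      done = isDone()
    }
    done
  }
}
```

Inside a handler, this could be called as PollingSketch.waitUntilDone(() => asyncJob.isDone(), 1000L, 30000L), with the handler choosing what to return when the result is false.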
In the following example, the cancelJob procedure uses SQL to start a job that would take 10 seconds to finish. It then cancels the child job before it completes.
Scala 2.12
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark_2.12:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val df = session.sql("select system$wait(10)")
    // Start the query as an asynchronous child job.
    val asyncJob = df.async.collect()
    // Cancel the child job right away, before the 10-second wait ends.
    asyncJob.cancel()
    "Done"
  }
}
$$;
Scala 2.13 (Preview)
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.13
PACKAGES = ('com.snowflake:snowpark_2.13:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import com.snowflake.snowpark._
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    val df = session.sql("select system$wait(10)")
    // Start the query as an asynchronous child job.
    val asyncJob = df.async.collect()
    // Cancel the child job right away, before the 10-second wait ends.
    asyncJob.cancel()
    "Done"
  }
}
$$;
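cancelJob cancels the child job unconditionally. As a hedged sketch of a related pattern, the code below gives a job a limited number of polls to finish and cancels it only if it is still running. CancelSketch, JobHandle, and finishOrCancel are hypothetical names introduced for illustration; JobHandle models only the isDone() and cancel() calls used in the examples above, so the logic can be tried without a Snowflake session.

```scala
object CancelSketch {
  // Hypothetical minimal view of an asynchronous job. It mirrors only the
  // isDone() and cancel() calls that the procedures above make on asyncJob.
  trait JobHandle {
    def isDone(): Boolean
    def cancel(): Unit
  }

  /** Polls the job up to `maxPolls` times, sleeping `pollMillis` between
    * checks. Returns "Done" if the job finished, otherwise cancels the job
    * and returns "Cancelled". */
  def finishOrCancel(job: JobHandle, pollMillis: Long, maxPolls: Int): String = {
    var polls = 0
    while (!job.isDone() && polls < maxPolls) {
      Thread.sleep(pollMillis)
      polls += 1
    }
    if (job.isDone()) "Done"
    else {
      job.cancel()
      "Cancelled"
    }
  }
}
```

In a handler, a small adapter that forwards isDone() and cancel() to the Snowpark job object would be needed; that adapter is left out here because it cannot run outside Snowflake.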
In the following example, the checkStatus procedure runs an asynchronous child job that waits 10 seconds. The procedure then checks the job's status before the job has finished, so the check shows that the job is not yet complete. The handler returns the status as a string.
Scala 2.12
CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES = ('com.snowflake:snowpark_2.12:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import java.sql.ResultSet
import net.snowflake.client.jdbc.{SnowflakeConnectionV1, SnowflakeResultSet, SnowflakeStatement}
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    // Use the session's underlying JDBC connection to submit the query asynchronously.
    val connection = session.jdbcConnection
    val stmt = connection.createStatement()
    val rs = stmt.asInstanceOf[SnowflakeStatement].executeAsyncQuery("CALL SYSTEM$WAIT(10)")
    // Read the status immediately, while the child job is still in progress.
    val status = rs.asInstanceOf[SnowflakeResultSet].getStatus.toString
    s"""status: ${status}"""
  }
}
$$;
Scala 2.13 (Preview)
CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.13
PACKAGES = ('com.snowflake:snowpark_2.13:latest')
HANDLER = 'TestScalaSP.asyncBasic'
AS
$$
import java.sql.ResultSet
import net.snowflake.client.jdbc.{SnowflakeConnectionV1, SnowflakeResultSet, SnowflakeStatement}
object TestScalaSP {
  def asyncBasic(session: com.snowflake.snowpark.Session): String = {
    // Use the session's underlying JDBC connection to submit the query asynchronously.
    val connection = session.jdbcConnection
    val stmt = connection.createStatement()
    val rs = stmt.asInstanceOf[SnowflakeStatement].executeAsyncQuery("CALL SYSTEM$WAIT(10)")
    // Read the status immediately, while the child job is still in progress.
    val status = rs.asInstanceOf[SnowflakeResultSet].getStatus.toString
    s"""status: ${status}"""
  }
}
$$;
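checkStatus returns its result as a string of the form "status: <name>". The sketch below shows one way a caller might interpret that string. StatusSketch is a hypothetical helper, and the status names in it (RUNNING, QUEUED, RESUMING_WAREHOUSE) are assumptions modeled on the Snowflake JDBC driver's query status values; confirm them against the driver version you use before relying on them.

```scala
object StatusSketch {
  // Assumed status names meaning the query has not finished yet.
  // These are illustrative; verify them against your JDBC driver.
  private val inProgress = Set("RUNNING", "QUEUED", "RESUMING_WAREHOUSE")

  /** Extracts the status name from a string such as "status: RUNNING". */
  def parseStatus(result: String): String =
    result.trim.stripPrefix("status:").trim

  /** Returns true if the parsed status is one of the assumed in-progress names. */
  def isStillRunning(result: String): Boolean =
    inProgress.contains(parseStatus(result).toUpperCase)
}
```

Comparing names as strings keeps the sketch free of driver dependencies; code running inside the handler could inspect the status object directly instead.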