PythonでSnowflake関数とストアドプロシージャを管理する¶
Pythonを使用して、Snowflakeでユーザー定義関数(UDFs)とストアドプロシージャを管理することができます。UDF またはプロシージャを記述する場合は、サポートされているハンドラー言語のいずれかでそのロジックを記述し、 Snowflake Python APIs を使用して作成します。UDFs とプロシージャの詳細については、 関数とプロシージャによるSnowflakeの拡張 をご参照ください。
前提条件¶
このトピックの例では、Snowflakeと接続するコードを追加して Root
オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。
たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
出来上がった Session
オブジェクトを使って、コードは API のタイプとメソッドを使う Root
オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。
ユーザー定義関数の管理(UDFs)¶
ユーザー定義関数(UDFs)を記述して、Snowflakeが提供する組み込みのシステム定義関数では利用できない操作を実行するようにシステムを拡張できます。UDF を作成すると、何度でも再利用できます。詳細については、 ユーザー定義関数の概要 をご参照ください。
注釈
API を使った UDFs の呼び出しは現在サポートされていません。
Snowflake Python APIs は、 UDFs を2つの別々のタイプで表します。
UserDefinedFunction
: 名前、引数のリスト、戻り値の型、関数の定義など、 UDF のプロパティを公開します。UserDefinedFunctionResource
: 対応するUserDefinedFunction
オブジェクトを取得し、 UDF の名前を変更し、 UDF をドロップするために使用できるメソッドを公開します。
UDF の作成¶
UDF を作成するには、まず UserDefinedFunction
オブジェクトを作成して、 API Root
オブジェクトから UserDefinedFunctionCollection
オブジェクトを作成します。 UserDefinedFunctionCollection.create
を使って、新しい UDF をSnowflakeに追加します。
UDF を作成するときは、次のサポートされている言語の1つでコードが記述されているハンドラーを指定します。
Python¶
次の例のコードは、 my_db
データベースと my_schema
スキーマの my_python_function
という UDF を表す UserDefinedFunction
オブジェクトを、指定された引数、戻り値型、言語、 UDF Python 定義で作成します。
from snowflake.core.user_defined_function import (
PythonFunction,
ReturnDataType,
UserDefinedFunction
)
function_of_python = UserDefinedFunction(
"my_python_function",
arguments=[],
return_type=ReturnDataType(datatype="VARIANT"),
language_config=PythonFunction(runtime_version="3.9", packages=[], handler="udf"),
body="""
def udf():
return {"key": "value"}
""",
)
root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_python)
Java¶
次の例のコードは、 my_db
データベースと my_schema
スキーマの my_java_function
という名前の UDF を表す UserDefinedFunction
オブジェクトを、指定された引数、戻り値型、言語、 UDF Java 定義で作成します。
from snowflake.core.user_defined_function import (
Argument,
JavaFunction,
ReturnDataType,
UserDefinedFunction
)
function_body = """
class TestFunc {
public static String echoVarchar(String x) {
return x;
}
}
"""
function_of_java = UserDefinedFunction(
name="my_java_function",
arguments=[Argument(name="x", datatype="STRING")],
return_type=ReturnDataType(datatype="VARCHAR", nullable=True),
language_config=JavaFunction(
handler="TestFunc.echoVarchar",
runtime_version="11",
target_path=target_path,
packages=[],
called_on_null_input=True,
is_volatile=True,
),
body=function_body,
comment="test_comment",
)
root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_java)
JavaScript¶
次の例のコードは、 my_db
データベースと my_schema
スキーマの my_javascript_function
という名前の UDF を表す UserDefinedFunction
オブジェクトを、指定された引数、戻り値型、言語、 UDF JavaScript 定義で作成します。
from snowflake.core.user_defined_function import (
Argument,
ReturnDataType,
JavaScriptFunction,
UserDefinedFunction
)
function_body = """
if (D <= 0) {
return 1;
} else {
var result = 1;
for (var i = 2; i <= D; i++) {
result = result * i;
}
return result;
}
"""
function_of_javascript = UserDefinedFunction(
name="my_javascript_function",
arguments=[Argument(name="d", datatype="DOUBLE")],
return_type=ReturnDataType(datatype="DOUBLE"),
language_config=JavaScriptFunction(),
body=function_body,
)
root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_javascript)
Scala¶
次の例のコードは、 my_db
データベースと my_schema
スキーマの my_scala_function
という UDF を表す UserDefinedFunction
オブジェクトを、指定された引数、戻り値型、言語、 UDF Scala 定義で作成します。
from snowflake.core.user_defined_function import (
Argument,
ReturnDataType,
ScalaFunction,
UserDefinedFunction
)
function_body = """
class Echo {
def echoVarchar(x : String): String = {
return x
}
}
"""
function_of_scala = UserDefinedFunction(
name="my_scala_function",
arguments=[Argument(name="x", datatype="VARCHAR")],
return_type=ReturnDataType(datatype="VARCHAR"),
language_config=ScalaFunction(
runtime_version="2.12", handler="Echo.echoVarchar", target_path=target_path, packages=[]
),
body=function_body,
comment="test_comment",
)
root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_scala)
SQL¶
次の例のコードは、 my_db
データベースと my_schema
スキーマの my_sql_function
という名前の UDF を表す UserDefinedFunction
オブジェクトを、指定された引数、戻り値型、言語、 UDF SQL 定義で作成します。
from snowflake.core.user_defined_function import (
ReturnDataType,
SQLFunction,
UserDefinedFunction
)
function_body = """3.141592654::FLOAT"""
function_of_sql = UserDefinedFunction(
name="my_sql_function",
arguments=[],
return_type=ReturnDataType(datatype="FLOAT"),
language_config=SQLFunction(),
body=function_body,
)
root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_sql)
UDF の詳細の取得¶
UserDefinedFunction
オブジェクトを返す UserDefinedFunctionResource.fetch
メソッドを呼び出すことで、 UDF に関する情報を取得できます。
次の例のコードは、 my_db
データベースの my_javascript_function(DOUBLE)
UDF と my_schema
スキーマの情報を取得します。
注釈
UDF リソースオブジェクトを取得する場合、 UDFs がオーバーロードされる可能性があるため、完全なシグネチャ(UDF の名前とそのパラメーターのデータ型)を指定する必要があります。
my_udf = root.databases["my_db"].schemas["my_schema"].user_defined_functions["my_javascript_function(DOUBLE)"].fetch()
print(my_udf.to_dict())
UDFs のリスト¶
UserDefinedFunction
オブジェクトの PagedIter
反復子を返す UserDefinedFunctionCollection.iter
メソッドを使用して、 UDFs を一覧表示することができます。
次の例のコードは、 my_db
データベースと my_schema
スキーマの my_java
で始まる名前を持つ UDFs をリストアップし、それぞれの名前を表示します。
udf_iter = root.databases["my_db"].schemas["my_schema"].user_defined_functions.iter(like="my_java%")
for udf_obj in udf_iter:
print(udf_obj.name)
UDF の名前の変更¶
UserDefinedFunctionResource
オブジェクトで UDF の名前を変更できます。
次の例のコードは、 my_db
データベースと my_schema
スキーマにある my_javascript_function(DOUBLE)
UDF リソース・オブジェクトを取得し、 UDF の名前を my_other_js_function
に変更すると同時に、 my_other_db
データベースと my_other_schema
スキーマに移動します。
root.databases["my_db"].schemas["my_schema"].user_defined_functions["my_javascript_function(DOUBLE)"].rename(
"my_other_js_function",
target_database = "my_other_database",
target_schema = "my_other_schema"
)
UDF のドロップ¶
UserDefinedFunctionResource
オブジェクトで UDF をドロップできます。
次の例のコードは、 my_javascript_function(DOUBLE)
UDF リソース・オブジェクトを取得し、 UDF をドロップします。
my_udf_res = root.databases["my_db"].schemas["my_schema"].user_defined_functions["my_javascript_function(DOUBLE)"]
my_udf_res.drop()
ストアドプロシージャの管理¶
ストアドプロシージャを記述して、 SQL を実行するプロシージャ型コードでシステムを拡張できます。ストアドプロシージャでは、プログラムによる構成を使用して、分岐とループを実行できます。ストアドプロシージャを作成すると、何度でも再利用できます。詳細については、 ストアドプロシージャの概要 をご参照ください。
Snowflake Python APIs は、プロシージャを2つの別々のタイプで表します。
Procedure
: プロシージャの名前、引数のリスト、戻り値の型、プロシージャの定義などのプロシージャのプロパティを公開します。ProcedureResource
: 対応するProcedure
オブジェクトの取得、プロシージャの呼び出し、プロシージャのドロップに使用できるメソッドを公開します。
ストアドプロシージャの作成¶
プロシージャを作成するには、まず Procedure
オブジェクトを作成して、 API Root
オブジェクトから ProcedureCollection
オブジェクトを作成します。 ProcedureCollection.create
を使って、新しいプロシージャをSnowflakeに追加します。
次の例のコードは、 my_db
データベースと my_schema
スキーマの my_procedure
という名前のプロシージャを表す Procedure
オブジェクトを、指定された引数、戻り値型、 SQL プロシージャ定義で作成します。
from snowflake.core.procedure import Argument, ColumnType, Procedure, ReturnTable, SQLFunction
procedure = Procedure(
name="my_procedure",
arguments=[Argument(name="id", datatype="VARCHAR")],
return_type=ReturnTable(
column_list=[
ColumnType(name="id", datatype="NUMBER),
ColumnType(name="price", datatype="NUMBER"),
]
),
language_config=SQLFunction(),
body="
DECLARE
res RESULTSET DEFAULT (SELECT * FROM invoices WHERE id = :id);
BEGIN
RETURN TABLE(res);
END;
",
)
procedures = root.databases["my_db"].schemas["my_schema"].procedures
procedures.create(procedure)
プロシージャの呼び出し¶
ProcedureResource
オブジェクトを使ってプロシージャを呼び出すことができます。
次の例のコードは、 my_procedure(NUMBER, NUMBER)
プロシージャ・リソース・オブジェクトを取得し、 CallArgumentList
オブジェクトを作成し、その引数リストを使用してプロシージャを呼び出します。
注釈
プロシージャ・リソース・オブジェクトを取得する場合、プロシージャはオーバーロードされる可能性があるため、完全なシグネチャ(プロシージャ名とそのパラメーターのデータ型)を指定する必要があります。
from snowflake.core.procedure import CallArgument, CallArgumentList
procedure_reference = root.databases["my_db"].schemas["my_schema"].procedures["my_procedure(NUMBER, NUMBER)"]
call_argument_list = CallArgumentList(call_arguments=[
CallArgument(name="id", datatype="NUMBER", value=1),
])
procedure_reference.call(call_argument_list)
プロシージャの詳細の取得¶
Procedure
オブジェクトを返す ProcedureResource.fetch
メソッドを呼び出すことで、プロシージャに関する情報を取得できます。
次の例のコードは、 my_db
データベースと my_schema
スキーマの my_procedure(NUMBER, NUMBER)
プロシージャに関する情報を取得します。
my_procedure = root.databases["my_db"].schemas["my_schema"].procedures["my_procedure(NUMBER, NUMBER)"].fetch()
print(my_procedure.to_dict())
プロシージャのリスト¶
Procedure
オブジェクトの PagedIter
反復子を返す ProcedureCollection.iter
メソッドを使用して、プロシージャを一覧表示することができます。
次の例のコードは、 my_db
データベースと my_schema
スキーマの my
で始まる名前のプロシージャをリストアップし、それぞれの名前を表示します。
procedure_iter = root.databases["my_db"].schemas["my_schema"].procedures.iter(like="my%")
for procedure_obj in procedure_iter:
print(procedure_obj.name)
プロシージャのドロップ¶
ProcedureResource
オブジェクトでプロシージャをドロップできます。
次の例のコードは、 my_db
データベースと my_schema
スキーマの my_procedure(NUMBER, NUMBER)
プロシージャ・リソース・オブジェクトを取得し、プロシージャをドロップします。
my_procedure_res = root.databases["my_db"].schemas["my_schema"].procedures["my_procedure(NUMBER, NUMBER)"]
my_procedure_res.drop()