クイックリファレンス: SQL コマンド用Snowpark Scala APIs¶
このトピックでは、 SQL コマンドに対応するSnowpark APIs のいくつかのクイックリファレンスを提供します。
(SQL コマンドに対応する APIs の完全なリストではないことに注意してください。)
このトピックの内容:
クエリの実行¶
コラムの選択¶
特定の列を選択するには、 DataFrame.select を使用します。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
SELECT id, name FROM sample_product_data;
|
val dfSelectedCols = df.select(col("id"), col("name"))
dfSelectedCols.show()
|
列の名前の変更¶
列の名前を変更するには、 Column.as、 Column.alias、または Column.name を使用します。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
SELECT id AS item_id FROM sample_product_data;
|
val dfRenamedCol = df.select(col("id").as("item_id"))
dfRenamedCol.show()
|
val dfRenamedCol = df.select(col("id").alias("item_id"))
dfRenamedCol.show()
|
|
val dfRenamedCol = df.select(col("id").name("item_id"))
dfRenamedCol.show()
|
データのフィルタリング¶
データをフィルターするには、 DataFrame.filter または DataFrame.where を使用します。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
SELECT * FROM sample_product_data WHERE id = 1;
|
val dfFilteredRows = df.filter((col("id") === 1))
dfFilteredRows.show()
|
val dfFilteredRows = df.where((col("id") === 1))
dfFilteredRows.show()
|
データの並べ替え¶
データを並べ替えるには、 DataFrame.sort を使用します。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
SELECT * FROM sample_product_data ORDER BY category_id;
|
val dfSorted = df.sort(col("category_id"))
dfSorted.show()
|
返される行数の制限¶
返される行数を制限するには、 DataFrame.limit を使用します。 DataFrame の行数制限 をご参照ください。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
SELECT * FROM sample_product_data
ORDER BY category_id LIMIT 2;
|
val dfSorted = df.sort(col("category_id")).limit(2);
val arrayRows = dfSorted.collect()
|
結合の実行¶
結合を実行するには、 DataFrame.join または DataFrame.naturalJoin を使用します。 DataFrames の結合 をご参照ください。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
SELECT * FROM sample_a
INNER JOIN sample_b
on sample_a.id_a = sample_b.id_a;
|
val dfJoined =
dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
|
SELECT * FROM sample_a NATURAL JOIN sample_b;
|
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
|
半構造化データのクエリ¶
半構造化データを走査するには、 Column.apply("<フィールド名>") および Column.apply(<インデックス>) を使用します。 半構造化データの操作 をご参照ください。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
SELECT src:salesperson.name FROM car_sales;
|
dfJsonField =
df.select(col("src")("salesperson")("name"))
dfJsonField.show()
|
データのグループ化と集計¶
データをグループ化するには、 DataFrame.groupBy を使用します。これにより、集計を実行するために使用できる RelationalGroupedDataFrame オブジェクトが返されます。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
SELECT category_id, count(*)
FROM sample_product_data GROUP BY category_id;
|
val dfCountPerCategory = df.groupBy(col("category")).count()
dfCountPerCategory.show()
|
ウィンドウ関数の呼び出し¶
ウィンドウ関数 を呼び出すには、 ウィンドウ オブジェクトメソッドを使用して WindowSpec オブジェクトを作成します。これは、ウィンドウ関数に使用できます(「<関数> OVER ... PARTITION BY ... ORDER BY」の使用に類似)。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
SELECT category_id, price_date, SUM(amount) OVER
(PARTITION BY category_id ORDER BY price_date)
FROM prices ORDER BY price_date;
|
val window = Window.partitionBy(
col("category")).orderBy(col("price_date"))
val dfCumulativePrices = dfPrices.select(
col("category"), col("price_date"),
sum(col("amount")).over(window)).sort(col("price_date"))
dfCumulativePrices.show()
|
行の更新、削除、およびマージ¶
テーブルの行を更新、削除、およびマージするには、 Updatable を使用します。 テーブル内の行の更新、削除、およびマージ をご参照ください。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
UPDATE sample_product_data
SET serial_number = 'xyz' WHERE id = 12;
|
val updateResult =
updatableDf.update(
Map("serial_number" -> lit("xyz")),
col("id") === 12)
|
DELETE FROM sample_product_data
WHERE category_id = 50;
|
val deleteResult =
updatableDf.delete(updatableDf("category_id") === 50)
|
MERGE INTO target_table USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN
UPDATE SET target_table.description =
source_table.description;
|
val mergeResult =
target.merge(source, target("id") === source("id"))
.whenMatched.update(Map("description" -> source("description")))
.collect()
|
ステージの操作¶
ステージの操作の詳細については、 ステージでのファイルの操作 をご参照ください。
ステージからのファイルのアップロードおよびダウンロード¶
ステージからファイルをアップロードおよびダウンロードするには、 FileOperation を使用します。 ステージでのファイルのアップロードとダウンロード をご参照ください。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
PUT file:///tmp/*.csv @myStage OVERWRITE = TRUE;
|
val putOptions = Map("OVERWRITE" -> "TRUE")
val putResults = session.file.put(
"file:///tmp/*.csv", "@myStage", putOptions)
|
GET @myStage file:///tmp PATTERN = '.*.csv.gz';
|
val getOptions = Map("PATTERN" -> s"'.*.csv.gz'")
val getResults = session.file.get(
"@myStage", "file:///tmp", getOptions)
|
ステージ内にあるファイルからのデータの読み取り¶
ステージ内のファイルからデータを読み取るには、 DataFrameReader を使用してデータの DataFrame を作成します。 ステージ内におけるファイルの DataFrame の設定 をご参照ください。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
CREATE FILE FORMAT snowpark_temp_format TYPE = JSON;
SELECT "$1"[0]['salesperson']['name'] FROM (
SELECT $1::VARIANT AS "$1" FROM @mystage/car_sales.json(
FILE_FORMAT => 'snowpark_temp_format')) LIMIT 10;
DROP FILE FORMAT snowpark_temp_format;
|
val df = session.read.json(
"@mystage/car_sales.json").select(
col("$1")(0)("salesperson")("name"))
df.show();
|
ステージ内のファイルからテーブルへのデータのコピー¶
ステージ内のファイルからテーブルにデータをコピーするには、 DataFrameReader を使用してデータの CopyableDataFrame を作成し、 CopyableDataFramecopyInto メソッドを使用してデータをテーブルにコピーします。 ファイルからテーブルへのデータのコピー をご参照ください。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
COPY INTO new_car_sales
FROM @mystage/car_sales.json
FILE_FORMAT = (TYPE = JSON);
|
val dfCopyableDf = session.read.json("@mystage/car_sales.json")
dfCopyableDf.copyInto("new_car_sales")
|
ステージにあるファイルへの DataFrame の保存¶
ステージにあるファイルに DataFrame を保存するには、使用するファイルの形式にちなんで名付けられた DataFrameWriter メソッドを使用します。 ステージにあるファイルへの DataFrame の保存 をご参照ください。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
COPY INTO @mystage/saved_data.json
FROM ( SELECT * FROM (car_sales) )
FILE_FORMAT = ( TYPE = JSON COMPRESSION = 'none' )
OVERWRITE = TRUE
DETAILED_OUTPUT = TRUE
|
val df = session.table("car_sales")
val writeFileResult = df.write.mode(
SaveMode.Overwrite).option(
"DETAILED_OUTPUT", "TRUE").option(
"compression", "none").json(
"@mystage/saved_data.json")
|
ユーザー定義関数の作成と呼び出し(UDFs)¶
UDF (匿名の UDF)として機能するScala関数を作成するには、 udf を使用します。
名前で呼び出すことができる一時的または永続的な UDF を作成するには、 UDFRegistration.registerTemporary または UDFRegistration.registerPermanent を使用します。
名前で永続的な UDF を呼び出すには、 callUDF を使用します。
詳細については、 Scalaでの DataFrames 用ユーザー定義関数(UDFs)の作成 と スカラーユーザー定義関数(UDFs)の呼び出し をご参照ください。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
CREATE FUNCTION <temp_function_name>
RETURNS INT
LANGUAGE JAVA
...
AS
...;
SELECT ...,
<temp_function_name>(amount) AS doublenum
FROM sample_product_data;
|
val doubleUdf = udf((x: Int) => x + x)
val dfWithDoubleNum = df.withColumn(
"doubleNum", doubleUdf(col("amount")))
dfWithDoubleNum.show()
|
CREATE FUNCTION <temp_function_name>
RETURNS INT
LANGUAGE JAVA
...
AS
...;
SELECT ...,
<temp_function_name>(amount) AS doublenum
FROM sample_product_data;
|
session.udf.registerTemporary(
"doubleUdf", (x: Int) => x + x)
val dfWithDoubleNum = df.withColumn(
"doubleNum", callUDF("doubleUdf", (col("amount"))))
dfWithDoubleNum.show()
|
CREATE FUNCTION doubleUdf(arg1 INT)
RETURNS INT
LANGUAGE JAVA
...
AS
...;
SELECT ...,
doubleUdf(amount) AS doublenum
FROM sample_product_data;
|
session.udf.registerPermanent(
"doubleUdf", (x: Int) => x + x, "mystage")
val dfWithDoubleNum = df.withColumn(
"doubleNum", callUDF("doubleUdf", (col("amount"))))
dfWithDoubleNum.show()
|
ストアドプロシージャの作成と呼び出し¶
Snowparkを使用したストアドプロシージャの作成ガイドについては、 Scalaにおける DataFrames のストアドプロシージャの作成 をご参照ください。
匿名または名前付きの仮プロシージャを作成するには、 com.snowflake.snowpark.SProcRegistration の
registerTemporary
メソッドを使用します。名前付き永続プロシージャを作成するには、 com.snowflake.snowpark.SProcRegistration クラスの
registerPermanent
メソッドを使用します。ストアドプロシージャを呼び出すには、 com.snowflake.snowpark.Session クラスの
storedProcedure
メソッドを使用します。
SQL ステートメントの例 |
Snowparkコードの例 |
---|---|
CREATE PROCEDURE <temp_procedure_name>(x INTEGER, y INTEGER)
RETURNS INTEGER
LANGUAGE JAVA
...
AS
$$
BEGIN
RETURN x + y;
END
$$
;
CALL <temp_procedure_name>(2, 3);
|
StoredProcedure sp =
session.sproc().registerTemporary((Session session, Integer x, Integer y) -> x + y,
new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType},
DataTypes.IntegerType);
session.storedProcedure(sp, 2, 3).show();
|
CREATE PROCEDURE sproc(x INTEGER, y INTEGER)
RETURNS INTEGER
LANGUAGE JAVA
...
AS
$$
BEGIN
RETURN x + y;
END
$$
;
CALL sproc(2, 3);
|
String name = "sproc";
StoredProcedure sp =
session.sproc().registerTemporary(name,
(Session session, Integer x, Integer y) -> x + y,
new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType},
DataTypes.IntegerType);
session.storedProcedure(name, 2, 3).show();
|
CREATE PROCEDURE add_hundred(x INTEGER)
RETURNS INTEGER
LANGUAGE JAVA
...
AS
$$
BEGIN
RETURN x + 100;
END
$$
;
CALL add_hundred(3);
|
val name: String = "add_hundred"
val stageName: String = "sproc_libs"
val sp: StoredProcedure =
session.sproc.registerPermanent(
name,
(session: Session, x: Int) => x + 100,
stageName,
true
)
session.storedProcedure(name, 3).show
|