クイックリファレンス: SQL コマンド用Snowpark Java APIs

このトピックでは、 SQL コマンドに対応するSnowpark APIs のいくつかのクイックリファレンスを提供します。

(SQL コマンドに対応する APIs の完全なリストではないことに注意してください。)

このトピックの内容:

クエリの実行

コラムの選択

特定の列を選択するには、 select を使用します。

SQL ステートメントの例

Snowparkコードの例

SELECT id, name FROM sample_product_data;
Copy
DataFrame df = session.table("sample_product_data");

DataFrame dfSelectedCols = df.select(Functions.col("id"), Functions.col("name"));

dfSelectedCols.show();
Copy

列の名前の変更

列の名前を変更するには、 as または alias を使用します。

SQL ステートメントの例

Snowparkコードの例

SELECT id AS item_id FROM sample_product_data;
Copy
DataFrame df = session.table("sample_product_data");

DataFrame dfRenamedCol = df.select(Functions.col("id").as("item_id"));

dfRenamedCol.show();
Copy
DataFrame df = session.table("sample_product_data");

DataFrame dfRenamedCol = df.select(Functions.col("id").alias("item_id"));

dfRenamedCol.show();
Copy

データのフィルタリング

データをフィルターするには、 filter または where を使用します。

SQL ステートメントの例

Snowparkコードの例

SELECT * FROM sample_product_data WHERE id = 1;
Copy
DataFrame df = session.table("sample_product_data");

DataFrame dfFilteredRows = df.filter(Functions.col("id").equal_to(Functions.lit(1)));

dfFilteredRows.show();
Copy
DataFrame df = session.table("sample_product_data");

DataFrame dfFilteredRows = df.where(Functions.col("id").equal_to(Functions.lit(1)));

dfFilteredRows.show();
Copy

データの並べ替え

データを並べ替えるには、 sort を使用します。

SQL ステートメントの例

Snowparkコードの例

SELECT * FROM sample_product_data ORDER BY category_id;
Copy
DataFrame df = session.table("sample_product_data");

DataFrame dfSorted = df.sort(Functions.col("category_id"));

dfSorted.show();
Copy

返される行数の制限

返される行数を制限するには、 limit を使用します。 DataFrame の行数制限 をご参照ください。

SQL ステートメントの例

Snowparkコードの例

SELECT * FROM sample_product_data
  ORDER BY category_id LIMIT 2;
Copy
DataFrame df = session.table("sample_product_data");

DataFrame dfSorted = df.sort(Functions.col("category_id")).limit(2);

Row[] arrayRows = dfSorted.collect();
Copy

結合の実行

結合を実行するには、 join または naturalJoin を使用します。 DataFrames の結合 をご参照ください。

SQL ステートメントの例

Snowparkコードの例

SELECT * FROM sample_a
  INNER JOIN sample_b
  on sample_a.id_a = sample_b.id_a;
Copy
DataFrame dfLhs = session.table("sample_a");

DataFrame dfRhs = session.table("sample_b");

DataFrame dfJoined =
  dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")));

dfJoined.show();
Copy
SELECT * FROM sample_a NATURAL JOIN sample_b;
Copy
DataFrame dfLhs = session.table("sample_a");

DataFrame dfRhs = session.table("sample_b");

DataFrame dfJoined = dfLhs.naturalJoin(dfRhs);

dfJoined.show();
Copy

半構造化データのクエリ

半構造化データを走査するには、 subField("<フィールド名>") および subField(<インデックス>) を使用します。 半構造化データの操作 をご参照ください。

SQL ステートメントの例

Snowparkコードの例

SELECT src:salesperson.name FROM car_sales;
Copy
DataFrame df = session.table("car_sales");

DataFrame dfJsonField =
  df.select(Functions.col("src").subField("salesperson").subField("name"));

dfJsonField.show();
Copy

データのグループ化と集計

データをグループ化するには、 groupBy を使用します。これにより、集計を実行するために使用できる RelationalGroupedDataFrame オブジェクトが返されます。

SQL ステートメントの例

Snowparkコードの例

SELECT category_id, count(*)
  FROM sample_product_data GROUP BY category_id;
Copy
DataFrame df = session.table("sample_product_data");

DataFrame dfCountPerCategory = df.groupBy(Functions.col("category_id")).count();

dfCountPerCategory.show();
Copy

ウィンドウ関数の呼び出し

ウィンドウ関数 を呼び出すには、 ウィンドウ オブジェクトメソッドを使用して WindowSpec オブジェクトを作成します。これは、ウィンドウ関数に使用できます(「<関数> OVER ... PARTITION BY ... ORDER BY」の使用に類似)。

SQL ステートメントの例

Snowparkコードの例

SELECT category_id, id, SUM(amount) OVER
  (PARTITION BY category_id ORDER BY product_date)
  FROM sample_product_data ORDER BY product_date;
Copy
WindowSpec window = Window.partitionBy(
  Functions.col("category_id")).orderBy(Functions.col("product_date"));

DataFrame df = session.table("sample_product_data");

DataFrame dfCumulativePrices = df.select(
  Functions.col("category_id"), Functions.col("product_date"),
  Functions.sum(Functions.col("amount")).over(window)).sort(Functions.col("product_date"));

dfCumulativePrices.show();
Copy

行の更新、削除、およびマージ

テーブルの行を更新、削除、およびマージするには、 Updatable を使用します。 テーブル内の行の更新、削除、およびマージ をご参照ください。

SQL ステートメントの例

Snowparkコードの例

UPDATE sample_product_data
  SET serial_number = 'xyz' WHERE id = 12;
Copy
import java.util.HashMap;
import java.util.Map;
...

Map<Column, Column> assignments = new HashMap<>();

assignments.put(Functions.col("serial_number"), Functions.lit("xyz"));

Updatable updatableDf = session.table("sample_product_data");

UpdateResult updateResult =
  updatableDf.update(
    assignments,
    Functions.col("id").equal_to(Functions.lit(12)));

System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Copy
DELETE FROM sample_product_data
  WHERE category_id = 50;
Copy
Updatable updatableDf = session.table("sample_product_data");

DeleteResult deleteResult =
  updatableDf.delete(updatableDf.col("category_id").equal_to(Functions.lit(50)));

System.out.println("Number of rows deleted: " + deleteResult.getRowsDeleted());
Copy
MERGE  INTO target_table USING source_table
  ON target_table.id = source_table.id
  WHEN MATCHED THEN
    UPDATE SET target_table.description =
      source_table.description;
Copy
import java.util.HashMap;
import java.util.Map;

Map<String, Column> assignments = new HashMap<>();
assignments.put("description", source.col("description"));
MergeResult mergeResult =
   target.merge(source, target.col("id").equal_to(source.col("id")))
  .whenMatched.updateColumn(assignments)
  .collect();
Copy

ステージの操作

ステージの操作の詳細については、 ステージでのファイルの操作 をご参照ください。

ステージからのファイルのアップロードおよびダウンロード

ステージからファイルをアップロードおよびダウンロードするには、 FileOperation を使用します。 ステージでのファイルのアップロードとダウンロード をご参照ください。

SQL ステートメントの例

Snowparkコードの例

PUT file:///tmp/*.csv @myStage OVERWRITE = TRUE;
Copy
import java.util.HashMap;
import java.util.Map;
...
Map<String, String> putOptions = new HashMap<>();

putOptions.put("OVERWRITE", "TRUE");

PutResult[] putResults = session.file().put(
  "file:///tmp/*.csv", "@myStage", putOptions);

for (PutResult result : putResults) {
  System.out.println(result.getSourceFileName() + ": " + result.getStatus());
}
Copy
GET @myStage file:///tmp PATTERN = '.*.csv.gz';
Copy
import java.util.HashMap;
import java.util.Map;
...
Map<String, String> getOptions = new HashMap<>();

getOptions.put("PATTERN", "'.*.csv.gz'");

GetResult[] getResults = session.file().get(
 "@myStage", "file:///tmp", getOptions);

for (GetResult result : getResults) {
  System.out.println(result.getFileName() + ": " + result.getStatus());
}
Copy

ステージ内にあるファイルからのデータの読み取り

ステージ内のファイルからデータを読み取るには、 DataFrameReader を使用してデータの DataFrame を作成します。 ステージ内におけるファイルの DataFrame の設定 をご参照ください。

SQL ステートメントの例

Snowparkコードの例

CREATE FILE FORMAT snowpark_temp_format TYPE = JSON;

SELECT "$1"[0]['salesperson']['name'] FROM (
  SELECT $1::VARIANT AS "$1" FROM @mystage/car_sales.json(
    FILE_FORMAT => 'snowpark_temp_format')) LIMIT 10;

DROP FILE FORMAT snowpark_temp_format;
Copy
DataFrame df = session.read().json(
  "@mystage/car_sales.json").select(
    Functions.col("$1").subField(0).subField("salesperson").subField("name"));

df.show();
Copy

ステージ内のファイルからテーブルへのデータのコピー

ステージ内のファイルからテーブルにデータをコピーするには、 DataFrameReader を使用してデータの CopyableDataFrame を作成し、 copyInto メソッドを使用してデータをテーブルにコピーします。 ファイルからテーブルへのデータのコピー をご参照ください。

SQL ステートメントの例

Snowparkコードの例

COPY INTO new_car_sales
  FROM @mystage/car_sales.json
  FILE_FORMAT = (TYPE = JSON);
Copy
CopyableDataFrame dfCopyableDf = session.read().json("@mystage/car_sales.json");
dfCopyableDf.copyInto("new_car_sales");
Copy

ステージにあるファイルへの DataFrame の保存

ステージにあるファイルに DataFrame を保存するには、使用するファイルの形式にちなんで名付けられた DataFrameWriter メソッドを使用します。 ステージにあるファイルへの DataFrame の保存 をご参照ください。

SQL ステートメントの例

Snowparkコードの例

COPY INTO @mystage/saved_data.json
  FROM (  SELECT  *  FROM (car_sales) )
  FILE_FORMAT = ( TYPE = JSON COMPRESSION = 'none' )
  OVERWRITE = TRUE
  DETAILED_OUTPUT = TRUE
Copy
DataFrame df = session.table("car_sales");

WriteFileResult writeFileResult = df.write().mode(
  SaveMode.Overwrite).option(
  "DETAILED_OUTPUT", "TRUE").option(
  "compression", "none").json(
  "@mystage/saved_data.json");
Copy

ユーザー定義関数の作成と呼び出し(UDFs)

匿名の UDF を作成するには、 Functions.udf を使用します。

名前で呼び出すことができる一時的または永続的な UDF を作成するには、 UDFRegistration.registerTemporary または UDFRegistration.registerPermanent を使用します。

名前で永続的 UDF を呼び出すには、 Functions.callUDF を使用します。

詳細については、 Javaでの DataFrames 用ユーザー定義関数(UDFs)の作成スカラーユーザー定義関数(UDFs)の呼び出し をご参照ください。

SQL ステートメントの例

Snowparkコードの例

CREATE FUNCTION <temp_function_name>
  RETURNS INT
  LANGUAGE JAVA
  ...
  AS
  ...;

SELECT ...,
  <temp_function_name>(quantity) AS doublenum
  FROM sample_product_data;
Copy
UserDefinedFunction doubleUdf =
  Functions.udf(
    (Integer x) -> x + x,
    DataTypes.IntegerType,
    DataTypes.IntegerType);

DataFrame df = session.table("sample_product_data");

DataFrame dfWithDoubleNum =
  df.withColumn("doubleNum",
    doubleUdf.apply(Functions.col("quantity")));

dfWithDoubleNum.show();
Copy
CREATE FUNCTION <temp_function_name>
  RETURNS INT
  LANGUAGE JAVA
  ...
  AS
  ...;

SELECT ...,
  <temp_function_name>(quantity) AS doublenum
  FROM sample_product_data;
Copy
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerTemporary(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType);

DataFrame df = session.table("sample_product_data");

DataFrame dfWithDoubleNum =
  df.withColumn("doubleNum",
    Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleNum.show();
Copy
CREATE FUNCTION doubleUdf(arg1 INT)
  RETURNS INT
  LANGUAGE JAVA
  ...
  AS
  ...;

SELECT ...,
  doubleUdf(quantity) AS doublenum
  FROM sample_product_data;
Copy
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerPermanent(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType,
      "mystage");

DataFrame df = session.table("sample_product_data");

DataFrame dfWithDoubleNum =
  df.withColumn("doubleNum",
    Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleNum.show();
Copy

ストアドプロシージャの作成と呼び出し

Snowparkを使用したストアドプロシージャの作成ガイドについては、 Javaにおける DataFrames のストアドプロシージャの作成 をご参照ください。

SQL ステートメントの例

Snowparkコードの例

CREATE PROCEDURE <temp_procedure_name>(x INTEGER, y INTEGER)
  RETURNS INTEGER
  LANGUAGE JAVA
  ...
  AS
  $$
  BEGIN
   RETURN x + y;
  END
  $$
  ;

CALL <temp_procedure_name>(2, 3);
Copy
StoredProcedure sp =
  session.sproc().registerTemporary((Session session, Integer x, Integer y) -> x + y,
    new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType},
    DataTypes.IntegerType);

  session.storedProcedure(sp, 2, 3).show();
Copy
CREATE PROCEDURE sproc(x INTEGER, y INTEGER)
  RETURNS INTEGER
  LANGUAGE JAVA
  ...
  AS
  $$
  BEGIN
   RETURN x + y;
  END
  $$
  ;

CALL sproc(2, 3);
Copy
String name = "sproc";

StoredProcedure sp =
  session.sproc().registerTemporary(name,
    (Session session, Integer x, Integer y) -> x + y,
    new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType},
    DataTypes.IntegerType);

  session.storedProcedure(name, 2, 3).show();
Copy
CREATE PROCEDURE add_hundred(x INTEGER)
  RETURNS INTEGER
  LANGUAGE JAVA
  ...
  AS
  $$
  BEGIN
   RETURN x + 100;
  END
  $$
  ;

CALL add_hundred(3);
Copy
String name = "add_hundred";
String stageName = "sproc_libs";

StoredProcedure sp =
  session.sproc().registerPermanent(
    name,
    (Session session, Integer x) -> x + 100,
    DataTypes.IntegerType,
    DataTypes.IntegerType,
    stageName,
    true);

  session.storedProcedure(name, 3).show();
Copy