빠른 참조: SQL 명령용 Snowpark Scala API

이 항목에서는 SQL 명령에 해당하는 일부 Snowpark API에 대한 빠른 참조를 제공합니다.

(이것은 SQL 명령에 해당하는 API의 전체 목록이 아닙니다.)

이 항목의 내용:

쿼리 수행하기

열 선택하기

특정 열을 선택하려면 DataFrame.select 를 사용합니다.

SQL 문의 예

Snowpark 코드의 예

SELECT id, name FROM sample_product_data;
Copy
val dfSelectedCols = df.select(col("id"), col("name"))
dfSelectedCols.show()
Copy

열 이름 바꾸기

열 이름을 바꾸려면 Column.as, Column.alias 또는 Column.name 을 사용합니다.

SQL 문의 예

Snowpark 코드의 예

SELECT id AS item_id FROM sample_product_data;
Copy
val dfRenamedCol = df.select(col("id").as("item_id"))
dfRenamedCol.show()
Copy
val dfRenamedCol = df.select(col("id").alias("item_id"))
dfRenamedCol.show()
Copy
val dfRenamedCol = df.select(col("id").name("item_id"))
dfRenamedCol.show()
Copy

데이터 필터링하기

데이터를 필터링하려면 DataFrame.filter 또는 DataFrame.where 를 사용합니다.

SQL 문의 예

Snowpark 코드의 예

SELECT * FROM sample_product_data WHERE id = 1;
Copy
val dfFilteredRows = df.filter((col("id") === 1))
dfFilteredRows.show()
Copy
val dfFilteredRows = df.where((col("id") === 1))
dfFilteredRows.show()
Copy

데이터 정렬하기

데이터를 정렬하려면 DataFrame.sort 를 사용합니다.

SQL 문의 예

Snowpark 코드의 예

SELECT * FROM sample_product_data ORDER BY category_id;
Copy
val dfSorted = df.sort(col("category_id"))
dfSorted.show()
Copy

반환되는 행 수 제한하기

반환되는 행 수를 제한하려면 DataFrame.limit 를 사용합니다. DataFrame의 행 수 제한하기 섹션을 참조하십시오.

SQL 문의 예

Snowpark 코드의 예

SELECT * FROM sample_product_data
  ORDER BY category_id LIMIT 2;
Copy
val dfSorted = df.sort(col("category_id")).limit(2);
val arrayRows = dfSorted.collect()
Copy

조인 수행하기

조인을 수행하려면 DataFrame.join 또는 DataFrame.naturalJoin 을 사용합니다. DataFrame 조인하기 섹션을 참조하십시오.

SQL 문의 예

Snowpark 코드의 예

SELECT * FROM sample_a
  INNER JOIN sample_b
  on sample_a.id_a = sample_b.id_a;
Copy
val dfJoined =
  dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
Copy
SELECT * FROM sample_a NATURAL JOIN sample_b;
Copy
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
Copy

반정형 데이터 쿼리하기

반정형 데이터를 탐색하려면 Column.apply(“<필드_이름>”)Column.apply(<인덱스>) 를 사용합니다. 반정형 데이터로 작업하기 섹션을 참조하십시오.

SQL 문의 예

Snowpark 코드의 예

SELECT src:salesperson.name FROM car_sales;
Copy
dfJsonField =
  df.select(col("src")("salesperson")("name"))
dfJsonField.show()
Copy

데이터를 그룹화하고 집계하기

데이터를 그룹화하려면 DataFrame.groupBy 를 사용합니다. 그러면 집계를 수행하는 데 사용할 수 있는 RelationalGroupedDataFrame 오브젝트가 반환됩니다.

SQL 문의 예

Snowpark 코드의 예

SELECT category_id, count(*)
  FROM sample_product_data GROUP BY category_id;
Copy
val dfCountPerCategory = df.groupBy(col("category")).count()
dfCountPerCategory.show()
Copy

윈도우 함수 호출하기

윈도우 함수 를 호출하려면 윈도우 오브젝트 메서드를 사용하여 WindowSpec 오브젝트를 빌드합니다. 그러면 이를 윈도우 함수에 사용할 수 있습니다(‘<function> OVER … PARTITION BY … ORDER BY’ 사용과 비슷함).

SQL 문의 예

Snowpark 코드의 예

SELECT category_id, price_date, SUM(amount) OVER
  (PARTITION BY category_id ORDER BY price_date)
  FROM prices ORDER BY price_date;
Copy
val window = Window.partitionBy(
  col("category")).orderBy(col("price_date"))
val dfCumulativePrices = dfPrices.select(
  col("category"), col("price_date"),
  sum(col("amount")).over(window)).sort(col("price_date"))
dfCumulativePrices.show()
Copy

행 업데이트, 삭제, 병합하기

테이블의 행을 업데이트, 삭제, 병합하려면 Updatable 을 사용합니다. 테이블의 행 업데이트, 삭제, 병합하기 섹션을 참조하십시오.

SQL 문의 예

Snowpark 코드의 예

UPDATE sample_product_data
  SET serial_number = 'xyz' WHERE id = 12;
Copy
val updateResult =
  updatableDf.update(
    Map("serial_number" -> lit("xyz")),
    col("id") === 12)
Copy
DELETE FROM sample_product_data
  WHERE category_id = 50;
Copy
val deleteResult =
  updatableDf.delete(updatableDf("category_id") === 50)
Copy
MERGE  INTO target_table USING source_table
  ON target_table.id = source_table.id
  WHEN MATCHED THEN
    UPDATE SET target_table.description =
      source_table.description;
Copy
val mergeResult =
   target.merge(source, target("id") === source("id"))
  .whenMatched.update(Map("description" -> source("description")))
  .collect()
Copy

스테이지 작업하기

스테이지 작업에 대한 자세한 내용은 스테이지에서 파일 작업하기 섹션을 참조하십시오.

스테이지에서 파일 업로드 및 다운로드하기

스테이지에서 파일을 업로드 및 다운로드하려면 FileOperation 을 사용합니다. 스테이지에서 파일 업로드 및 다운로드하기 섹션을 참조하십시오.

SQL 문의 예

Snowpark 코드의 예

PUT file:///tmp/*.csv @myStage OVERWRITE = TRUE;
Copy
val putOptions = Map("OVERWRITE" -> "TRUE")
val putResults = session.file.put(
  "file:///tmp/*.csv", "@myStage", putOptions)
Copy
GET @myStage file:///tmp PATTERN = '.*.csv.gz';
Copy
val getOptions = Map("PATTERN" -> s"'.*.csv.gz'")
val getResults = session.file.get(
 "@myStage", "file:///tmp", getOptions)
Copy

스테이지의 파일에서 데이터 읽기

스테이지의 파일에서 데이터를 읽으려면 DataFrameReader 를 사용하여 데이터에 대한 DataFrame을 만듭니다. 스테이지의 파일에 대한 DataFrame 설정하기 섹션을 참조하십시오.

SQL 문의 예

Snowpark 코드의 예

CREATE FILE FORMAT snowpark_temp_format TYPE = JSON;
SELECT "$1"[0]['salesperson']['name'] FROM (
  SELECT $1::VARIANT AS "$1" FROM @mystage/car_sales.json(
    FILE_FORMAT => 'snowpark_temp_format')) LIMIT 10;
DROP FILE FORMAT snowpark_temp_format;
Copy
val df = session.read.json(
  "@mystage/car_sales.json").select(
    col("$1")(0)("salesperson")("name"))
df.show();
Copy

스테이지의 파일에서 테이블로 데이터 복사하기

스테이지의 파일에서 테이블로 데이터를 복사하려면 DataFrameReader 를 사용하여 데이터에 대한 CopyableDataFrame 을 만들고, CopyableDataFrame.copyInto 메서드를 사용하여 데이터를 테이블에 복사합니다. 파일에서 테이블로 데이터 복사하기 섹션을 참조하십시오.

SQL 문의 예

Snowpark 코드의 예

COPY INTO new_car_sales
  FROM @mystage/car_sales.json
  FILE_FORMAT = (TYPE = JSON);
Copy
val dfCopyableDf = session.read.json("@mystage/car_sales.json")
dfCopyableDf.copyInto("new_car_sales")
Copy

스테이지의 파일에 DataFrame 저장하기

스테이지의 파일에 DataFrame을 저장하려면 사용하려는 파일의 형식을 따서 명명된 DataFrameWriter 메서드를 사용하십시오. 스테이지의 파일에 DataFrame 저장하기 섹션을 참조하십시오.

SQL 문의 예

Snowpark 코드의 예

COPY INTO @mystage/saved_data.json
  FROM (  SELECT  *  FROM (car_sales) )
  FILE_FORMAT = ( TYPE = JSON COMPRESSION = 'none' )
  OVERWRITE = TRUE
  DETAILED_OUTPUT = TRUE
Copy
val df = session.table("car_sales")
val writeFileResult = df.write.mode(
  SaveMode.Overwrite).option(
  "DETAILED_OUTPUT", "TRUE").option(
  "compression", "none").json(
  "@mystage/saved_data.json")
Copy

사용자 정의 함수(UDF)를 만들고 호출하기

UDF(익명 UDF) 역할을 하는 Scala 함수를 만들려면 udf 를 사용합니다.

이름으로 호출할 수 있는 임시 또는 영구 UDF를 만들려면 UDFRegistration.registerTemporary 또는 UDFRegistration.registerPermanent 를 사용합니다.

영구 UDF를 이름으로 호출하려면 callUDF 를 사용합니다.

자세한 내용은 Scala에서 DataFrame용 사용자 정의 함수(UDF) 만들기스칼라 사용자 정의 함수(UDF) 호출하기 섹션을 참조하십시오.

SQL 문의 예

Snowpark 코드의 예

CREATE FUNCTION <temp_function_name>
  RETURNS INT
  LANGUAGE JAVA
  ...
  AS
  ...;

SELECT ...,
  <temp_function_name>(amount) AS doublenum
  FROM sample_product_data;
Copy
val doubleUdf = udf((x: Int) => x + x)
val dfWithDoubleNum = df.withColumn(
 "doubleNum", doubleUdf(col("amount")))
dfWithDoubleNum.show()
Copy
CREATE FUNCTION <temp_function_name>
  RETURNS INT
  LANGUAGE JAVA
  ...
  AS
  ...;

SELECT ...,
  <temp_function_name>(amount) AS doublenum
  FROM sample_product_data;
Copy
session.udf.registerTemporary(
  "doubleUdf", (x: Int) => x + x)
val dfWithDoubleNum = df.withColumn(
 "doubleNum", callUDF("doubleUdf", (col("amount"))))
dfWithDoubleNum.show()
Copy
CREATE FUNCTION doubleUdf(arg1 INT)
  RETURNS INT
  LANGUAGE JAVA
  ...
  AS
  ...;

SELECT ...,
  doubleUdf(amount) AS doublenum
  FROM sample_product_data;
Copy
session.udf.registerPermanent(
  "doubleUdf", (x: Int) => x + x, "mystage")
val dfWithDoubleNum = df.withColumn(
 "doubleNum", callUDF("doubleUdf", (col("amount"))))
dfWithDoubleNum.show()
Copy

저장 프로시저 만들기 및 호출하기

Snowpark를 사용하여 저장 프로시저를 만드는 방법에 대한 안내는 Scala로 DataFrames용 저장 프로시저 만들기 섹션을 참조하십시오.

SQL 문의 예

Snowpark 코드의 예

CREATE PROCEDURE <temp_procedure_name>(x INTEGER, y INTEGER)
  RETURNS INTEGER
  LANGUAGE JAVA
  ...
  AS
  $$
  BEGIN
   RETURN x + y;
  END
  $$
  ;

CALL <temp_procedure_name>(2, 3);
Copy
StoredProcedure sp =
  session.sproc().registerTemporary((Session session, Integer x, Integer y) -> x + y,
    new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType},
    DataTypes.IntegerType);

  session.storedProcedure(sp, 2, 3).show();
Copy
CREATE PROCEDURE sproc(x INTEGER, y INTEGER)
  RETURNS INTEGER
  LANGUAGE JAVA
  ...
  AS
  $$
  BEGIN
   RETURN x + y;
  END
  $$
  ;

CALL sproc(2, 3);
Copy
String name = "sproc";
StoredProcedure sp =
  session.sproc().registerTemporary(name,
    (Session session, Integer x, Integer y) -> x + y,
    new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType},
    DataTypes.IntegerType);

  session.storedProcedure(name, 2, 3).show();
Copy
CREATE PROCEDURE add_hundred(x INTEGER)
  RETURNS INTEGER
  LANGUAGE JAVA
  ...
  AS
  $$
  BEGIN
   RETURN x + 100;
  END
  $$
  ;

CALL add_hundred(3);
Copy
val name: String = "add_hundred"
val stageName: String = "sproc_libs"

val sp: StoredProcedure =
  session.sproc.registerPermanent(
    name,
    (session: Session, x: Int) => x + 100,
    stageName,
    true
  )

session.storedProcedure(name, 3).show
Copy