Kurzübersicht: Snowpark Scala-APIs für SQL-Befehle

Dieses Thema bietet eine Kurzübersicht einiger Snowpark-APIs, die den SQL-Befehlen entsprechen.

(Beachten Sie, dass dies keine vollständige Liste der APIs ist, die SQL-Befehlen entsprechen).

Unter diesem Thema:

Ausführen von Abfragen

Auswählen von Spalten

Um bestimmte Spalten auszuwählen, verwenden Sie DataFrame.select.

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

SELECT id, name FROM sample_product_data;
Copy
val dfSelectedCols = df.select(col("id"), col("name"))
dfSelectedCols.show()
Copy

Umbenennen von Spalten

Verwenden Sie zum Umbenennen einer Spalte Column.as, Column.alias oder Column.name.

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

SELECT id AS item_id FROM sample_product_data;
Copy
val dfRenamedCol = df.select(col("id").as("item_id"))
dfRenamedCol.show()
Copy
val dfRenamedCol = df.select(col("id").alias("item_id"))
dfRenamedCol.show()
Copy
val dfRenamedCol = df.select(col("id").name("item_id"))
dfRenamedCol.show()
Copy

Filtern von Daten

Verwenden Sie zum Filtern von Daten DataFrame.filter oder DataFrame.where.

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

SELECT * FROM sample_product_data WHERE id = 1;
Copy
val dfFilteredRows = df.filter((col("id") === 1))
dfFilteredRows.show()
Copy
val dfFilteredRows = df.where((col("id") === 1))
dfFilteredRows.show()
Copy

Sortieren von Daten

Verwenden Sie zum Sortieren von Daten DataFrame.sort.

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

SELECT * FROM sample_product_data ORDER BY category_id;
Copy
val dfSorted = df.sort(col("category_id"))
dfSorted.show()
Copy

Begrenzen der zurückgegebenen Zeilenanzahl

Um die Anzahl der zurückgegebenen Zeilen zu begrenzen, verwenden Sie DataFrame.limit. Siehe Begrenzen der Anzahl von Zeilen in einem DataFrame.

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

SELECT * FROM sample_product_data
  ORDER BY category_id LIMIT 2;
Copy
val dfSorted = df.sort(col("category_id")).limit(2);
val arrayRows = dfSorted.collect()
Copy

Ausführen von Verknüpfungen (Joins)

Verwenden Sie zum Verknüpfen DataFrame.join oder DataFrame.naturalJoin. Siehe Verknüpfen von DataFrames.

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

SELECT * FROM sample_a
  INNER JOIN sample_b
  on sample_a.id_a = sample_b.id_a;
Copy
val dfJoined =
  dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
Copy
SELECT * FROM sample_a NATURAL JOIN sample_b;
Copy
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
Copy

Abfragen von semistrukturierten Daten

Verwenden Sie zum Durchlaufen semistrukturierter Daten Column.apply(„<Feldname>“) und Column.apply(<Index>). Siehe Verwenden von semistrukturierten Daten.

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

SELECT src:salesperson.name FROM car_sales;
Copy
dfJsonField =
  df.select(col("src")("salesperson")("name"))
dfJsonField.show()
Copy

Gruppieren und Aggregieren von Daten

Verwenden Sie zum Gruppieren von Daten DataFrame.groupBy. Dies gibt ein RelationalGroupedDataFrame-Objekt zurück, das Sie zum Ausführen von Aggregationen verwenden können.

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

SELECT category_id, count(*)
  FROM sample_product_data GROUP BY category_id;
Copy
val dfCountPerCategory = df.groupBy(col("category")).count()
dfCountPerCategory.show()
Copy

Aufrufen von Fensterfunktionen

Um eine Fensterfunktion aufzurufen, verwenden Sie die Methoden des Window-Objekts, um ein WindowSpec-Objekt zu erstellen, das Sie wiederum für Fensterfunktionen verwenden können (ähnlich wie bei „<Funktion> OVER … PARTITION BY … ORDER BY“).

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

SELECT category_id, price_date, SUM(amount) OVER
  (PARTITION BY category_id ORDER BY price_date)
  FROM prices ORDER BY price_date;
Copy
val window = Window.partitionBy(
  col("category")).orderBy(col("price_date"))
val dfCumulativePrices = dfPrices.select(
  col("category"), col("price_date"),
  sum(col("amount")).over(window)).sort(col("price_date"))
dfCumulativePrices.show()
Copy

Aktualisieren, Löschen und Zusammenführen von Zeilen

Verwenden Sie zum Aktualisieren, Löschen und Zusammenzuführen von Zeilen Updatable. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle.

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

UPDATE sample_product_data
  SET serial_number = 'xyz' WHERE id = 12;
Copy
val updateResult =
  updatableDf.update(
    Map("serial_number" -> lit("xyz")),
    col("id") === 12)
Copy
DELETE FROM sample_product_data
  WHERE category_id = 50;
Copy
val deleteResult =
  updatableDf.delete(updatableDf("category_id") === 50)
Copy
MERGE  INTO target_table USING source_table
  ON target_table.id = source_table.id
  WHEN MATCHED THEN
    UPDATE SET target_table.description =
      source_table.description;
Copy
val mergeResult =
   target.merge(source, target("id") === source("id"))
  .whenMatched.update(Map("description" -> source("description")))
  .collect()
Copy

Verwenden von Stagingbereichen

Weitere Informationen zum Verwenden von Stagingbereichen finden Sie unter Verwenden von Dateien in Stagingbereichen.

Hochladen und Herunterladen von Dateien in/aus Stagingbereichen

Verwenden Sie zum Hoch- und Herunterladen von Dateien in/aus Stagingbereichen FileOperation. Siehe Hochladen und Herunterladen von Dateien in Stagingbereichen.

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

PUT file:///tmp/*.csv @myStage OVERWRITE = TRUE;
Copy
val putOptions = Map("OVERWRITE" -> "TRUE")
val putResults = session.file.put(
  "file:///tmp/*.csv", "@myStage", putOptions)
Copy
GET @myStage file:///tmp PATTERN = '.*.csv.gz';
Copy
val getOptions = Map("PATTERN" -> s"'.*.csv.gz'")
val getResults = session.file.get(
 "@myStage", "file:///tmp", getOptions)
Copy

Lesen von Daten aus Dateien in einem Stagingbereich

Verwenden Sie zum Lesen von Daten aus Dateien, die sich in einem Stagingbereich befinden, die DataFrameReader-Klasse, die das Erstellen eines DataFrame-Objekts für die Daten ermöglicht. Siehe Einrichten eines DataFrame für Dateien in einem Stagingbereich.

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

CREATE FILE FORMAT snowpark_temp_format TYPE = JSON;
SELECT "$1"[0]['salesperson']['name'] FROM (
  SELECT $1::VARIANT AS "$1" FROM @mystage/car_sales.json(
    FILE_FORMAT => 'snowpark_temp_format')) LIMIT 10;
DROP FILE FORMAT snowpark_temp_format;
Copy
val df = session.read.json(
  "@mystage/car_sales.json").select(
    col("$1")(0)("salesperson")("name"))
df.show();
Copy

Kopieren von Daten aus Dateien in einem Stagingbereich in eine Tabelle

Verwenden Sie zum Kopieren von Dateien von einem Stagingbereich in eine Tabelle DataFrameReader, um ein CopyableDataFrame-Objekt für die Daten zu erstellen, und verwenden Sie die Methode CopyableDataFrame.copyInto, um die Daten in die Tabelle zu kopieren. Siehe Kopieren von Daten aus Dateien in eine Tabelle.

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

COPY INTO new_car_sales
  FROM @mystage/car_sales.json
  FILE_FORMAT = (TYPE = JSON);
Copy
val dfCopyableDf = session.read.json("@mystage/car_sales.json")
dfCopyableDf.copyInto("new_car_sales")
Copy

Speichern eines DataFrame in Dateien in einem Stagingbereich

Um ein DataFrame-Objekt in Dateien in einem Stagingbereich zu speichern, verwenden Sie die DataFrameWriter-Methode, die nach dem Format der zu verwendenden Dateien benannt ist. Siehe Speichern eines DataFrame in Dateien in einem Stagingbereich.

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

COPY INTO @mystage/saved_data.json
  FROM (  SELECT  *  FROM (car_sales) )
  FILE_FORMAT = ( TYPE = JSON COMPRESSION = 'none' )
  OVERWRITE = TRUE
  DETAILED_OUTPUT = TRUE
Copy
val df = session.table("car_sales")
val writeFileResult = df.write.mode(
  SaveMode.Overwrite).option(
  "DETAILED_OUTPUT", "TRUE").option(
  "compression", "none").json(
  "@mystage/saved_data.json")
Copy

Erstellen und Aufrufen von benutzerdefinierten Funktionen (UDFs)

Um eine Scala-Funktion zu erstellen, die als UDF (eine anonyme UDF) dient, verwenden Sie udf.

Um eine temporäre oder permanente UDF zu erstellen, die Sie mit Namen aufrufen können, verwenden Sie UDFRegistration.registerTemporary oder UDFRegistration.registerPermanent.

Um eine permanente UDF mit Namen aufzurufen, verwenden Sie callUDF.

Weitere Details dazu finden Sie unter Erstellen von benutzerdefinierten Funktionen (UDFs) für DataFrames in Scala und Aufrufen von skalaren benutzerdefinierten Funktionen (UDFs).

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

CREATE FUNCTION <temp_function_name>
  RETURNS INT
  LANGUAGE JAVA
  ...
  AS
  ...;

SELECT ...,
  <temp_function_name>(amount) AS doublenum
  FROM sample_product_data;
Copy
val doubleUdf = udf((x: Int) => x + x)
val dfWithDoubleNum = df.withColumn(
 "doubleNum", doubleUdf(col("amount")))
dfWithDoubleNum.show()
Copy
CREATE FUNCTION <temp_function_name>
  RETURNS INT
  LANGUAGE JAVA
  ...
  AS
  ...;

SELECT ...,
  <temp_function_name>(amount) AS doublenum
  FROM sample_product_data;
Copy
session.udf.registerTemporary(
  "doubleUdf", (x: Int) => x + x)
val dfWithDoubleNum = df.withColumn(
 "doubleNum", callUDF("doubleUdf", (col("amount"))))
dfWithDoubleNum.show()
Copy
CREATE FUNCTION doubleUdf(arg1 INT)
  RETURNS INT
  LANGUAGE JAVA
  ...
  AS
  ...;

SELECT ...,
  doubleUdf(amount) AS doublenum
  FROM sample_product_data;
Copy
session.udf.registerPermanent(
  "doubleUdf", (x: Int) => x + x, "mystage")
val dfWithDoubleNum = df.withColumn(
 "doubleNum", callUDF("doubleUdf", (col("amount"))))
dfWithDoubleNum.show()
Copy

Erstellen und Aufrufen von gespeicherten Prozeduren

Eine Anleitung zum Erstellen von gespeicherten Prozeduren mit Snowpark finden Sie unter Erstellen von gespeicherten Prozeduren für DataFrames in Scala.

Beispiel für eine SQL-Anweisung

Beispiel für Snowpark-Code

CREATE PROCEDURE <temp_procedure_name>(x INTEGER, y INTEGER)
  RETURNS INTEGER
  LANGUAGE JAVA
  ...
  AS
  $$
  BEGIN
   RETURN x + y;
  END
  $$
  ;

CALL <temp_procedure_name>(2, 3);
Copy
StoredProcedure sp =
  session.sproc().registerTemporary((Session session, Integer x, Integer y) -> x + y,
    new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType},
    DataTypes.IntegerType);

  session.storedProcedure(sp, 2, 3).show();
Copy
CREATE PROCEDURE sproc(x INTEGER, y INTEGER)
  RETURNS INTEGER
  LANGUAGE JAVA
  ...
  AS
  $$
  BEGIN
   RETURN x + y;
  END
  $$
  ;

CALL sproc(2, 3);
Copy
String name = "sproc";
StoredProcedure sp =
  session.sproc().registerTemporary(name,
    (Session session, Integer x, Integer y) -> x + y,
    new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType},
    DataTypes.IntegerType);

  session.storedProcedure(name, 2, 3).show();
Copy
CREATE PROCEDURE add_hundred(x INTEGER)
  RETURNS INTEGER
  LANGUAGE JAVA
  ...
  AS
  $$
  BEGIN
   RETURN x + 100;
  END
  $$
  ;

CALL add_hundred(3);
Copy
val name: String = "add_hundred"
val stageName: String = "sproc_libs"

val sp: StoredProcedure =
  session.sproc.registerPermanent(
    name,
    (session: Session, x: Int) => x + 100,
    stageName,
    true
  )

session.storedProcedure(name, 3).show
Copy