Référence rapide : APIs Snowpark Scala pour commandes SQL

Cette rubrique fournit une référence rapide de certaines des APIs Snowpark qui correspondent aux commandes SQL.

(Notez que ce n’est pas une liste complète des APIs qui correspondent aux commandes SQL).

Dans ce chapitre :

Exécution des requêtes

Sélection de colonnes

Pour sélectionner des colonnes spécifiques, utilisez DataFrame.select.

Exemple d’une instruction SQL

Exemple de code Snowpark

SELECT id, name FROM sample_product_data;
Copy
val dfSelectedCols = df.select(col("id"), col("name"))
dfSelectedCols.show()
Copy

Renommage de colonnes

Pour renommer une colonne, utilisez Column.as, Column.alias, ou Column.name.

Exemple d’une instruction SQL

Exemple de code Snowpark

SELECT id AS item_id FROM sample_product_data;
Copy
val dfRenamedCol = df.select(col("id").as("item_id"))
dfRenamedCol.show()
Copy
val dfRenamedCol = df.select(col("id").alias("item_id"))
dfRenamedCol.show()
Copy
val dfRenamedCol = df.select(col("id").name("item_id"))
dfRenamedCol.show()
Copy

Filtrage des données

Pour filtrer les données, utilisez DataFrame.filter ou DataFrame.where.

Exemple d’une instruction SQL

Exemple de code Snowpark

SELECT * FROM sample_product_data WHERE id = 1;
Copy
val dfFilteredRows = df.filter((col("id") === 1))
dfFilteredRows.show()
Copy
val dfFilteredRows = df.where((col("id") === 1))
dfFilteredRows.show()
Copy

Tri des données

Pour trier les données, utilisez DataFrame.sort.

Exemple d’une instruction SQL

Exemple de code Snowpark

SELECT * FROM sample_product_data ORDER BY category_id;
Copy
val dfSorted = df.sort(col("category_id"))
dfSorted.show()
Copy

Limitation du nombre de lignes retournées

Pour limiter le nombre de lignes retournées, utilisez DataFrame.limit. Voir Limitation du nombre de lignes dans un DataFrame.

Exemple d’une instruction SQL

Exemple de code Snowpark

SELECT * FROM sample_product_data
  ORDER BY category_id LIMIT 2;
Copy
val dfSorted = df.sort(col("category_id")).limit(2);
val arrayRows = dfSorted.collect()
Copy

Exécution de jointures

Pour effectuer une jointure, utilisez DataFrame.join ou DataFrame.naturalJoin. Voir Joindre des DataFrames.

Exemple d’une instruction SQL

Exemple de code Snowpark

SELECT * FROM sample_a
  INNER JOIN sample_b
  on sample_a.id_a = sample_b.id_a;
Copy
val dfJoined =
  dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
Copy
SELECT * FROM sample_a NATURAL JOIN sample_b;
Copy
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
Copy

Interrogation de données semi-structurées

Pour parcourir des données semi-structurées, utilisez Column.apply(« <field_name> ») et Column.apply(<index>). Voir Utilisation de données semi-structurées.

Exemple d’une instruction SQL

Exemple de code Snowpark

SELECT src:salesperson.name FROM car_sales;
Copy
dfJsonField =
  df.select(col("src")("salesperson")("name"))
dfJsonField.show()
Copy

Regroupement et agrégation de données

Pour regrouper des données, utilisez DataFrame.groupBy. Cela renvoie un objet RelationalGroupedDataFrame, que vous pouvez utiliser pour effectuer les agrégations.

Exemple d’une instruction SQL

Exemple de code Snowpark

SELECT category_id, count(*)
  FROM sample_product_data GROUP BY category_id;
Copy
val dfCountPerCategory = df.groupBy(col("category")).count()
dfCountPerCategory.show()
Copy

Appel des fonctions de fenêtre

Pour appeler une fonction de fenêtre, utilisez les méthodes de l’objet Window pour créer un objet WindowSpec, que vous pourrez utiliser à votre tour pour les fonctions de fenêtre (de la même manière que pour “<function> OVER … PARTITION BY … ORDER BY”).

Exemple d’une instruction SQL

Exemple de code Snowpark

SELECT category_id, price_date, SUM(amount) OVER
  (PARTITION BY category_id ORDER BY price_date)
  FROM prices ORDER BY price_date;
Copy
val window = Window.partitionBy(
  col("category")).orderBy(col("price_date"))
val dfCumulativePrices = dfPrices.select(
  col("category"), col("price_date"),
  sum(col("amount")).over(window)).sort(col("price_date"))
dfCumulativePrices.show()
Copy

Mise à jour, suppression et fusion des lignes

Pour mettre à jour, supprimer et fusionner les lignes d’une table, utilisez Updatable. Voir Mise à jour, suppression et fusion des lignes d’une table.

Exemple d’une instruction SQL

Exemple de code Snowpark

UPDATE sample_product_data
  SET serial_number = 'xyz' WHERE id = 12;
Copy
val updateResult =
  updatableDf.update(
    Map("serial_number" -> lit("xyz")),
    col("id") === 12)
Copy
DELETE FROM sample_product_data
  WHERE category_id = 50;
Copy
val deleteResult =
  updatableDf.delete(updatableDf("category_id") === 50)
Copy
MERGE  INTO target_table USING source_table
  ON target_table.id = source_table.id
  WHEN MATCHED THEN
    UPDATE SET target_table.description =
      source_table.description;
Copy
val mergeResult =
   target.merge(source, target("id") === source("id"))
  .whenMatched.update(Map("description" -> source("description")))
  .collect()
Copy

Utilisation des zones de préparation

Pour plus d’informations sur l’utilisation de zones de préparation, voir Travailler avec des fichiers dans une zone de préparation.

Chargement et téléchargement de fichiers à partir d’une zone de préparation

Pour charger et télécharger des fichiers à partir d’une zone de préparation, utilisez FileOperation. Voir Chargement et téléchargement de fichiers dans une zone de préparation.

Exemple d’une instruction SQL

Exemple de code Snowpark

PUT file:///tmp/*.csv @myStage OVERWRITE = TRUE;
Copy
val putOptions = Map("OVERWRITE" -> "TRUE")
val putResults = session.file.put(
  "file:///tmp/*.csv", "@myStage", putOptions)
Copy
GET @myStage file:///tmp PATTERN = '.*.csv.gz';
Copy
val getOptions = Map("PATTERN" -> s"'.*.csv.gz'")
val getResults = session.file.get(
 "@myStage", "file:///tmp", getOptions)
Copy

Lecture des données de fichiers dans une zone de préparation

Pour lire les données de fichiers dans une zone de préparation, utilisez DataFrameReader pour créer un DataFrame pour les données. Voir Configuration d’un DataFrame pour des fichiers dans une zone de préparation.

Exemple d’une instruction SQL

Exemple de code Snowpark

CREATE FILE FORMAT snowpark_temp_format TYPE = JSON;
SELECT "$1"[0]['salesperson']['name'] FROM (
  SELECT $1::VARIANT AS "$1" FROM @mystage/car_sales.json(
    FILE_FORMAT => 'snowpark_temp_format')) LIMIT 10;
DROP FILE FORMAT snowpark_temp_format;
Copy
val df = session.read.json(
  "@mystage/car_sales.json").select(
    col("$1")(0)("salesperson")("name"))
df.show();
Copy

Copie des données de fichiers dans une zone de préparation vers une table

Pour copier des données de fichiers dans une zone de préparation vers une table, utilisez DataFrameReader pour créer un CopyableDataFrame pour les données, et utilisez la méthode CopyableDataFrame.copyInto pour copier les données dans la table. Voir Copier les données d’un fichier dans une table.

Exemple d’une instruction SQL

Exemple de code Snowpark

COPY INTO new_car_sales
  FROM @mystage/car_sales.json
  FILE_FORMAT = (TYPE = JSON);
Copy
val dfCopyableDf = session.read.json("@mystage/car_sales.json")
dfCopyableDf.copyInto("new_car_sales")
Copy

Sauvegarde d’un DataFrame dans des fichiers sur une zone de préparation

Pour enregistrer un DataFrame dans des fichiers sur une zone de préparation, utilisez la méthode DataFrameWriter dont le nom correspond au format des fichiers que vous souhaitez utiliser. Voir Sauvegarde d’un DataFrame dans des fichiers sur une zone de préparation.

Exemple d’une instruction SQL

Exemple de code Snowpark

COPY INTO @mystage/saved_data.json
  FROM (  SELECT  *  FROM (car_sales) )
  FILE_FORMAT = ( TYPE = JSON COMPRESSION = 'none' )
  OVERWRITE = TRUE
  DETAILED_OUTPUT = TRUE
Copy
val df = session.table("car_sales")
val writeFileResult = df.write.mode(
  SaveMode.Overwrite).option(
  "DETAILED_OUTPUT", "TRUE").option(
  "compression", "none").json(
  "@mystage/saved_data.json")
Copy

Création et appel de fonctions définies par l’utilisateur (UDFs)

Pour créer une fonction Scala qui sert d’UDF (une UDF anonyme), utilisez udf.

Pour créer une UDF temporaire ou permanente que vous pouvez appeler par son nom, utilisez UDFRegistration.registerTemporary ou UDFRegistration.registerPermanent.

Pour appeler une UDF permanente par son nom, utilisez callUDF.

Pour plus de détails, voir Création de fonctions définies par l’utilisateur (UDFs) pour DataFrames dans Scala et Appel de fonctions scalaires définies par l’utilisateur (UDFs).

Exemple d’une instruction SQL

Exemple de code Snowpark

CREATE FUNCTION <temp_function_name>
  RETURNS INT
  LANGUAGE JAVA
  ...
  AS
  ...;

SELECT ...,
  <temp_function_name>(amount) AS doublenum
  FROM sample_product_data;
Copy
val doubleUdf = udf((x: Int) => x + x)
val dfWithDoubleNum = df.withColumn(
 "doubleNum", doubleUdf(col("amount")))
dfWithDoubleNum.show()
Copy
CREATE FUNCTION <temp_function_name>
  RETURNS INT
  LANGUAGE JAVA
  ...
  AS
  ...;

SELECT ...,
  <temp_function_name>(amount) AS doublenum
  FROM sample_product_data;
Copy
session.udf.registerTemporary(
  "doubleUdf", (x: Int) => x + x)
val dfWithDoubleNum = df.withColumn(
 "doubleNum", callUDF("doubleUdf", (col("amount"))))
dfWithDoubleNum.show()
Copy
CREATE FUNCTION doubleUdf(arg1 INT)
  RETURNS INT
  LANGUAGE JAVA
  ...
  AS
  ...;

SELECT ...,
  doubleUdf(amount) AS doublenum
  FROM sample_product_data;
Copy
session.udf.registerPermanent(
  "doubleUdf", (x: Int) => x + x, "mystage")
val dfWithDoubleNum = df.withColumn(
 "doubleNum", callUDF("doubleUdf", (col("amount"))))
dfWithDoubleNum.show()
Copy

Création et appel de procédures stockées

Pour un guide sur la création de procédures stockées avec Snowpark, consultez Création de procédures stockées pour DataFrames dans Scala.

Exemple d’une instruction SQL

Exemple de code Snowpark

CREATE PROCEDURE <temp_procedure_name>(x INTEGER, y INTEGER)
  RETURNS INTEGER
  LANGUAGE JAVA
  ...
  AS
  $$
  BEGIN
   RETURN x + y;
  END
  $$
  ;

CALL <temp_procedure_name>(2, 3);
Copy
StoredProcedure sp =
  session.sproc().registerTemporary((Session session, Integer x, Integer y) -> x + y,
    new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType},
    DataTypes.IntegerType);

  session.storedProcedure(sp, 2, 3).show();
Copy
CREATE PROCEDURE sproc(x INTEGER, y INTEGER)
  RETURNS INTEGER
  LANGUAGE JAVA
  ...
  AS
  $$
  BEGIN
   RETURN x + y;
  END
  $$
  ;

CALL sproc(2, 3);
Copy
String name = "sproc";
StoredProcedure sp =
  session.sproc().registerTemporary(name,
    (Session session, Integer x, Integer y) -> x + y,
    new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType},
    DataTypes.IntegerType);

  session.storedProcedure(name, 2, 3).show();
Copy
CREATE PROCEDURE add_hundred(x INTEGER)
  RETURNS INTEGER
  LANGUAGE JAVA
  ...
  AS
  $$
  BEGIN
   RETURN x + 100;
  END
  $$
  ;

CALL add_hundred(3);
Copy
val name: String = "add_hundred"
val stageName: String = "sproc_libs"

val sp: StoredProcedure =
  session.sproc.registerPermanent(
    name,
    (session: Session, x: Int) => x + 100,
    stageName,
    true
  )

session.storedProcedure(name, 3).show
Copy