Référence rapide : APIs Snowpark Scala pour commandes SQL¶
Cette rubrique fournit une référence rapide de certaines des APIs Snowpark qui correspondent aux commandes SQL.
(Notez que ce n’est pas une liste complète des APIs qui correspondent aux commandes SQL).
Dans ce chapitre :
Exécution des requêtes¶
Sélection de colonnes¶
Pour sélectionner des colonnes spécifiques, utilisez DataFrame.select.
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
SELECT id, name FROM sample_product_data;
|
val dfSelectedCols = df.select(col("id"), col("name"))
dfSelectedCols.show()
|
Renommage de colonnes¶
Pour renommer une colonne, utilisez Column.as, Column.alias, ou Column.name.
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
SELECT id AS item_id FROM sample_product_data;
|
val dfRenamedCol = df.select(col("id").as("item_id"))
dfRenamedCol.show()
|
val dfRenamedCol = df.select(col("id").alias("item_id"))
dfRenamedCol.show()
|
|
val dfRenamedCol = df.select(col("id").name("item_id"))
dfRenamedCol.show()
|
Filtrage des données¶
Pour filtrer les données, utilisez DataFrame.filter ou DataFrame.where.
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
SELECT * FROM sample_product_data WHERE id = 1;
|
val dfFilteredRows = df.filter((col("id") === 1))
dfFilteredRows.show()
|
val dfFilteredRows = df.where((col("id") === 1))
dfFilteredRows.show()
|
Tri des données¶
Pour trier les données, utilisez DataFrame.sort.
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
SELECT * FROM sample_product_data ORDER BY category_id;
|
val dfSorted = df.sort(col("category_id"))
dfSorted.show()
|
Limitation du nombre de lignes retournées¶
Pour limiter le nombre de lignes retournées, utilisez DataFrame.limit. Voir Limitation du nombre de lignes dans un DataFrame.
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
SELECT * FROM sample_product_data
ORDER BY category_id LIMIT 2;
|
val dfSorted = df.sort(col("category_id")).limit(2);
val arrayRows = dfSorted.collect()
|
Exécution de jointures¶
Pour effectuer une jointure, utilisez DataFrame.join ou DataFrame.naturalJoin. Voir Joindre des DataFrames.
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
SELECT * FROM sample_a
INNER JOIN sample_b
on sample_a.id_a = sample_b.id_a;
|
val dfJoined =
dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
|
SELECT * FROM sample_a NATURAL JOIN sample_b;
|
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
|
Interrogation de données semi-structurées¶
Pour parcourir des données semi-structurées, utilisez Column.apply(« <field_name> ») et Column.apply(<index>). Voir Utilisation de données semi-structurées.
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
SELECT src:salesperson.name FROM car_sales;
|
dfJsonField =
df.select(col("src")("salesperson")("name"))
dfJsonField.show()
|
Regroupement et agrégation de données¶
Pour regrouper des données, utilisez DataFrame.groupBy. Cela renvoie un objet RelationalGroupedDataFrame, que vous pouvez utiliser pour effectuer les agrégations.
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
SELECT category_id, count(*)
FROM sample_product_data GROUP BY category_id;
|
val dfCountPerCategory = df.groupBy(col("category")).count()
dfCountPerCategory.show()
|
Appel des fonctions de fenêtre¶
Pour appeler une fonction de fenêtre, utilisez les méthodes de l’objet Window pour créer un objet WindowSpec, que vous pourrez utiliser à votre tour pour les fonctions de fenêtre (de la même manière que pour “<function> OVER … PARTITION BY … ORDER BY”).
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
SELECT category_id, price_date, SUM(amount) OVER
(PARTITION BY category_id ORDER BY price_date)
FROM prices ORDER BY price_date;
|
val window = Window.partitionBy(
col("category")).orderBy(col("price_date"))
val dfCumulativePrices = dfPrices.select(
col("category"), col("price_date"),
sum(col("amount")).over(window)).sort(col("price_date"))
dfCumulativePrices.show()
|
Mise à jour, suppression et fusion des lignes¶
Pour mettre à jour, supprimer et fusionner les lignes d’une table, utilisez Updatable. Voir Mise à jour, suppression et fusion des lignes d’une table.
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
UPDATE sample_product_data
SET serial_number = 'xyz' WHERE id = 12;
|
val updateResult =
updatableDf.update(
Map("serial_number" -> lit("xyz")),
col("id") === 12)
|
DELETE FROM sample_product_data
WHERE category_id = 50;
|
val deleteResult =
updatableDf.delete(updatableDf("category_id") === 50)
|
MERGE INTO target_table USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN
UPDATE SET target_table.description =
source_table.description;
|
val mergeResult =
target.merge(source, target("id") === source("id"))
.whenMatched.update(Map("description" -> source("description")))
.collect()
|
Utilisation des zones de préparation¶
Pour plus d’informations sur l’utilisation de zones de préparation, voir Travailler avec des fichiers dans une zone de préparation.
Chargement et téléchargement de fichiers à partir d’une zone de préparation¶
Pour charger et télécharger des fichiers à partir d’une zone de préparation, utilisez FileOperation. Voir Chargement et téléchargement de fichiers dans une zone de préparation.
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
PUT file:///tmp/*.csv @myStage OVERWRITE = TRUE;
|
val putOptions = Map("OVERWRITE" -> "TRUE")
val putResults = session.file.put(
"file:///tmp/*.csv", "@myStage", putOptions)
|
GET @myStage file:///tmp PATTERN = '.*.csv.gz';
|
val getOptions = Map("PATTERN" -> s"'.*.csv.gz'")
val getResults = session.file.get(
"@myStage", "file:///tmp", getOptions)
|
Lecture des données de fichiers dans une zone de préparation¶
Pour lire les données de fichiers dans une zone de préparation, utilisez DataFrameReader pour créer un DataFrame pour les données. Voir Configuration d’un DataFrame pour des fichiers dans une zone de préparation.
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
CREATE FILE FORMAT snowpark_temp_format TYPE = JSON;
SELECT "$1"[0]['salesperson']['name'] FROM (
SELECT $1::VARIANT AS "$1" FROM @mystage/car_sales.json(
FILE_FORMAT => 'snowpark_temp_format')) LIMIT 10;
DROP FILE FORMAT snowpark_temp_format;
|
val df = session.read.json(
"@mystage/car_sales.json").select(
col("$1")(0)("salesperson")("name"))
df.show();
|
Copie des données de fichiers dans une zone de préparation vers une table¶
Pour copier des données de fichiers dans une zone de préparation vers une table, utilisez DataFrameReader pour créer un CopyableDataFrame pour les données, et utilisez la méthode CopyableDataFrame.copyInto pour copier les données dans la table. Voir Copier les données d’un fichier dans une table.
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
COPY INTO new_car_sales
FROM @mystage/car_sales.json
FILE_FORMAT = (TYPE = JSON);
|
val dfCopyableDf = session.read.json("@mystage/car_sales.json")
dfCopyableDf.copyInto("new_car_sales")
|
Sauvegarde d’un DataFrame dans des fichiers sur une zone de préparation¶
Pour enregistrer un DataFrame dans des fichiers sur une zone de préparation, utilisez la méthode DataFrameWriter dont le nom correspond au format des fichiers que vous souhaitez utiliser. Voir Sauvegarde d’un DataFrame dans des fichiers sur une zone de préparation.
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
COPY INTO @mystage/saved_data.json
FROM ( SELECT * FROM (car_sales) )
FILE_FORMAT = ( TYPE = JSON COMPRESSION = 'none' )
OVERWRITE = TRUE
DETAILED_OUTPUT = TRUE
|
val df = session.table("car_sales")
val writeFileResult = df.write.mode(
SaveMode.Overwrite).option(
"DETAILED_OUTPUT", "TRUE").option(
"compression", "none").json(
"@mystage/saved_data.json")
|
Création et appel de fonctions définies par l’utilisateur (UDFs)¶
Pour créer une fonction Scala qui sert d’UDF (une UDF anonyme), utilisez udf.
Pour créer une UDF temporaire ou permanente que vous pouvez appeler par son nom, utilisez UDFRegistration.registerTemporary ou UDFRegistration.registerPermanent.
Pour appeler une UDF permanente par son nom, utilisez callUDF.
Pour plus de détails, voir Création de fonctions définies par l’utilisateur (UDFs) pour DataFrames dans Scala et Appel de fonctions scalaires définies par l’utilisateur (UDFs).
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
CREATE FUNCTION <temp_function_name>
RETURNS INT
LANGUAGE JAVA
...
AS
...;
SELECT ...,
<temp_function_name>(amount) AS doublenum
FROM sample_product_data;
|
val doubleUdf = udf((x: Int) => x + x)
val dfWithDoubleNum = df.withColumn(
"doubleNum", doubleUdf(col("amount")))
dfWithDoubleNum.show()
|
CREATE FUNCTION <temp_function_name>
RETURNS INT
LANGUAGE JAVA
...
AS
...;
SELECT ...,
<temp_function_name>(amount) AS doublenum
FROM sample_product_data;
|
session.udf.registerTemporary(
"doubleUdf", (x: Int) => x + x)
val dfWithDoubleNum = df.withColumn(
"doubleNum", callUDF("doubleUdf", (col("amount"))))
dfWithDoubleNum.show()
|
CREATE FUNCTION doubleUdf(arg1 INT)
RETURNS INT
LANGUAGE JAVA
...
AS
...;
SELECT ...,
doubleUdf(amount) AS doublenum
FROM sample_product_data;
|
session.udf.registerPermanent(
"doubleUdf", (x: Int) => x + x, "mystage")
val dfWithDoubleNum = df.withColumn(
"doubleNum", callUDF("doubleUdf", (col("amount"))))
dfWithDoubleNum.show()
|
Création et appel de procédures stockées¶
Pour un guide sur la création de procédures stockées avec Snowpark, consultez Création de procédures stockées pour DataFrames dans Scala.
Pour créer une procédure temporaire anonyme ou nommée, utilisez une méthode
registerTemporary
de com.snowflake.snowpark.SProcRegistration.Pour créer une procédure permanente nommée, utilisez une méthode
registerPermanent
de la classe com.snowflake.snowpark.SProcRegistration.Pour appeler une procédure stockée, utilisez la méthode
storedProcedure
de la classe com.snowflake.snowpark.Session.
Exemple d’une instruction SQL |
Exemple de code Snowpark |
---|---|
CREATE PROCEDURE <temp_procedure_name>(x INTEGER, y INTEGER)
RETURNS INTEGER
LANGUAGE JAVA
...
AS
$$
BEGIN
RETURN x + y;
END
$$
;
CALL <temp_procedure_name>(2, 3);
|
StoredProcedure sp =
session.sproc().registerTemporary((Session session, Integer x, Integer y) -> x + y,
new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType},
DataTypes.IntegerType);
session.storedProcedure(sp, 2, 3).show();
|
CREATE PROCEDURE sproc(x INTEGER, y INTEGER)
RETURNS INTEGER
LANGUAGE JAVA
...
AS
$$
BEGIN
RETURN x + y;
END
$$
;
CALL sproc(2, 3);
|
String name = "sproc";
StoredProcedure sp =
session.sproc().registerTemporary(name,
(Session session, Integer x, Integer y) -> x + y,
new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType},
DataTypes.IntegerType);
session.storedProcedure(name, 2, 3).show();
|
CREATE PROCEDURE add_hundred(x INTEGER)
RETURNS INTEGER
LANGUAGE JAVA
...
AS
$$
BEGIN
RETURN x + 100;
END
$$
;
CALL add_hundred(3);
|
val name: String = "add_hundred"
val stageName: String = "sproc_libs"
val sp: StoredProcedure =
session.sproc.registerPermanent(
name,
(session: Session, x: Int) => x + 100,
stageName,
true
)
session.storedProcedure(name, 3).show
|