Utilisation de DataFrames dans Snowpark

Dans Snowpark, le principal moyen par lequel vous interrogez et traitez les données est un DataFrame. Cette rubrique explique comment travailler avec des DataFrames.

Dans ce chapitre :

Pour récupérer et manipuler des données, vous utilisez la classe DataFrame. Un DataFrame représente un ensemble de données relationnelles qui est évalué de façon « lazy » : il ne s’exécute que lorsqu’une action spécifique est déclenchée. En un sens, un DataFrame est comme une requête qui doit être évaluée afin d’extraire des données.

Pour récupérer des données dans un DataFrame :

  1. Construit un DataFrame, en spécifiant la source des données pour l’ensemble de données.

    Par exemple, vous pouvez créer un DataFrame pour contenir les données d’une table, d’un fichier CSV externe ou de l’exécution d’une instruction SQL.

  2. Spécifier comment l’ensemble de données dans le DataFrame doit être transformé.

    Par exemple, vous pouvez spécifier quelles colonnes doivent être sélectionnées, comment les lignes doivent être filtrées, comment les résultats doivent être triés et regroupés, etc.

  3. Exécuter l’instruction pour récupérer les données dans le DataFrame.

    Pour récupérer les données dans le DataFrame, vous devez appeler une méthode qui exécute une action (par exemple, la méthode collect()).

Les sections suivantes expliquent ces étapes plus en détail.

Construire un DataFrame

Pour construire un DataFrame, vous pouvez utiliser les méthodes de la classe Session. Chacune des méthodes suivantes construit un DataFrame à partir d’un type différent de source de données :

  • Pour créer un DataFrame à partir des données d’une table, d’une vue ou d’un flux, appelez la méthode table :

    // Create a DataFrame from the data in the "products" table.
    val dfTable = session.table("products")
    

    Note

    La méthode session.table renvoie un objet Updatable. Updatable étend DataFrame et fournit des méthodes supplémentaires pour travailler avec les données de la table (par exemple, des méthodes pour mettre à jour et supprimer des données). Voir Mise à jour, suppression et fusion des lignes d’une table.

  • Pour créer un DataFrame à partir d’une séquence de valeurs, appelez la méthode createDataFrame :

    // Create a DataFrame containing a sequence of values.
    // In the DataFrame, name the columns "i" and "s".
    val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
    
  • Pour créer un DataFrame contenant une plage de valeurs, appelez la méthode range :

    // Create a DataFrame from a range
    val dfRange = session.range(1, 10, 2)
    
  • Pour créer un DataFrame qui contiendra les données d’un fichier dans une zone de préparation, appelez read pour obtenir un objet DataFrameReader. Dans l’objet DataFrameReader, appelez la méthode correspondant au format des données du fichier :

    // Create a DataFrame from data in a stage.
    val dfJson = session.read.json("@mystage2/data1.json")
    
  • Pour créer un DataFrame qui contiendra les résultats d’une requête SQL, appelez la méthode sql :

    // Create a DataFrame from a SQL query
    val dfSql = session.sql("SELECT name from products")
    

    Remarque : bien que vous puissiez utiliser cette méthode pour exécuter des instructions SELECT qui récupèrent des données dans des tables et des fichiers mis en zone de préparation, vous devriez plutôt utiliser des méthodes table et read. Des méthodes telles que table et read permettent d’améliorer la mise en évidence de la syntaxe, la mise en évidence des erreurs et le remplissage automatique et intelligente du code dans les outils de développement.

Spécifier comment l’ensemble de données doit être transformé

Pour spécifier quelles colonnes doivent être sélectionnées et comment les résultats doivent être filtrés, triés, groupés, etc., appelez les méthodes DataFrame qui transforment l’ensemble de données. Pour identifier les colonnes dans ces méthodes, utilisez la fonction col ou une expression qui évalue une colonne. (Voir Spécification des colonnes et des expressions.)

Par exemple :

  • Pour spécifier les lignes à renvoyer, appelez la méthode filter :

    // Create a DataFrame for the rows with the ID 1
    // in the "products" table.
    //
    // This example uses the === operator of the Column object to perform an
    // equality check.
    val dfProductIdOne = dfProductInfo.filter(col("id") === 1)
    
  • Pour spécifier les colonnes qui doivent être sélectionnées, appelez la méthode select :

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame object for the "products" table.
    val dfProductInfo = session.table("products")
    
    // Create a DataFrame that contains the id, name, and serial_number
    // columns.
    val dfProductSerialNo =
        dfProductInfo.select(col("id"), col("name"), col("serial_number"))
    

Chaque méthode renvoie un nouvel objet DataFrame qui a été transformé. (La méthode n’affecte pas l’objet DataFrame original). Cela signifie que si vous souhaitez appliquer plusieurs transformations, vous pouvez enchaîner les appels de méthode, en appelant chaque méthode de transformation suivante sur le nouvel objet DataFrame renvoyé par l’appel de méthode précédent.

Notez que ces méthodes de transformation ne récupèrent pas les données de la base de données Snowflake. (Les méthodes d’action décrites dans Exécution d’une action pour évaluer un DataFrame effectuent la récupération des données). Les méthodes de transformation spécifient simplement comment l’instruction SQL doit être construite.

Joindre des DataFrames

Pour joindre des objets DataFrame, il faut appeler la méthode join :

// Create a DataFrame that joins two other DataFrames
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key"))

Notez que l’exemple utilise la méthode DataFrame.col pour spécifier les colonnes à utiliser dans la jointure. Voir Spécification des colonnes et des expressions pour en savoir plus sur cette méthode.

Si vous devez joindre une table avec elle-même sur différentes colonnes, vous ne pouvez pas effectuer l’auto-jointure avec un seul DataFrame. Les exemples suivants qui utilisent un seul DataFrame pour effectuer une auto-jointure échouent parce que les expressions de colonne pour "id" sont présentes dans les côtés gauche et droit de la jointure :

val dfJoined = df.join(df, col("id") === col("parent_id"))

val dfJoined = df.join(df, df("id") === df("parent_id"))

Ces deux exemples échouent avec l’exception suivante :

Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
  Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
  Instead, join this DataFrame to a clone() of itself.

Au lieu de cela, utilisez la méthode DataFrame.clone() pour créer un clone de l’objet DataFrame, et utilisez les deux objets DataFrame pour effectuer la jointure :

// Create a DataFrame object for the "products" table for the left-hand side of the join.
val dfLhs = session.table("products")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()

// Create a DataFrame that joins the two DataFrames
// for the "products" table on the "key" column.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))

Si vous souhaitez effectuer une auto-jointure sur la même colonne, appelez la méthode join qui transmet une Seq d’expressions de colonnes pour la clause USING :

// Create a DataFrame that performs a self-join on
// the DataFrame for the "products" table using the "key" column.
val dfJoined = df.join(df, Seq("key"))

Spécification des colonnes et des expressions

Lorsque vous appelez ces méthodes de transformation, vous pouvez avoir besoin de spécifier des colonnes ou des expressions qui utilisent des colonnes. Par exemple, lorsque vous appelez la méthode select, vous devez spécifier les colonnes qui doivent être sélectionnées.

Pour faire référence à une colonne, créez un objet Column en appelant la fonction col dans l’objet com.snowflake.snowpark.functions.

// Import the col function from the functions object.
import com.snowflake.snowpark.functions._

val dfProductInfo = session.table("products").select(col("id"), col("name"))

Note

Pour créer un objet Column pour un littéral, voir Utilisation de littéraux comme objets de colonne.

Lorsque vous spécifiez un filtre, une projection, une condition de jointure, etc., vous pouvez utiliser des objets Column dans une expression. L’exemple suivant utilise les objets Column dans les expressions pour :

  • Récupérer les lignes où la valeur de la colonne id est 20 et où la somme des valeurs des colonnes a et b est inférieure à 10.

  • Renvoyer la valeur de b multipliée par 10 dans la colonne nommée c. c est un alias de colonne qui est utilisé dans l’instruction suivante, qui joint la colonne DataFrame.

  • Joindre les DataFrame df avec les DataFrame dfCompute calculés.

val dfCompute = session.table("T").filter(col("id") === 20).filter((col("a") + col("b")) < 10).select((col("b") * 10) as "c")
val df2 = df.join(dfCompute, col("a") === col("c") && col("a") === col("d"))

Lorsque vous faites référence à des colonnes dans deux objets DataFrame différents qui ont le même nom (par exemple, en joignant les DataFrames sur cette colonne), vous pouvez utiliser la méthode DataFrame.col dans un objet DataFrame pour faire référence à une colonne dans cet objet (par exemple, df1.col("name") et df2.col("name")).

L’exemple suivant montre comment utiliser la méthode DataFrame.col pour faire référence à une colonne dans un DataFrame spécifique. L’exemple joint deux objets DataFrame qui ont tous deux une colonne nommée key. L’exemple utilise la méthode Column.as pour modifier les noms des colonnes dans le DataFrame nouvellement créé.

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))

Comme alternative à la méthode DataFrame.col, vous pouvez utiliser la méthode DataFrame.apply pour faire référence à une colonne dans un DataFrame spécifique. Comme la méthode DataFrame.col, la méthode DataFrame.apply accepte un nom de colonne en entrée et renvoie un objet Column.

Notez que lorsqu’un objet possède une méthode apply en Scala, vous pouvez appeler la méthode apply en appelant l’objet comme s’il s’agissait d’une fonction. Par exemple, pour appeler df.apply("column_name"), vous pouvez simplement écrire df("column_name"). Les appels suivants sont équivalents :

  • df.col("<nom_colonne>")

  • df.apply("<nom_colonne>")

  • df("<nom_colonne>")

L’exemple suivant est identique à l’exemple précédent mais utilise la méthode DataFrame.apply pour faire référence aux colonnes dans une opération de jointure :

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))

Utilisation de Shorthand pour un objet de colonne

Au lieu d’utiliser la fonction col, vous pouvez faire référence à une colonne de l’une des manières suivantes :

  • Utilisez un signe dollar devant le nom de la colonne entre guillemets simples ($"column_name").

  • Utilisez une apostrophe (un guillemet simple) devant le nom de la colonne non mise entre guillemets simples ('column_name).

Pour ce faire, importez les noms de l’objet implicits après avoir créé un objet Session :

val session = Session.builder.configFile("/path/to/properties").create

// Import this after you create the session.
import session.implicits._

// Use the $ (dollar sign) shorthand.
val df = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)

// Use ' (apostrophe) shorthand.
val df = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)

Utilisation de guillemets doubles autour des identificateurs d’objets (noms de tables, noms de colonnes, etc.)

Les noms des bases de données, des schémas, des tables et des zones de préparation que vous spécifiez doivent être conformes aux exigences de l’identificateur Snowflake. Lorsque vous spécifiez un nom, Snowflake considère que le nom est en majuscules. Par exemple, les appels suivants sont équivalents :

// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))

Si le nom n’est pas conforme aux exigences de l’identificateur, vous devez utiliser des guillemets doubles (") autour du nom. Utilisez une barre oblique inverse (\) pour échapper le caractère de guillemet double dans un littéral de chaîne Scala. Par exemple, le nom de la table suivante ne commence pas par une lettre ou un trait de soulignement, vous devez donc utiliser des guillemets autour du nom :

val df = session.table("\"10tablename\"")

Notez que lorsque vous spécifiez le nom d’une colonne, vous n’avez pas besoin d’utiliser des guillemets doubles autour du nom. La bibliothèque Snowpark met automatiquement le nom de la colonne entre guillemets doubles pour vous si le nom ne respecte pas les exigences d’identification :

// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\"))

// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))

Si vous avez déjà ajouté des guillemets autour d’un nom de colonne, la bibliothèque n’insère pas de guillemets doubles supplémentaires autour du nom.

Dans certains cas, le nom de la colonne peut contenir des caractères guillemets doubles :

describe table quoted;
+------------------------+ ...
| name                   | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted"   | ...
+------------------------+ ...

Comme expliqué dans Exigences relatives à l’identificateur, pour chaque caractère entre guillemets doubles dans un identificateur entre guillemets doubles, vous devez utiliser deux caractères de guillemets doubles (par exemple, "name_with_""air""_quotes" et """column_name_quoted""") :

val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()

N’oubliez pas que lorsqu’un identificateur est délimité par des guillemets doubles (que vous ayez explicitement ajouté des guillemets ou que la bibliothèque les ait ajoutés pour vous), Snowflake traite l’identificateur comme sensible à la casse :

// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))

Utilisation de littéraux comme objets de colonne

Pour utiliser un littéral dans une méthode qui transmet un objet Column, créez un objet Column pour le littéral en transmettant le littéral à la fonction lit dans l’objet com.snowflake.snowpark.functions. Par exemple :

// Import for the lit and col functions.
import com.snowflake.snowpark.functions._

// Show the first 10 rows in which num_items is greater than 5.
// Use `lit(5)` to create a Column object for the literal 5.
df.filter(col("num_items").gt(lit(5))).show()

Si le littéral est une valeur à virgule flottante ou double en Scala (par exemple, 0.05 est traité comme un double par défaut), la bibliothèque Snowpark génère du SQL qui convertit implicitement la valeur en type de données Snowpark correspondant (par exemple, 0.05::DOUBLE). Cela peut produire une valeur approximative qui diffère du nombre exact spécifié.

Par exemple, le code suivant n’affiche aucune ligne correspondante, alors que le filtre (qui correspond aux valeurs supérieures ou égales à 0.05) devrait correspondre aux lignes de DataFrame :

// Create a DataFrame that contains the value 0.05.
val df = session.sql("select 0.05 :: Numeric(5, 2) as a")

// Applying this filter results in no matching rows in the DataFrame.
df.filter(col("a") <= lit(0.06) - lit(0.01)).show()

Le problème est que lit(0.06) et lit(0.01) produisent des valeurs approximatives pour 0.06 et 0.01, et non les valeurs exactes.

Pour éviter ce problème, vous pouvez utiliser l’une des approches suivantes :

  • Option 1 : convertir le littéral dans le type Snowpark que vous voulez utiliser. Par exemple, pour utiliser un NUMBER avec une précision de 5 et une échelle de 2 :

    df.filter(col("a") <= lit(0.06).cast(new DecimalType(5, 2)) - lit(0.01).cast(new DecimalType(5, 2))).show()
    
  • Option 2 : convertir la valeur dans le type que vous souhaitez utiliser avant de la transmettre à la fonction lit. Par exemple, si vous voulez utiliser le type BigDecimal :

    df.filter(col("a") <= lit(BigDecimal(0.06)) - lit(BigDecimal(0.01))).show()
    

Conversion d’un objet de colonne en un type spécifique

Pour convertir un objet column en un type spécifique, appelez la méthode cast et transmettez un objet de type à partir du package com.snowflake.snowpark.types. Par exemple, pour convertir un littéral en NUMBER avec une précision de 5 et une échelle de 2 :

// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._

val decimalValue = lit(0.05).cast(new DecimalType(5,2))

Chaîner les appels de méthode

Parce que chaque méthode qui transforme un objet DataFrame renvoie un nouvel objet DataFrame auquel la transformation a été appliquée, vous pouvez chaîner les appels de méthode pour produire un nouvel objet DataFrame qui est transformé de manière supplémentaire.

L’exemple suivant renvoie un DataFrame qui est configuré pour :

  • Interroger la table products.

  • Retourner la ligne avec id = 1.

  • Sélectionner les colonnes name et serial_number.

val dfProductInfo = session.table("products").filter(col("id") === 1).select(col("name"), col("serial_number"))

Dans cet exemple :

  • session.table("products") retourne un DataFrame pour la table products.

    Bien que le DataFrame ne contienne pas encore les données de la table, l’objet contient les définitions des colonnes de la table.

  • filter(col("id") === 1) retourne un DataFrame pour la table products qui est configurée pour retourner la ligne avec id = 1.

    Notez à nouveau que le DataFrame ne contient pas encore la ligne correspondante de la table. La ligne correspondante n’est pas récupérée avant que vous appeliez une méthode d’action.

  • select(col("name"), col("serial_number")) renvoie un DataFrame qui contient les colonnes name et serial_number de la ligne de la table products qui a id = 1.

Lorsque vous enchaînez des appels de méthodes, n’oubliez pas que l’ordre des appels est important. Chaque appel de méthode renvoie un DataFrame qui a été transformé. Assurez-vous que les appels ultérieurs fonctionnent avec le DataFrame transformé.

Par exemple, dans le code ci-dessous, la méthode select renvoie un DataFrame qui ne contient que deux colonnes : name et serial_number. L’appel de la méthode filter sur ce DataFrame échoue car il utilise la colonne id, qui ne se trouve pas dans le DataFrame transformé.

// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("products").select(col("name"), col("serial_number")).filter(col("id") === 1)

En revanche, le code suivant s’exécute avec succès car la méthode filter() est appelée sur un DataFrame qui contient toutes les colonnes de la table products (y compris la colonne id) :

// This succeeds because the DataFrame returned by the table() method
// includes the id column.
val dfProductInfo = session.table("products").filter(col("id") === 1).select(col("name"), col("serial_number"))

N’oubliez pas que vous devrez peut-être effectuer les appels de méthode select et filter dans un ordre différent de celui dans lequel vous utiliseriez les mots-clés équivalents (SELECT et WHERE) dans une instruction SQL.

Récupération des définitions de colonnes

Pour récupérer la définition des colonnes dans le jeu de données pour le DataFrame, appelez la méthode schema. Cette méthode renvoie un objet StructType qui contient un Array d’objets StructField. Chaque objet StructField contient la définition d’une colonne.

// Get the StructType object that describes the columns in the
// underlying rowset.
val dfDefinition = session.table("products").schema

Dans l’objet StructType retourné, les noms de colonnes sont toujours normalisés. Les identificateurs non mis entre guillemets simples sont renvoyés en majuscules, et les identificateurs mis entre guillemets simples sont renvoyés dans la casse exacte dans laquelle ils ont été définis.

L’exemple suivant renvoie un DataFrame contenant les colonnes nommées ID et 3rd. Pour le nom de colonne 3rd, la bibliothèque Snowpark place automatiquement le nom entre guillemets doubles ("3rd") car le nom ne répond pas aux exigences d’un identificateur.

L’exemple appelle la méthode schema, puis la méthode names sur l’objet StructType renvoyé pour obtenir un Seq de noms de colonnes. Les noms sont normalisés dans les StructType retournés par la méthode schema.

// This returns Seq("ID", "\"3rd\"")
df.select(col("id"), col("3rd")).schema.names.toSeq

Exécution d’une action pour évaluer un DataFrame

Comme nous l’avons mentionné précédemment, le DataFrame est évalué de façon « lazy », ce qui signifie que l’instruction SQL n’est pas envoyée au serveur pour être exécutée tant que vous n’avez pas effectué une action. Une action provoque l’évaluation du DataFrame et envoie l’instruction SQL correspondante au serveur pour exécution.

Dans cette version, les méthodes suivantes exécutent une action :

Classe

Méthode

Description

DataFrame

collect

Évalue les DataFrame et renvoie le jeu de données résultant sous la forme d’un Array d’objets Row.

DataFrame

count

Évalue le DataFrame et retourne le nombre de lignes.

DataFrame

show

Évalue le DataFrame et imprime les lignes dans la console. Notez que cette méthode limite le nombre de lignes à 10 (par défaut).

DataFrameWriter

saveAsTable

Sauvegarde les données du DataFrame dans la table spécifiée. Voir Sauvegarde des données dans une table.

Updatable

delete

Supprime les lignes de la table spécifiée. Voir Mise à jour, suppression et fusion des lignes d’une table.

Updatable

update

Met à jour les lignes de la table spécifiée. Voir Mise à jour, suppression et fusion des lignes d’une table.

MergeBuilder

collect

Fusionne les lignes dans la table spécifiée. Voir Mise à jour, suppression et fusion des lignes d’une table.

Par exemple, pour exécuter une requête sur une table et renvoyer les résultats, appelez la méthode collect :

// Create a DataFrame for the row in the "products" table with the id 1.
// This does not execute the query.
val dfProductIdOne = session.table("products").filter(col("id") === 1)

// Send the query to the server for execution and
// return an Array of Rows containing the results.
val results = dfProductIdOne.collect()

Pour exécuter la requête et renvoyer le nombre de résultats, appelez la méthode count :

// Create a DataFrame for the "products" table.
val dfProducts = session.table("products")

// Send the query to the server for execution and
// return the count of rows in the table.
val resultCount = dfProducts.count()

Pour exécuter une requête et imprimer les résultats dans la console, appelez la méthode show :

// Create a DataFrame for the "products" table.
val dfProducts = session.table("products")

// Send the query to the server for execution and
// print the results to the console.
// The query limits the number of rows to 10 by default.
dfProducts.show()

// Limit the number of rows to 20, rather than 10.
dfProducts.show(20)

Remarque : si vous appelez la méthode schema pour obtenir les définitions des colonnes dans DataFrame, vous n’avez pas besoin d’appeler une méthode d’action.

Mise à jour, suppression et fusion des lignes d’une table

Note

Cette fonctionnalité a été introduite dans Snowpark 0.7.0.

Lorsque vous appelez Session.table pour créer un objet DataFrame pour une table, la méthode renvoie un objet Updatable qui étend DataFrame avec des méthodes supplémentaires pour mettre à jour et supprimer des données dans la table. (Voir Updatable.)

Si vous devez mettre à jour ou supprimer des lignes dans une table, vous pouvez utiliser les méthodes suivantes de la classe Updatable :

Mise à jour des lignes dans une table

Pour la méthode update on transmet un Map qui associe les colonnes à mettre à jour et les valeurs correspondantes à affecter à ces colonnes. update renvoie un objet UpdateResult qui contient le nombre de lignes mises à jour. (Voir UpdateResult.)

Note

update est une méthode d’action, ce qui signifie que l’appel de la méthode envoie des instructions SQL au serveur pour exécution.

Par exemple, pour remplacer les valeurs de la colonne nommée count par la valeur 1 :

val updatableDf = session.table("products")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")

L’exemple ci-dessus utilise le nom de la colonne pour identifier la colonne. Vous pouvez également utiliser une expression de colonne :

val updateResult = updatableDf.update(Map(col("count") -> lit(1)))

Si la mise à jour ne doit être effectuée que lorsqu’une condition est remplie, vous pouvez spécifier cette condition comme argument. Par exemple, pour remplacer les valeurs de la colonne nommée count pour les lignes dans lesquelles la colonne category_id a la valeur 20 :

val updateResult = updatableDf.update(Map(col("count") -> lit(1)), col("category_id") === 20)

Si vous devez baser la condition sur une jointure avec un objet DataFrame différent, vous pouvez transmettre ce DataFrame en tant qu’argument et utiliser ce DataFrame dans la condition. Par exemple, pour remplacer les valeurs de la colonne nommée count pour les lignes dans lesquelles la colonne category_id correspond à category_id dans les DataFrame dfPart :

val updatableDf = session.table("products")
val dfParts = session.table("parts")
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), updatableDf("category_id") === dfParts("category_id"))

Suppression de lignes dans une table

Pour la méthode delete vous pouvez spécifier une condition qui identifie les lignes à supprimer, et vous pouvez baser cette condition sur une jointure avec un autre DataFrame. delete renvoie un objet DeleteResult qui contient le nombre de lignes supprimées. (Voir DeleteResult.)

Note

delete est une méthode d’action, ce qui signifie que l’appel de la méthode envoie des instructions SQL au serveur pour exécution.

Par exemple, pour supprimer les lignes dans lesquelles la colonne category_id correspond à category_id dans DataFrame dfPart :

val updatableDf = session.table("products")
val deleteResult = updatableDf.delete(updatableDf("category_id") === dfParts("category_id"))
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")

Fusion de lignes dans une table

Pour insérer, mettre à jour et supprimer des lignes dans une table en fonction des valeurs d’une deuxième table ou d’une sous-requête (l’équivalent de la commande MERGE en SQL), procédez comme suit :

  1. Dans l’objet Updatable de la table dans laquelle vous voulez fusionner les données, appelez la méthode merge en transmettant l’objet DataFrame de l’autre table et l’expression de la colonne pour la condition de jointure.

    Le résultat est un objet MergeBuilder que vous pouvez utiliser pour spécifier les actions à entreprendre (par exemple, insérer, mettre à jour ou supprimer) sur les lignes qui correspondent et celles qui ne correspondent pas. (Voir MergeBuilder.)

  2. En utilisant l’objet MergeBuilder :

    • Pour spécifier la mise à jour ou la suppression qui doit être effectuée sur les lignes correspondantes, appelez la méthode whenMatched.

      Si vous devez spécifier une condition supplémentaire pour que les lignes soient mises à jour ou supprimées, vous pouvez transmettre une expression de colonne pour cette condition.

      Cette méthode renvoie un objet MatchedClauseBuilder que vous pouvez utiliser pour spécifier l’action à exécuter. (Voir MatchedClauseBuilder.)

      Appelez la méthode update ou delete dans l’objet MatchedClauseBuilder pour spécifier l’action de mise à jour ou de suppression qui doit être exécutée sur les lignes correspondantes. Ces méthodes renvoient un objet MergeBuilder que vous pouvez utiliser pour spécifier des clauses supplémentaires.

    • Pour spécifier l’insertion qui doit être effectuée lorsque des lignes ne correspondent pas, appelez la méthode whenNotMatched.

      Si vous devez spécifier une condition supplémentaire pour l’insertion de lignes, vous pouvez transmettre une expression de colonne pour cette condition.

      Cette méthode renvoie un objet NotMatchedClauseBuilder que vous pouvez utiliser pour spécifier l’action à exécuter. (Voir NotMatchedClauseBuilder.)

      Appelez la méthode insert dans l’objet NotMatchedClauseBuilder pour spécifier l’action d’insertion qui doit être exécutée lorsque les lignes ne correspondent pas. Ces méthodes renvoient un objet MergeBuilder que vous pouvez utiliser pour spécifier des clauses supplémentaires.

  3. Lorsque vous avez fini de spécifier les insertions, mises à jour et suppressions à effectuer, appelez la méthode collect de l’objet MergeBuilder pour effectuer les insertions, mises à jour et suppressions spécifiées sur la table.

    collect renvoie un objet MergeResult qui contient le nombre de lignes qui ont été insérées, mises à jour et supprimées. (Voir MergeResult.)

L’exemple suivant insère une ligne avec les colonnes id et value de la table source dans la table target si la table target ne contient pas de ligne avec un ID correspondant :

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenNotMatched.insert(Seq(source("id"), source("value")))
                      .collect()

L’exemple suivant met à jour une ligne de la table target avec la valeur de la colonne value de la ligne de la table source qui a le même ID :

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenMatched.update(Map("value" -> source("value")))
                      .collect()

Sauvegarde des données dans une table

Pour enregistrer le contenu d’un DataFrame dans une table :

  1. Appelez la méthode write pour obtenir un objet DataFrameWriter.

  2. Appelez la méthode mode dans l’objet DataFrameWriter et précisez si vous voulez insérer des lignes ou mettre à jour des lignes dans la table. Cette méthode renvoie un nouvel objet DataFrameWriter qui est configuré avec le mode spécifié.

  3. Appelez la méthode saveToTable dans l’objet DataFrameWriter pour sauvegarder le contenu du DataFrame dans une table spécifiée.

Notez que vous n’avez pas besoin d’appeler une méthode distincte (par exemple collect) pour exécuter l’instruction SQL qui enregistre les données dans la table.

Par exemple :

df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)

Création d’une vue à partir d’un DataFrame

Pour créer une vue à partir d’un DataFrame, appelez la méthode createOrReplaceView :

df.createOrReplaceView("db.schema.viewName")

Notez que l’appel à createOrReplaceView crée immédiatement la nouvelle vue. Plus important encore, il ne provoque pas l’évaluation du DataFrame. (Le DataFrame lui-même n’est pas évalué avant que vous réalisiez une action).

Les vues que vous créez en appelant createOrReplaceView sont persistantes. Si vous n’avez plus besoin de cette vue, vous pouvez détruire la vue manuellement.

Travailler avec des fichiers dans une zone de préparation

Cette section explique comment interroger les données d’un fichier dans une zone de préparation Snowflake. Pour les autres opérations sur les fichiers, utilisez des instructions SQL.

Pour interroger des données dans des fichiers dans une zone de préparation Snowflake, utilisez la classe DataFrameReader :

  1. Appelez la méthode read de la classe Session pour accéder à un objet DataFrameReader.

  2. Si les fichiers sont au format CSV, décrivez les champs du fichier. Pour ce faire :

    1. Créez un objet StructType composé d’une séquence d’objets StructField qui décrivent les champs du fichier.

    2. Pour chaque objet StructField, indiquez ce qui suit :

      • Nom du champ.

      • Le type de données du champ (spécifié comme un objet dans le package com.snowflake.snowpark.types).

      • Indique si le champ peut être « Null » ou non.

      Par exemple :

      import com.snowflake.snowpark.types._
      
      val schemaForDataFile = StructType(
          Seq(
              StructField("id", StringType, true),
              StructField("name", StringType, true)))
      
    3. Appelez la méthode schema dans l’objet DataFrameReader, en transmettant l’objet StructType.

      Par exemple :

      var dfReader = session.read.schema(schemaForDataFile)
      

      La méthode schema renvoie un objet DataFrameReader qui est configuré pour lire les fichiers contenant les champs spécifiés.

      Notez que vous n’avez pas besoin de faire cela pour les fichiers dans d’autres formats (tels que JSON). Pour ces fichiers, le DataFrameReader traite les données comme un champ unique de type VARIANT avec le nom de champ $1.

  3. Si vous devez spécifier des informations supplémentaires sur la manière dont les données doivent être lues (par exemple, que les données sont compressées ou qu’un fichier CSV utilise un point-virgule au lieu d’une virgule pour délimiter les champs), appelez la méthode options de l’objet DataFrameReader.

    Saisissez le nom et la valeur de l’option que vous souhaitez définir. Pour connaître les noms et les valeurs des options de format de fichier, consultez la documentation sur CREATE FILE FORMAT.

    Vous pouvez également définir les options de copie décrites dans la documentation COPY INTO TABLE. Notez que la définition des options de copie peut entraîner une stratégie d’exécution plus coûteuse lorsque vous récupérez les données dans le DataFrame.

    L’exemple suivant configure l’objet DataFrameReader pour interroger les données d’un fichier CSV qui n’est pas compressé et qui utilise un point-virgule comme délimiteur de champ.

    dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    

    La méthode options renvoie un objet DataFrameReader qui est configuré avec les options spécifiées.

  4. Appelez la méthode correspondant au format du fichier (par exemple, la méthode csv), en transmettant l’emplacement du fichier.

    val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
    

    Les méthodes correspondant au format d’un fichier renvoient un objet DataFrame qui est configuré pour contenir les données de ce fichier.

  5. Utilisez les méthodes de l’objet DataFrame pour effectuer toute transformation nécessaire sur l’ensemble de données (par exemple, sélection de champs spécifiques, filtrage des lignes, etc.)

    Par exemple, pour extraire l’élément color d’un fichier JSON dans la zone de préparation nommée mystage :

    // Import the sqlExpr function from the functions object.
    import com.snowflake.snowpark.functions._
    
    val df = session.read.json("@mystage").select(sqlExpr("$1:color"))
    

    Comme expliqué précédemment, pour les fichiers dans des formats autres que CSV (par exemple JSON), le DataFrameReader traite les données du fichier comme une seule colonne VARIANT portant le nom $1.

    Cet exemple utilise la fonction sqlExpr dans l’objet com.snowflake.snowpark.functions pour spécifier le chemin d’accès à l’élément color.

    Notez que la fonction sqlExpr n’interprète ni ne modifie l’argument d’entrée. La fonction vous permet simplement de construire des expressions et des snippets dans SQL qui ne sont pas encore pris en charge par l’API Snowpark.

  6. Appelez une méthode d’action pour interroger les données dans le fichier.

    Comme c’est le cas avec les DataFrames pour les tables, les données ne sont pas récupérées dans les DataFrame avant que vous n’appeliez une méthode d’action.

Exécution d’instructions SQL

Pour exécuter une instruction SQL que vous spécifiez, appelez la méthode sql de la classe Session, et transmettez l’instruction à exécuter. Cette méthode renvoie un DataFrame.

Notez que l’instruction SQL ne sera pas exécutée tant que vous n’aurez pas appelé une méthode d’action.

// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
val stageFilesDf = session.sql("ls @myStage").collect()

// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()

val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()

// Set up a SQL statement to copy data from a stage to a table.
val copyDf = session.sql("copy into myTable from @myStage file_format=(type = csv)").collect()

Si vous voulez appeler des méthodes pour transformer les DataFrame (par exemple, filtrer, sélectionner, etc.), notez que ces méthodes ne fonctionnent que si l’instruction SQL sous-jacente est une instruction SELECT. Les méthodes de transformation ne sont pas prises en charge pour les autres types d’instructions SQL.

val df = session.sql("select a, c from table where b < 1")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("c") < 10).select(col("a")).collect()

// In this example, the underlying SQL statement is not a SELECT statement.
val df = session.sql("ls @myStage")
// Calling the filter method results in an error.
df.filter(...)