Utilisation de DataFrames dans Snowpark Scala

Dans Snowpark, le principal moyen par lequel vous interrogez et traitez les données est un DataFrame. Cette rubrique explique comment travailler avec des DataFrames.

Dans ce chapitre :

Pour récupérer et manipuler des données, vous utilisez la classe DataFrame. Un DataFrame représente un ensemble de données relationnelles qui est évalué de façon « lazy » : il ne s’exécute que lorsqu’une action spécifique est déclenchée. En un sens, un DataFrame est comme une requête qui doit être évaluée afin d’extraire des données.

Pour récupérer des données dans un DataFrame :

  1. Construit un DataFrame, en spécifiant la source des données pour l’ensemble de données.

    Par exemple, vous pouvez créer un DataFrame pour contenir les données d’une table, d’un fichier CSV externe ou de l’exécution d’une instruction SQL.

  2. Spécifier comment l’ensemble de données dans le DataFrame doit être transformé.

    Par exemple, vous pouvez spécifier quelles colonnes doivent être sélectionnées, comment les lignes doivent être filtrées, comment les résultats doivent être triés et regroupés, etc.

  3. Exécuter l’instruction pour récupérer les données dans le DataFrame.

    Pour récupérer les données dans le DataFrame, vous devez appeler une méthode qui exécute une action (par exemple, la méthode collect()).

Les sections suivantes expliquent ces étapes plus en détail.

Configuration des exemples pour cette section

Certains des exemples de cette section utilisent un DataFrame pour interroger une table nommée sample_product_data. Si vous voulez exécuter ces exemples, vous pouvez créer cette table et la remplir avec des données en exécutant les instructions SQL suivantes :

CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
INSERT INTO sample_product_data VALUES
    (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
    (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
    (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
    (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
    (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
    (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
    (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
    (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
    (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
    (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
    (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
    (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);
Copy

Pour vérifier que la table a été créée, exécutez :

SELECT * FROM sample_product_data;
Copy

Construire un DataFrame

Pour construire un DataFrame, vous pouvez utiliser les méthodes de la classe Session. Chacune des méthodes suivantes construit un DataFrame à partir d’un type différent de source de données :

  • Pour créer un DataFrame à partir des données d’une table, d’une vue ou d’un flux, appelez la méthode table :

    // Create a DataFrame from the data in the "sample_product_data" table.
    val dfTable = session.table("sample_product_data")
    
    // To print out the first 10 rows, call:
    //   dfTable.show()
    
    Copy

    Note

    La méthode session.table renvoie un objet Updatable. Updatable étend DataFrame et fournit des méthodes supplémentaires pour travailler avec les données de la table (par exemple, des méthodes pour mettre à jour et supprimer des données). Voir Mise à jour, suppression et fusion des lignes d’une table.

  • Pour créer un DataFrame à partir d’une séquence de valeurs, appelez la méthode createDataFrame :

    // Create a DataFrame containing a sequence of values.
    // In the DataFrame, name the columns "i" and "s".
    val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
    
    Copy

    Note

    Les mots réservés par Snowflake ne sont pas valables comme noms de colonnes lors de la construction d’un DataFrame. Pour la liste des mots réservés, voir Mots clés réservés et limités.

  • Pour créer un DataFrame contenant une plage de valeurs, appelez la méthode range :

    // Create a DataFrame from a range
    val dfRange = session.range(1, 10, 2)
    
    Copy
  • Pour créer un DataFrame pour un fichier dans une zone de préparation, appelez read pour obtenir un objet DataFrameReader. Dans l’objet DataFrameReader appelez la méthode correspondant au format des données du fichier :

    // Create a DataFrame from data in a stage.
    val dfJson = session.read.json("@mystage2/data1.json")
    
    Copy
  • Pour créer un DataFrame qui contiendra les résultats d’une requête SQL, appelez la méthode sql :

    // Create a DataFrame from a SQL query
    val dfSql = session.sql("SELECT name from products")
    
    Copy

    Remarque : bien que vous puissiez utiliser cette méthode pour exécuter des instructions SELECT qui récupèrent des données dans des tables et des fichiers en zone de préparation, vous devriez plutôt utiliser les méthodes table et read. Les méthodes table et read peuvent fournir une meilleure coloration syntaxique, une meilleure mise en évidence des erreurs et une meilleure complétion de code intelligente dans les outils de développement.

Spécifier comment l’ensemble de données doit être transformé

Pour spécifier quelles colonnes doivent être sélectionnées et comment les résultats doivent être filtrés, triés, groupés, etc., appelez les méthodes DataFrame qui transforment l’ensemble de données. Pour identifier les colonnes dans ces méthodes, utilisez la fonction col ou une expression qui évalue une colonne. (Voir Spécification des colonnes et des expressions.)

Par exemple :

  • Pour spécifier les lignes à renvoyer, appelez la méthode filter :

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame for the rows with the ID 1
    // in the "sample_product_data" table.
    //
    // This example uses the === operator of the Column object to perform an
    // equality check.
    val df = session.table("sample_product_data").filter(col("id") === 1)
    df.show()
    
    Copy
  • Pour spécifier les colonnes qui doivent être sélectionnées, appelez la méthode select :

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame that contains the id, name, and serial_number
    // columns in te "sample_product_data" table.
    val df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number"))
    df.show()
    
    Copy

Chaque méthode renvoie un nouvel objet DataFrame qui a été transformé. (La méthode n’affecte pas l’objet DataFrame d’origine.) Cela signifie que si vous souhaitez appliquer plusieurs transformations, vous pouvez enchaîner les appels de méthode, en appelant chaque méthode de transformation suivante sur le nouvel objet DataFrame renvoyé par l’appel de méthode précédent.

Notez que ces méthodes de transformation ne récupèrent pas les données de la base de données Snowflake. (Les méthodes d’action décrites dans Exécution d’une action pour évaluer un DataFrame effectuent la récupération des données). Les méthodes de transformation spécifient simplement comment l’instruction SQL doit être construite.

Spécification des colonnes et des expressions

Lorsque vous appelez ces méthodes de transformation, vous pouvez avoir besoin de spécifier des colonnes ou des expressions qui utilisent des colonnes. Par exemple, lorsque vous appelez la méthode select, vous devez spécifier les colonnes qui doivent être sélectionnées.

Pour faire référence à une colonne, créez un objet Colonne en appelant la fonction col dans l’objet com.snowflake.snowpark.functions.

// Import the col function from the functions object.
import com.snowflake.snowpark.functions._

val dfProductInfo = session.table("sample_product_data").select(col("id"), col("name"))
dfProductInfo.show()
Copy

Note

Pour créer un objet Column pour un littéral, voir Utilisation de littéraux comme objets de colonne.

Lorsque vous spécifiez un filtre, une projection, une condition de jointure, etc., vous pouvez utiliser des objets Column dans une expression. Par exemple :

  • Vous pouvez utiliser des objets Column avec la méthode filter pour spécifier une condition de filtrage :

    // Specify the equivalent of "WHERE id = 20"
    // in an SQL SELECT statement.
    df.filter(col("id") === 20)
    
    Copy
    // Specify the equivalent of "WHERE a + b < 10"
    // in an SQL SELECT statement.
    df.filter((col("a") + col("b")) < 10)
    
    Copy
  • Vous pouvez utiliser les objets Column avec la méthode select pour définir un alias :

    // Specify the equivalent of "SELECT b * 10 AS c"
    // in an SQL SELECT statement.
    df.select((col("b") * 10) as "c")
    
    Copy
  • Vous pouvez utiliser les objets Column avec la méthode join pour définir une condition de jointure :

    // Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y"
    // in an SQL SELECT statement.
    dfX.join(dfY, col("a_in_X") === col("b_in_Y"))
    
    Copy

Se référer à des colonnes dans différents DataFrames

Lorsque vous faites référence à des colonnes dans deux objets DataFrame différents qui ont le même nom (par exemple, en joignant les DataFrames sur cette colonne), vous pouvez utiliser la méthode DataFrame.col dans un objet DataFrame pour faire référence à une colonne dans cet objet (par exemple, df1.col("name") et df2.col("name")).

L’exemple suivant montre comment utiliser la méthode DataFrame.col pour faire référence à une colonne dans un DataFrame spécifique. L’exemple joint deux objets DataFrame qui ont tous deux une colonne nommée key. L’exemple utilise la méthode Column.as pour modifier les noms des colonnes dans le DataFrame nouvellement créé.

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))
Copy

Utilisation de la méthode apply pour faire référence à une colonne

Comme alternative à la méthode DataFrame.col, vous pouvez utiliser la méthode DataFrame.apply pour faire référence à une colonne dans un DataFrame spécifique. Comme la méthode DataFrame.col , la méthode DataFrame.apply accepte un nom de colonne en entrée et renvoie un objet Column.

Notez que lorsqu’un objet possède une méthode apply en Scala, vous pouvez appeler la méthode apply en appelant l’objet comme s’il s’agissait d’une fonction. Par exemple, pour appeler df.apply("column_name"), vous pouvez simplement écrire df("column_name"). Les appels suivants sont équivalents :

  • df.col("<nom_colonne>")

  • df.apply("<nom_colonne>")

  • df("<nom_colonne>")

L’exemple suivant est identique à l’exemple précédent mais utilise la méthode DataFrame.apply pour faire référence aux colonnes dans une opération de jointure :

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))
Copy

Utilisation de Shorthand pour un objet de colonne

Au lieu d’utiliser la fonction col, vous pouvez faire référence à une colonne de l’une des manières suivantes :

  • Utilisez un signe dollar devant le nom de la colonne entre guillemets simples ($"column_name").

  • Utilisez une apostrophe (un guillemet simple) devant le nom de la colonne non mise entre guillemets simples ('column_name).

Pour ce faire, importez les noms de l’objet implicits après avoir créé un objet Session :

val session = Session.builder.configFile("/path/to/properties").create

// Import this after you create the session.
import session.implicits._

// Use the $ (dollar sign) shorthand.
val df = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)

// Use ' (apostrophe) shorthand.
val df = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)
Copy

Utilisation de guillemets doubles autour des identificateurs d’objets (noms de tables, noms de colonnes, etc.)

Les noms des bases de données, des schémas, des tables et des zones de préparation que vous spécifiez doivent être conformes aux exigences de l’identificateur Snowflake. Lorsque vous spécifiez un nom, Snowflake considère que le nom est en majuscules. Par exemple, les appels suivants sont équivalents :

// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))
Copy

Si le nom n’est pas conforme aux exigences de l’identificateur, vous devez utiliser des guillemets doubles (") autour du nom. Utilisez une barre oblique inverse (\) pour échapper le caractère de guillemet double dans un littéral de chaîne Scala. Par exemple, le nom de la table suivante ne commence pas par une lettre ou un trait de soulignement, vous devez donc utiliser des guillemets autour du nom :

val df = session.table("\"10tablename\"")
Copy

Notez que lorsque vous spécifiez le nom d’une colonne, vous n’avez pas besoin d’utiliser des guillemets doubles autour du nom. La bibliothèque Snowpark met automatiquement le nom de la colonne entre guillemets doubles pour vous si le nom ne respecte pas les exigences d’identificateur :

// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\""))

// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))
Copy

Si vous avez déjà ajouté des guillemets autour d’un nom de colonne, la bibliothèque n’insère pas de guillemets doubles supplémentaires autour du nom.

Dans certains cas, le nom de la colonne peut contenir des caractères guillemets doubles :

describe table quoted;
+------------------------+ ...
| name                   | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted"   | ...
+------------------------+ ...
Copy

Comme expliqué dans Exigences relatives à l’identificateur, pour chaque caractère entre guillemets doubles dans un identificateur entre guillemets doubles, vous devez utiliser deux caractères de guillemets doubles (par exemple, "name_with_""air""_quotes" et """column_name_quoted""") :

val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()
Copy

N’oubliez pas que lorsqu’un identificateur est délimité par des guillemets doubles (que vous ayez explicitement ajouté des guillemets ou que la bibliothèque les ait ajoutés pour vous), Snowflake traite l’identificateur comme sensible à la casse :

// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))
Copy

Utilisation de littéraux comme objets de colonne

Pour utiliser un littéral dans une méthode qui transmet un objet Column , créez un objet Column pour le littéral en transmettant le littéral à la fonction lit dans l’objet com.snowflake.snowpark.functions. Par exemple :

// Import for the lit and col functions.
import com.snowflake.snowpark.functions._

// Show the first 10 rows in which num_items is greater than 5.
// Use `lit(5)` to create a Column object for the literal 5.
df.filter(col("num_items").gt(lit(5))).show()
Copy

Si le littéral est une valeur à virgule flottante ou double en Scala (par exemple, 0.05 est traité comme un double par défaut), la bibliothèque Snowpark génère du SQL qui convertit implicitement la valeur en type de données Snowpark correspondant (par exemple, 0.05::DOUBLE). Cela peut produire une valeur approximative qui diffère du nombre exact spécifié.

Par exemple, le code suivant n’affiche aucune ligne correspondante, alors que le filtre (qui correspond aux valeurs supérieures ou égales à 0.05) devrait correspondre aux lignes de DataFrame :

// Create a DataFrame that contains the value 0.05.
val df = session.sql("select 0.05 :: Numeric(5, 2) as a")

// Applying this filter results in no matching rows in the DataFrame.
df.filter(col("a") <= lit(0.06) - lit(0.01)).show()
Copy

Le problème est que lit(0.06) et lit(0.01) produisent des valeurs approximatives pour 0.06 et 0.01, et non les valeurs exactes.

Pour éviter ce problème, vous pouvez utiliser l’une des approches suivantes :

  • Option 1 : convertir le littéral dans le type Snowpark que vous voulez utiliser. Par exemple, pour utiliser un NUMBER avec une précision de 5 et une échelle de 2 :

    df.filter(col("a") <= lit(0.06).cast(new DecimalType(5, 2)) - lit(0.01).cast(new DecimalType(5, 2))).show()
    
    Copy
  • Option 2 : convertir la valeur dans le type que vous souhaitez utiliser avant de la transmettre à la fonction lit. Par exemple, si vous voulez utiliser le type BigDecimal :

    df.filter(col("a") <= lit(BigDecimal(0.06)) - lit(BigDecimal(0.01))).show()
    
    Copy

Conversion d’un objet de colonne en un type spécifique

Pour convertir un objet Column en un type spécifique, appelez la méthode Column.cast et transmettez un objet de type à partir du paquet com.snowflake.snowpark.types package. Par exemple, pour convertir un littéral en NUMBER avec une précision de 5 et une échelle de 2 :

// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._

val decimalValue = lit(0.05).cast(new DecimalType(5,2))
Copy

Chaîner les appels de méthode

Parce que chaque méthode qui transforme un objet DataFrame renvoie un nouvel objet DataFrame auquel la transformation a été appliquée, vous pouvez chaîner les appels de méthode pour produire un nouvel objet DataFrame qui est transformé de manière supplémentaire.

L’exemple suivant renvoie un DataFrame qui est configuré pour :

  • Interroger la table sample_product_data.

  • Retourner la ligne avec id = 1.

  • Sélectionner les colonnes name et serial_number.

val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
Copy

Dans cet exemple :

  • session.table("sample_product_data") retourne un DataFrame pour la table sample_product_data.

    Bien que le DataFrame ne contienne pas encore les données de la table, l’objet contient les définitions des colonnes de la table.

  • filter(col("id") === 1) retourne un DataFrame pour la table sample_product_data qui est configurée pour retourner la ligne avec id = 1.

    Notez à nouveau que le DataFrame ne contient pas encore la ligne correspondante de la table. La ligne correspondante n’est pas récupérée avant que vous appeliez une méthode d’action.

  • select(col("name"), col("serial_number")) renvoie un DataFrame qui contient les colonnes name et serial_number de la ligne de la table sample_product_data qui a id = 1.

Lorsque vous enchaînez des appels de méthodes, n’oubliez pas que l’ordre des appels est important. Chaque appel de méthode renvoie un DataFrame qui a été transformé. Assurez-vous que les appels ultérieurs fonctionnent avec le DataFrame transformé.

Par exemple, dans le code ci-dessous, la méthode select renvoie un DataFrame qui ne contient que deux colonnes : name et serial_number. L’appel de la méthode filter sur ce DataFrame échoue, car il utilise la colonne id, qui ne se trouve pas dans le DataFrame transformé.

// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") === 1)
Copy

En revanche, le code suivant s’exécute avec succès car la méthode filter() est appelée sur un DataFrame qui contient toutes les colonnes de la table sample_product_data (y compris la colonne id) :

// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
Copy

N’oubliez pas que vous devrez peut-être effectuer les appels de méthode select et filter dans un ordre différent de celui dans lequel vous utiliseriez les mots-clés équivalents (SELECT et WHERE) dans une instruction SQL.

Limitation du nombre de lignes dans un DataFrame

Pour limiter le nombre de lignes dans un DataFrame, vous pouvez utiliser la méthode de transformation DataFrame.limit.

L’API de Snowpark fournit également des méthodes d’action pour récupérer et imprimer un nombre limité de lignes :

  • la méthode d’action DataFrame.first (pour exécuter la requête et renvoyer les n premières lignes)

  • la méthode d’action DataFrame.show (pour exécuter la requête et imprimer les n premières lignes)

Ces méthodes ajoutent effectivement une clause LIMIT à l’instruction SQL qui est exécutée.

Comme expliqué dans les notes sur l’utilisation de LIMIT, les résultats sont non-déterministes à moins que vous ne spécifiiez un ordre de tri (ORDER BY) en conjonction avec LIMIT.

Pour conserver la clause ORDER BY avec la clause LIMIT (par exemple, pour que ORDER BY ne soit pas dans une sous-requête séparée), vous devez appeler la méthode qui limite les résultats sur le DataFrame retourné par la méthode sort.

Par exemple, si vous enchaînez des appels de méthode :

// Limit the number of rows to 5, sorted by parent_id.
var dfSubset = df.sort(col("parent_id")).limit(5);

// Return the first 5 rows, sorted by parent_id.
var arrayOfRows = df.sort(col("parent_id")).first(5)

// Print the first 5 rows, sorted by parent_id.
df.sort(col("parent_id")).show(5)
Copy

Récupération des définitions de colonnes

Pour récupérer la définition des colonnes dans le jeu de données pour le DataFrame, appelez la méthode schema. Cette méthode renvoie un objet StructType qui contient un Array d’objets StructField. Chaque objet StructField contient la définition d’une colonne.

// Get the StructType object that describes the columns in the
// underlying rowset.
val tableSchema = session.table("sample_product_data").schema
println("Schema for sample_product_data: " + tableSchema);
Copy

Dans l’objet StructType retourné, les noms de colonnes sont toujours normalisés. Les identificateurs non mis entre guillemets simples sont renvoyés en majuscules, et les identificateurs mis entre guillemets simples sont renvoyés dans la casse exacte dans laquelle ils ont été définis.

L’exemple suivant crée un DataFrame contenant les colonnes nommées ID et 3rd. Pour le nom de colonne 3rd, la bibliothèque Snowpark place automatiquement le nom entre guillemets doubles ("3rd"), car le nom ne répond pas aux exigences d’un identificateur.

L’exemple appelle la méthode schema, puis la méthode names sur l’objet StructType renvoyé pour obtenir un ArraySeq de noms de colonnes. Les noms sont normalisés dans les StructType retournés par la méthode schema.

// Create a DataFrame containing the "id" and "3rd" columns.
val dfSelectedColumns = session.table("sample_product_data").select(col("id"), col("3rd"))
// Print out the names of the columns in the schema. This prints out:
//   ArraySeq(ID, "3rd")
println(dfSelectedColumns.schema.names.toSeq)
Copy

Joindre des DataFrames

Pour joindre des objets DataFrame, il faut appeler la méthode DataFrame.join.

Les sections suivantes expliquent comment utiliser des DataFrames pour effectuer une jointure :

Configuration des données d’exemple pour les jointures

Les exemples des sections suivantes utilisent des données d’exemple que vous pouvez configurer en exécutant les instructions SQL suivantes :

create or replace table sample_a (
  id_a integer,
  name_a varchar,
  value integer
);
insert into sample_a (id_a, name_a, value) values
  (10, 'A1', 5),
  (40, 'A2', 10),
  (80, 'A3', 15),
  (90, 'A4', 20)
;
create or replace table sample_b (
  id_b integer,
  name_b varchar,
  id_a integer,
  value integer
);
insert into sample_b (id_b, name_b, id_a, value) values
  (4000, 'B1', 40, 10),
  (4001, 'B2', 10, 5),
  (9000, 'B3', 80, 15),
  (9099, 'B4', null, 200)
;
create or replace table sample_c (
  id_c integer,
  name_c varchar,
  id_a integer,
  id_b integer
);
insert into sample_c (id_c, name_c, id_a, id_b) values
  (1012, 'C1', 10, null),
  (1040, 'C2', 40, 4000),
  (1041, 'C3', 40, 4001)
;
Copy

Spécifier les colonnes pour la jointure

Avec la méthode DataFrame.join, vous pouvez spécifier les colonnes à utiliser de l’une des manières suivantes :

  • Spécifiez une expression de colonne qui décrit la condition de jointure.

  • Spécifiez une ou plusieurs colonnes qui doivent être utilisées comme colonnes communes dans la jointure.

L’exemple suivant effectue une jointure interne sur la colonne nommée id_a :

// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
Copy

Notez que l’exemple utilise la méthode DataFrame.col pour spécifier la condition à utiliser dans la jointure. Voir Spécification des colonnes et des expressions pour en savoir plus sur cette méthode.

Ceci imprime la sortie suivante :

----------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |
----------------------------------------------------------------------
|10      |A1        |5        |4001    |B2        |10      |5        |
|40      |A2        |10       |4000    |B1        |40      |10       |
|80      |A3        |15       |9000    |B3        |80      |15       |
----------------------------------------------------------------------
Copy
Noms de colonnes identiques dupliqués dans le résultat de la jointure

Dans le DataFrame résultant d’une jointure, la bibliothèque Snowpark utilise les noms de colonnes trouvés dans les tables qui ont été jointes, même si les noms de colonnes sont identiques à travers les tables. Lorsque cela se produit, les noms de ces colonnes sont dupliqués dans le DataFrame résultant de la jointure. Pour accéder à une colonne dupliquée par son nom, appelez la méthode col sur le DataFrame représentant la table d’origine de la colonne. (Pour plus d’informations sur la spécification des colonnes, voir Se référer à des colonnes dans différents DataFrames).

Dans l’exemple suivant, le code joint deux DataFrames, puis appelle la méthode select sur les DataFrame joints. Il spécifie les colonnes à sélectionner en appelant la méthode col à partir de la variable représentant les objets DataFrame respectifs : dfRhs et dfLhs. Il utilise la méthode as pour donner aux colonnes de nouveaux noms dans le DataFrame que la méthode select crée.

val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
val dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"))
dfSelected.show()
Copy

Ceci imprime la sortie suivante :

------------------------------
|"LEFTVALUE"  |"RIGHTVALUE"  |
------------------------------
|5            |5             |
|10           |10            |
|15           |15            |
------------------------------
Copy
Dédupliquer les colonnes avant de les enregistrer ou de les mettre en cache

Notez que lorsqu’un DataFrame résultant d’une jointure comprend des noms de colonnes dupliqués, vous devez dédupliquer ou renommer les colonnes pour supprimer la duplication dans le DataFrame avant d’enregistrer le résultat dans une table ou de mettre en cache le DataFrame. Pour les noms de colonnes dupliqués dans un DataFrame que vous enregistrez dans une table ou dans le cache, la bibliothèque Snowpark remplacera les noms de colonnes dupliqués par des alias afin qu’ils ne soient plus dupliqués.

L’exemple suivant illustre comment la sortie d’un DataFrame mis en cache peut apparaître si les noms de colonnes ID_A et VALUE ont été dupliqués dans une jointure de deux tables, puis n’ont pas été dédupliqués ou renommés avant la mise en cache du résultat.

--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A"  |"NAME_A"  |"l_ZSz7_VALUE"  |"ID_B"  |"NAME_B"  |"r_heec_ID_A"  |"r_heec_VALUE"  |
--------------------------------------------------------------------------------------------------
|10             |A1        |5               |4001    |B2        |10             |5               |
|40             |A2        |10              |4000    |B1        |40             |10              |
|80             |A3        |15              |9000    |B3        |80             |15              |
--------------------------------------------------------------------------------------------------
Copy

Exécution d’une jointure naturelle

Pour effectuer une jointure naturelle (où des DataFrames sont joints sur des colonnes qui ont le même nom), appelez la méthode DataFrame.naturalJoin.

L’exemple suivant joint les DataFrames des tables sample_a et sample_b sur leurs colonnes communes (la colonne id_a) :

val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
Copy

Ceci imprime la sortie suivante :

---------------------------------------------------
|"ID_A"  |"VALUE"  |"NAME_A"  |"ID_B"  |"NAME_B"  |
---------------------------------------------------
|10      |5        |A1        |4001    |B2        |
|40      |10       |A2        |4000    |B1        |
|80      |15       |A3        |9000    |B3        |
---------------------------------------------------
Copy

Spécification du type de jointure

Par défaut, la méthode DataFrame.join crée une jointure interne. Pour spécifier un autre type de jointure, donnez à l’argument joinType l’une des valeurs suivantes :

Type de jointure

joinType

Jointure intérieure

inner (par défaut)

Jonction extérieure gauche

left

Jonction extérieure droite

right

Jonction extérieure complète

full

Jointure croisée

cross

Par exemple :

// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"), "left")
dfLeftOuterJoin.show()
Copy

Ceci imprime la sortie suivante :

----------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |
----------------------------------------------------------------------
|40      |A2        |10       |4000    |B1        |40      |10       |
|10      |A1        |5        |4001    |B2        |10      |5        |
|80      |A3        |15       |9000    |B3        |80      |15       |
|90      |A4        |20       |NULL    |NULL      |NULL    |NULL     |
----------------------------------------------------------------------
Copy

Joindre plusieurs tables

Pour joindre plusieurs tables :

  1. Créez un DataFrame pour chaque table.

  2. Appelez la méthode DataFrame.join sur le premier DataFrame, en transférant le second DataFrame.

  3. En utilisant le DataFrame renvoyé par la méthode join, appelez la méthode join en transmettant le troisième DataFrame.

Vous pouvez chaîner les appels join comme indiqué ci-dessous :

val dfFirst = session.table("sample_a")
val dfSecond  = session.table("sample_b")
val dfThird = session.table("sample_c")
val dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a") === dfSecond.col("id_a")).join(dfThird, dfFirst.col("id_a") === dfThird.col("id_a"))
dfJoinThreeTables.show()
Copy

Ceci imprime la sortie suivante :

------------------------------------------------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |"ID_C"  |"NAME_C"  |"ID_A"  |"ID_B"  |
------------------------------------------------------------------------------------------------------------
|10      |A1        |5        |4001    |B2        |10      |5        |1012    |C1        |10      |NULL    |
|40      |A2        |10       |4000    |B1        |40      |10       |1040    |C2        |40      |4000    |
|40      |A2        |10       |4000    |B1        |40      |10       |1041    |C3        |40      |4001    |
------------------------------------------------------------------------------------------------------------
Copy

Exécution d’une auto-jointure

Si vous devez joindre une table avec elle-même sur différentes colonnes, vous ne pouvez pas effectuer l’auto-jointure avec un seul DataFrame. Les exemples suivants qui utilisent un seul DataFrame pour effectuer une auto-jointure échouent parce que les expressions de colonne pour "id" sont présentes dans les côtés gauche et droit de la jointure :

// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, col("id") === col("parent_id"))
Copy
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, df("id") === df("parent_id"))
Copy

Ces deux exemples échouent avec l’exception suivante :

Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
  Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
  Instead, join this DataFrame to a clone() of itself.
Copy

Au lieu de cela, utilisez la méthode DataFrame.clone pour créer un clone de l’objet DataFrame, et utilisez les deux objets DataFrame pour effectuer la jointure :

// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
val dfLhs = session.table("sample_product_data")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()

// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))
dfJoined.show()
Copy

Si vous souhaitez effectuer une auto-jointure sur la même colonne, appelez la méthode join qui transmet une Seq d’expressions de colonnes pour la clause USING :

// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
val df = session.table("sample_product_data");
val dfJoined = df.join(df, Seq("key"))
Copy

Exécution d’une action pour évaluer un DataFrame

Comme nous l’avons mentionné précédemment, le DataFrame est évalué de façon « lazy », ce qui signifie que l’instruction SQL n’est pas envoyée au serveur pour être exécutée tant que vous n’avez pas effectué une action. Une action provoque l’évaluation du DataFrame et envoie l’instruction SQL correspondante au serveur pour exécution.

Les sections suivantes expliquent comment effectuer une action de manière synchrone et asynchrone sur un DataFrame :

Exécution d’une action de manière synchrone

Pour exécuter une action de manière synchrone, appelez l’une des méthodes d’action suivantes :

Méthode pour exécuter une action de manière synchrone

Description

DataFrame.collect

Évalue les DataFrame et renvoie le jeu de données résultant sous la forme d’une Array d’objets Row. Voir Renvoi de toutes les lignes.

DataFrame.toLocalIterator

Évalue le DataFrame et renvoie un itérateur d’objets Row. Si le jeu de résultats est important, utilisez cette méthode pour éviter de charger tous les résultats en mémoire en même temps. Voir Renvoie d’un itérateur pour les lignes.

DataFrame.count

Évalue le DataFrame et retourne le nombre de lignes.

DataFrame.show

Évalue le DataFrame et imprime les lignes dans la console. Notez que cette méthode limite le nombre de lignes à 10 (par défaut). Voir Impression des lignes dans un DataFrame.

DataFrame.cacheResult

Exécute la requête, crée une table temporaire et place les résultats dans la table. La méthode renvoie un objet HasCachedResult que vous pouvez utiliser pour accéder aux données de cette table temporaire. Voir Mise en cache d’un DataFrame.

DataFrame.write.saveAsTable

Sauvegarde les données du DataFrame dans la table spécifiée. Voir Sauvegarde des données dans une table.

DataFrame.write.(csv |json| parquet)

Enregistre un DataFrame dans un fichier spécifié dans une zone de préparation. Voir Sauvegarde d’un DataFrame dans des fichiers sur une zone de préparation.

DataFrame.read.fileformat.copyInto('tableName')

Copie les données du DataFrame dans la table spécifiée. Voir Copier les données d’un fichier dans une table.

Session.table('tableName').delete

Supprime les lignes de la table spécifiée. Voir Mise à jour, suppression et fusion des lignes d’une table.

Session.table('tableName').update

Met à jour les lignes de la table spécifiée. Voir Mise à jour, suppression et fusion des lignes d’une table.

Session.table('tableName').merge.methods.collect

Fusionne les lignes dans la table spécifiée. Voir Mise à jour, suppression et fusion des lignes d’une table.

Pour exécuter la requête et renvoyer le nombre de résultats, appelez la méthode count :

// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")

// Send the query to the server for execution and
// print the count of rows in the table.
println("Rows returned: " + dfProducts.count())
Copy

Vous pouvez également appeler des méthodes d’action pour :

Remarque : si vous appelez la méthode schema pour obtenir les définitions des colonnes dans DataFrame, vous n’avez pas besoin d’appeler une méthode d’action.

Exécution d’une action de manière asynchrone

Note

Cette fonctionnalité a été introduite dans Snowpark 0.11.0.

Pour effectuer une action de manière asynchrone, appelez la méthode async pour retourner un objet « acteur asynchrone » (par exemple DataFrameAsyncActor), et appelez une méthode d’action asynchrone dans cet objet.

Ces méthodes d’action d’un objet acteur asynchrone renvoient un objet TypedAsyncJob que vous pouvez utiliser pour vérifier l’état de l’action asynchrone et récupérer les résultats de l’action.

Les sections suivantes expliquent comment effectuer des actions de manière asynchrone et vérifier les résultats.

Comprendre le déroulement de base des actions asynchrones

Vous pouvez utiliser les méthodes suivantes pour exécuter une action de manière asynchrone :

Méthode pour exécuter une action de manière asynchrone

Description

DataFrame.async.collect

Évalue de manière asynchrone le DataFrame pour récupérer le jeu de données résultant sous la forme d’un Array d’objets Row. Voir Renvoi de toutes les lignes.

DataFrame.async.toLocalIterator

Évalue de manière asynchrone le DataFrame pour récupérer un itérateur d’objets Row. Si le jeu de résultats est important, utilisez cette méthode pour éviter de charger tous les résultats en mémoire en même temps. Voir Renvoie d’un itérateur pour les lignes.

DataFrame.async.count

Évalue de manière asynchrone le DataFrame pour récupérer le nombre de lignes.

DataFrame.write.async.saveAsTable

Sauvegarde de manière asynchrone les données du DataFrame dans la table spécifiée. Voir Sauvegarde des données dans une table.

DataFrame.write.async.(csv |json| parquet)

Enregistre un DataFrame dans un fichier spécifié dans une zone de préparation. Voir Sauvegarde d’un DataFrame dans des fichiers sur une zone de préparation.

DataFrame.read.fileformat.async.copyInto('tableName')

Copie de manière asynchrone les données du DataFrame dans la table spécifiée. Voir Copier les données d’un fichier dans une table.

Session.table('tableName').async.delete

Supprime de manière asynchrone les lignes de la table spécifiée. Voir Mise à jour, suppression et fusion des lignes d’une table.

Session.table('tableName').async.update

Met à jour de manière asynchrone les lignes de la table spécifiée. Voir Mise à jour, suppression et fusion des lignes d’une table.

Session.table('tableName').merge.methods.async.collect

Fusionne de manière asynchrone les lignes dans la table spécifiée. Voir Mise à jour, suppression et fusion des lignes d’une table.

À partir de l’objet TypedAsyncJob renvoyé, vous pouvez effectuer les opérations suivantes :

  • Pour déterminer si l’action est terminée, appelez la méthode isDone.

  • Pour obtenir l’ID de requête qui correspond à l’action, appelez la méthode getQueryId.

  • Pour renvoyer les résultats de l’action (par exemple, le Array des objets Row pour la méthode collect ou le nombre de lignes pour la méthode count ), appelez la méthode getResult.

    Notez que getResult est un appel bloquant.

  • Pour annuler l’action, appelez la méthode cancel.

Par exemple, pour exécuter une requête de manière asynchrone et récupérer les résultats sous la forme d’un Array d’objets Row, appelez DataFrame.async.collect :

// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
val df = session.table("sample_product_data").select(col("id"), col("name"))

// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.collect()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
val results = asyncJob.getResult()
results.foreach(println)
Copy

Pour exécuter la requête de manière asynchrone et récupérer le nombre de résultats, appelez DataFrame.async.count :

// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")

// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.count()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Print the count of rows in the table.
// Note that getResult is a blocking call.
println("Rows returned: " + asyncJob.getResult())
Copy

Spécification du nombre maximum de secondes à attendre

Lorsque vous appelez la méthode getResult vous pouvez utiliser l’argument maxWaitTimeInSeconds pour spécifier le nombre maximal de secondes à attendre que la requête soit terminée avant de tenter de récupérer les résultats. Par exemple :

// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
val results = asyncJob.getResult(10)
Copy

Si vous omettez cet argument, la méthode attend le nombre maximum de secondes spécifié par la propriété de configuration snowpark_request_timeout_in_seconds. (Il s’agit d’une propriété que vous pouvez définir lors de la création de l’objet session).

Accès à une requête asynchrone par ID

Si vous avez l’ID d’une requête asynchrone que vous avez soumise précédemment, vous pouvez appeler la méthode Session.createAsyncJob pour créer un objet AsyncJob que vous pouvez utiliser pour vérifier le statut de la requête, récupérer les résultats de la requête ou annuler la requête.

Notez que contrairement à TypedAsyncJob, AsyncJob ne fournit pas de méthode getResult pour récupérer les résultats. Si vous devez récupérer les résultats, appelez la méthode getRows ou getIterator à la place.

Par exemple :

val asyncJob = session.createAsyncJob(myQueryId)
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
val rows = asyncJob.getRows()
rows.foreach(println)
Copy

Récupération de lignes dans un DataFrame

Après avoir spécifié comment le DataFrame doit être transformé, vous pouvez appeler une méthode d’action pour exécuter une requête et renvoyer les résultats. Vous pouvez renvoyer toutes les lignes dans un Array, ou renvoyer un itérateur qui vous permet d’effectuer une itération sur les résultats, ligne par ligne. Dans ce dernier cas, si la quantité de données est importante, les lignes sont chargées en mémoire par morceaux pour éviter de trop la charger.

Renvoi de toutes les lignes

Pour renvoyer toutes les lignes en une seule fois, appelez la méthode DataFrame.collect. Cette méthode renvoie un tableau d’objets Row. Pour récupérer les valeurs de la ligne, appelez la méthode getType (par exemple getString, getInt, etc.).

Par exemple :

import com.snowflake.snowpark.functions_

val rows = session.table("sample_product_data").select(col("name"), col("category_id")).sort(col("name")).collect()
for (row <- rows) {
  println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
Copy

Renvoie d’un itérateur pour les lignes

Si vous voulez utiliser un itérateur pour effectuer une itération sur les objets Row dans les résultats, appelez DataFrame.toLocalIterator. Si la quantité de données dans les résultats est importante, la méthode charge les lignes par morceaux pour éviter de charger toutes les lignes en mémoire en même temps.

Par exemple :

import com.snowflake.snowpark.functions_

while (rowIterator.hasNext) {
  val row = rowIterator.next()
  println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
Copy

Retourner les n premières lignes

Pour renvoyer les n premières lignes, appelez la méthode DataFrame.first, en indiquant le nombre de lignes à renvoyer.

Comme expliqué dans Limitation du nombre de lignes dans un DataFrame, les résultats sont non déterministes. Si vous voulez que les résultats soient déterministes, appelez cette méthode sur un DataFrame trié (df.sort().first()).

Par exemple :

import com.snowflake.snowpark.functions_

val df = session.table("sample_product_data")
val rows = df.sort(col("name")).first(5)
rows.foreach(println)
Copy

Impression des lignes dans un DataFrame

Pour afficher les 10 premières lignes dans le DataFrame dans la console, appelez la méthode DataFrame.show. Pour afficher un nombre différent de lignes, indiquez le nombre de lignes à imprimer.

Comme expliqué dans Limitation du nombre de lignes dans un DataFrame, les résultats sont non déterministes. Si vous voulez que les résultats soient déterministes, appelez cette méthode sur un DataFrame trié (df.sort().show()).

Par exemple :

import com.snowflake.snowpark.functions_

val df = session.table("sample_product_data")
df.sort(col("name")).show()
Copy

Mise à jour, suppression et fusion des lignes d’une table

Note

Cette fonctionnalité a été introduite dans Snowpark 0.7.0.

Lorsque vous appelez Session.table pour créer un objet DataFrame pour une table, la méthode renvoie un objet Updatable qui étend DataFrame avec des méthodes supplémentaires pour mettre à jour et supprimer des données dans la table. (Voir Updatable.)

Si vous devez mettre à jour ou supprimer des lignes dans une table, vous pouvez utiliser les méthodes suivantes de la classe Updatable :

Mise à jour des lignes dans une table

Pour la méthode update , on transmet un Map qui associe les colonnes à mettre à jour et les valeurs correspondantes à affecter à ces colonnes. update renvoie un objet UpdateResult qui contient le nombre de lignes mises à jour. (Voir UpdateResult.)

Note

update est une méthode d’action, ce qui signifie que l’appel de la méthode envoie des instructions SQL au serveur pour exécution.

Par exemple, pour remplacer les valeurs de la colonne nommée count par la valeur 1 :

val updatableDf = session.table("sample_product_data")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")
Copy

L’exemple ci-dessus utilise le nom de la colonne pour identifier la colonne. Vous pouvez également utiliser une expression de colonne :

val updateResult = updatableDf.update(Map(col("count") -> lit(1)))
Copy

Si la mise à jour ne doit être effectuée que lorsqu’une condition est remplie, vous pouvez spécifier cette condition comme argument. Par exemple, pour remplacer les valeurs de la colonne nommée count pour les lignes dans lesquelles la colonne category_id a la valeur 20 :

val updateResult = updatableDf.update(Map(col("count") -> lit(1)), col("category_id") === 20)
Copy

Si vous devez baser la condition sur une jointure avec un objet DataFrame différent, vous pouvez transmettre ce DataFrame en tant qu’argument et utiliser ce DataFrame dans la condition. Par exemple, pour remplacer les valeurs de la colonne nommée count pour les lignes dans lesquelles la colonne category_id correspond à category_id dans les DataFrame dfParts :

val updatableDf = session.table("sample_product_data")
val dfParts = session.table("parts")
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), updatableDf("category_id") === dfParts("category_id"), dfParts)
Copy

Suppression de lignes dans une table

Pour la méthode delete vous pouvez spécifier une condition qui identifie les lignes à supprimer, et vous pouvez baser cette condition sur une jointure avec un autre DataFrame. delete renvoie un objet DeleteResult qui contient le nombre de lignes supprimées. (Voir DeleteResult.)

Note

delete est une méthode d’action, ce qui signifie que l’appel de la méthode envoie des instructions SQL au serveur pour exécution.

Par exemple, pour supprimer les lignes qui ont la valeur 1 dans la colonne category_id :

val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === 1)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
Copy

Si la condition fait référence à des colonnes dans un DataFrame différent, transmettez ce DataFrame comme deuxième argument. Par exemple, pour supprimer les lignes dans lesquelles la colonne category_id correspond à category_id dans DataFrame dfParts, transmettez dfParts comme second argument :

val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === dfParts("category_id"), dfParts)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
Copy

Fusion de lignes dans une table

Pour insérer, mettre à jour et supprimer des lignes dans une table en fonction des valeurs d’une deuxième table ou d’une sous-requête (l’équivalent de la commande MERGE en SQL), procédez comme suit :

  1. Dans l’objet Updatable de la table dans laquelle vous voulez fusionner les données, appelez la méthode merge en transmettant l’objet DataFrame de l’autre table et l’expression de la colonne pour la condition de jointure.

    Le résultat est un objet MergeBuilder que vous pouvez utiliser pour spécifier les actions à entreprendre (par exemple, insérer, mettre à jour ou supprimer) sur les lignes qui correspondent et celles qui ne correspondent pas. Voir (MergeBuilder.)

  2. En utilisant l’objet MergeBuilder :

    • Pour spécifier la mise à jour ou la suppression qui doit être effectuée sur les lignes correspondantes, appelez la méthode whenMatched.

      Si vous devez spécifier une condition supplémentaire pour que les lignes soient mises à jour ou supprimées, vous pouvez transmettre une expression de colonne pour cette condition.

      Cette méthode renvoie un objet MatchedClauseBuilder que vous pouvez utiliser pour spécifier l’action à exécuter. (Voir MatchedClauseBuilder.)

      Appelez la méthode update ou delete dans l’objet MatchedClauseBuilder pour spécifier l’action de mise à jour ou de suppression qui doit être exécutée sur les lignes correspondantes. Ces méthodes renvoient un objet MergeBuilder que vous pouvez utiliser pour spécifier des clauses supplémentaires.

    • Pour spécifier l’insertion qui doit être effectuée lorsque des lignes ne correspondent pas, appelez la méthode whenNotMatched.

      Si vous devez spécifier une condition supplémentaire pour l’insertion de lignes, vous pouvez transmettre une expression de colonne pour cette condition.

      Cette méthode renvoie un objet NotMatchedClauseBuilder que vous pouvez utiliser pour spécifier l’action à exécuter. (Voir NotMatchedClauseBuilder.)

      Appelez la méthode insert dans l’objet NotMatchedClauseBuilder pour spécifier l’action d’insertion qui doit être exécutée lorsque les lignes ne correspondent pas. Ces méthodes renvoient un objet MergeBuilder que vous pouvez utiliser pour spécifier des clauses supplémentaires.

  3. Lorsque vous avez fini de spécifier les insertions, mises à jour et suppressions à effectuer, appelez la méthode collect de l’objet MergeBuilder pour effectuer les insertions, mises à jour et suppressions spécifiées sur la table.

    collect renvoie un objet MergeResult qui contient le nombre de lignes qui ont été insérées, mises à jour et supprimées. (Voir MergeResult.)

L’exemple suivant insère une ligne avec les colonnes id et value de la table source dans la table target si la table target ne contient pas de ligne avec un ID correspondant :

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenNotMatched.insert(Seq(source("id"), source("value")))
                      .collect()
Copy

L’exemple suivant met à jour une ligne de la table target avec la valeur de la colonne value de la ligne de la table source qui a le même ID :

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenMatched.update(Map("value" -> source("value")))
                      .collect()
Copy

Sauvegarde des données dans une table

Vous pouvez enregistrer le contenu d’un DataFrame dans une table nouvelle ou une table existante. Pour ce faire, vous devez disposer des privilèges suivants :

  • Privilèges CREATE TABLE sur le schéma, si la table n’existe pas.

  • Privilèges INSERT sur la table.

Pour enregistrer le contenu d’un DataFrame dans une table :

  1. Appelez la méthode DataFrame.write pour obtenir un objet DataFrameWriter.

  2. Appelez la méthode DataFrameWriter.mode en lui transmettant un objet SaveMode qui spécifie vos préférences pour l’écriture dans la table :

    • Pour insérer des lignes, effectuez le transfert dans SaveMode.Append.

    • Pour écraser la table existante, effectuez le transfert dans SaveMode.Overwrite.

    Cette méthode renvoie le même objet DataFrameWriter configuré avec le mode spécifié.

  3. Si vous insérez des lignes dans une table existante (SaveMode.Append) et que les noms de colonne dans les DataFrame correspondent aux noms de colonne dans la table, appelez la méthode DataFrameWriter.option en transmettant "columnOrder" et "name" comme arguments.

    Note

    Cette méthode a été introduite dans Snowpark 1.4.0.

    Par défaut, l’option columnOrder est définie sur "index", ce qui signifie que l’option DataFrameWriter insère les valeurs dans l’ordre d’apparition des colonnes. Par exemple, la DataFrameWriter insère la valeur de la première colonne de la DataFrame dans la première colonne de la table, la deuxième colonne de la DataFrame dans la deuxième colonne de la table, etc.

    Cette méthode renvoie le même objet DataFrameWriter configuré avec l’option spécifiée.

  4. Appelez DataFrameWriter.saveAsTable pour enregistrer le contenu du DataFrame dans une table spécifiée.

    Vous n’avez pas besoin d’appeler une méthode distincte (par exemple collect) pour exécuter l’instruction SQL qui enregistre les données dans la table. saveAsTable est une méthode d’action qui exécute l’instruction SQL.

L’exemple suivant écrase une table existante (identifiée par la variable tableName) avec le contenu de la variable DataFrame df :

df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
Copy

L’exemple suivant insère des lignes de la variable DataFrame df dans une table existante (identifiée par la variable tableName). Dans cet exemple, la table et le DataFrame contiennent tous deux les colonnes c1 et c2.

L’exemple montre la différence entre la définition de l’option columnOrder sur "name" (qui insère des valeurs dans les colonnes de la table portant les mêmes noms que les colonnes DataFrame) et l’utilisation de l’option columnOrder par défaut (qui insère des valeurs dans les colonnes de la table en fonction de l’ordre des colonnes dans DataFrame).

val df = session.sql("SELECT 1 AS c2, 2 as c1")
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write.mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName)
// With the default value of the columnOrder option ("index"), the DataFrameWriter the uses column positions
// and inserts a row with the values (1, 2).
df.write.mode(SaveMode.Append).saveAsTable(tableName)
Copy

Création d’une vue à partir d’un DataFrame

Pour créer une vue à partir d’un DataFrame, appelez la méthode DataFrame.createOrReplaceView :

df.createOrReplaceView("db.schema.viewName")
Copy

Notez que l’appel à createOrReplaceView crée immédiatement la nouvelle vue. Plus important encore, il ne provoque pas l’évaluation du DataFrame. (Le DataFrame lui-même n’est pas évalué avant que vous réalisiez une action).

Les vues que vous créez en appelant createOrReplaceView sont persistantes. Si vous n’avez plus besoin de cette vue, vous pouvez détruire la vue manuellement.

Si vous devez créer une vue temporaire uniquement pour la session, appelez la méthode DataFrame.createOrReplaceTempView à la place :

df.createOrReplaceTempView("db.schema.viewName")
Copy

Mise en cache d’un DataFrame

Dans certains cas, vous pouvez avoir besoin d’exécuter une requête complexe et de conserver les résultats pour les utiliser dans des opérations ultérieures (plutôt que d’exécuter à nouveau la même requête). Dans ces cas, vous pouvez mettre en cache le contenu d’un DataFrame en appelant la méthode DataFrame.cacheResult.

Cette méthode :

  • Exécute la requête.

    Il n’est pas nécessaire d’appeler une méthode d’action distincte pour récupérer les résultats avant d’appeler cacheResult. cacheResult est une méthode d’action qui exécute la requête.

  • Enregistre les résultats dans un tableau temporaire

    Comme cacheResult crée une table temporaire, vous devez disposer du privilège CREATE TABLE sur le schéma utilisé.

  • Renvoie un objet HasCachedResult qui donne accès aux résultats dans la table temporaire.

    Puisque HasCachedResult étend DataFrame, vous pouvez effectuer certaines des mêmes opérations sur ces données mises en cache que celles que vous pouvez effectuer sur un DataFrame.

Note

Comme cacheResult exécute la requête et enregistre les résultats dans une table, cette méthode peut entraîner une augmentation des coûts de calcul et de stockage.

Par exemple :

import com.snowflake.snowpark.functions_

// Set up a DataFrame to query a table.
val df = session.table("sample_product_data").filter(col("category_id") > 10)
// Retrieve the results and cache the data.
val cachedDf = df.cacheResult()
// Create a DataFrame containing a subset of the cached data.
val dfSubset = cachedDf.filter(col("category_id") === lit(20)).select(col("name"), col("category_id"))
dfSubset.show()
Copy

Notez que le DataFrame d’origine n’est pas affecté lorsque vous appelez cette méthode. Par exemple, supposons que dfTable est un DataFrame pour la table sample_product_data :

val dfTempTable = dfTable.cacheResult()
Copy

Après avoir appelé cacheResult, dfTable pointe toujours vers la table sample_product_data et vous pouvez continuer à utiliser dfTable pour interroger et mettre à jour cette table.

Pour utiliser les données mises en cache dans la table temporaire, vous utilisez dfTempTable (l’objet HasCachedResult renvoyé par cacheResult).

Travailler avec des fichiers dans une zone de préparation

La bibliothèque Snowpark fournit des classes et des méthodes que vous pouvez utiliser pour charger des données dans Snowflake et décharger des données de Snowflake en utilisant des fichiers dans des zones de préparation.

Note

Afin d’utiliser ces classes et méthodes sur une zone de préparation, vous devez avoir les privilèges requis pour travailler avec la zone de préparation.

Les sections suivantes expliquent comment utiliser ces classes et méthodes :

Chargement et téléchargement de fichiers dans une zone de préparation

Pour charger et télécharger des fichiers dans une zone de préparation, utilisez l’objet FileOperation :

Chargement de fichiers vers une zone de préparation

Pour charger des fichiers vers une zone de préparation :

  1. Vérifiez que vous avez les privilèges nécessaires pour charger des fichiers vers la zone de préparation.

  2. Utilisez Session.file pour accéder à l’objet FileOperation de la session.

  3. Appelez la méthode FileOperation.put pour charger les fichiers vers une zone de préparation.

    Cette méthode exécute une commande SQL PUT.

    • Pour spécifier tout paramètre facultatif pour la commande PUT, créez un Map des paramètres et des valeurs, et transférez le Map comme argument options. Par exemple :

      // Upload a file to a stage without compressing the file.
      val putOptions = Map("AUTO_COMPRESS" -> "FALSE")
      val putResults = session.file.put("file:///tmp/myfile.csv", "@myStage", putOptions)
      
      Copy
    • Dans l’argument localFilePath vous pouvez utiliser des caractères génériques (* et ?) pour identifier un ensemble de fichiers à charger. Par exemple :

      // Upload the CSV files in /tmp with names that start with "file".
      // You can use the wildcard characters "*" and "?" to match multiple files.
      val putResults = session.file.put("file:///tmp/file*.csv", "@myStage/prefix2")
      
      Copy
  4. Vérifiez le Array des objets PutResult renvoyés par la méthode put pour déterminer si les fichiers ont été chargés avec succès. Par exemple, pour afficher le nom du fichier et le statut de l’opération PUT pour ce fichier :

    // Print the filename and the status of the PUT operation.
    putResults.foreach(r => println(s"  ${r.sourceFileName}: ${r.status}"))
    
    Copy

Téléchargement de fichiers à partir d’une zone de préparation

Pour télécharger des fichiers à partir d’une zone de préparation :

  1. Vérifiez que vous avez les privilèges nécessaires pour télécharger des fichiers depuis la zone de préparation.

  2. Utilisez Session.file pour accéder à l’objet FileOperation de la session.

  3. Appelez la méthode FileOperation.get pour télécharger les fichiers d’une zone de préparation.

    Cette méthode exécute une commande SQL GET.

    Pour spécifier tout paramètre facultatif pour la commande GET, créez un Map des paramètres et des valeurs, et transférez le Map comme argument options. Par exemple :

    // Download files with names that match a regular expression pattern.
    val getOptions = Map("PATTERN" -> s"'.*file_.*.csv.gz'")
    val getResults = session.file.get("@myStage", "file:///tmp", getOptions)
    
    Copy
  4. Vérifiez le Array des objets GetResult renvoyés par la méthode get pour déterminer si les fichiers ont été téléchargés avec succès. Par exemple, pour afficher le nom du fichier et le statut de l’opération GET pour ce fichier :

    // Print the filename and the status of the GET operation.
    getResults.foreach(r => println(s"  ${r.fileName}: ${r.status}"))
    
    Copy

Utilisation des flux d’entrée pour charger et télécharger des données dans une zone de préparation

Note

Cette fonctionnalité a été introduite dans Snowpark 1.4.0.

Pour utiliser les flux d’entrée afin de charger des données dans un fichier sur une zone de préparation et de télécharger des données d’un fichier sur une zone de préparation, utilisez les méthodes uploadStream et downloadStream de l’objet FileOperation :

Utilisation d’un flux d’entrée pour télécharger des données vers un fichier dans une zone de préparation

Pour télécharger les données d’un objet java.io.InputStream vers un fichier sur une zone de préparation :

  1. Vérifiez que vous avez les privilèges nécessaires pour charger des fichiers vers la zone de préparation.

  2. Utilisez Session.file pour accéder à l’objet FileOperation de la session.

  3. Appelez la méthode FileOperation.uploadStream.

    Indiquez le chemin complet du fichier sur la zone de préparation où les données doivent être écrites et l’objet InputStream. En outre, utilisez l’argument compress pour indiquer si les données doivent être compressées ou non avant d’être téléchargées.

Par exemple :

import java.io.InputStream
...
val compressData = true
val pathToFileOnStage = "@myStage/path/file"
session.file.uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData)
Copy

Utilisation d’un flux d’entrée pour télécharger des données à partir d’un fichier sur une zone de préparation

Pour télécharger des données d’un fichier sur une zone de préparation vers un objet java.io.InputStream :

  1. Vérifiez que vous avez les privilèges nécessaires pour télécharger des fichiers depuis la zone de préparation.

  2. Utilisez Session.file pour accéder à l’objet FileOperation de la session.

  3. Appelez la méthode FileOperation.downloadStream.

    Indiquez le chemin complet du fichier sur la zone de préparation contenant les données à télécharger. Utilisez l’argument decompress pour spécifier si les données du fichier sont compressées ou non.

Par exemple :

import java.io.InputStream
...
val isDataCompressed = true
val pathToFileOnStage = "@myStage/path/file"
val is = session.file.downloadStream(pathToFileOnStage, isDataCompressed)
Copy

Configuration d’un DataFrame pour des fichiers dans une zone de préparation

Cette section explique comment configurer un DataFrame pour des fichiers dans une zone de préparation Snowflake. Une fois que vous avez créé ce DataFrame, vous pouvez utiliser le DataFrame pour :

Pour configurer un DataFrame pour des fichiers dans une zone de préparation Snowflake, utilisez la classe DataFrameReader :

  1. Vérifiez que vous disposez des privilèges suivants :

  2. Appelez la méthode read de la classe Session pour accéder à un objet DataFrameReader.

  3. Si les fichiers sont au format CSV, décrivez les champs du fichier. Pour ce faire :

    1. Créez un objet StructType composé d’une séquence d’objets StructField qui décrivent les champs du fichier.

    2. Pour chaque objet StructField indiquez ce qui suit :

      • Nom du champ.

      • Le type de données du champ (spécifié comme un objet dans le paquet com.snowflake.snowpark.types).

      • Indique si le champ peut être « Null » ou non.

      Par exemple :

      import com.snowflake.snowpark.types._
      
      val schemaForDataFile = StructType(
          Seq(
              StructField("id", StringType, true),
              StructField("name", StringType, true)))
      
      Copy
    3. Appelez la méthode schema dans l’objet DataFrameReader, en transmettant l’objet StructType.

      Par exemple :

      var dfReader = session.read.schema(schemaForDataFile)
      
      Copy

      La méthode schema renvoie un objet DataFrameReader qui est configuré pour lire les fichiers contenant les champs spécifiés.

      Notez que vous n’avez pas besoin de faire cela pour les fichiers dans d’autres formats (tels que JSON). Pour ces fichiers, le DataFrameReader traite les données comme un champ unique de type VARIANT avec le nom de champ $1.

  4. Si vous devez spécifier des informations supplémentaires sur la manière dont les données doivent être lues (par exemple, que les données sont compressées ou qu’un fichier CSV utilise un point-virgule au lieu d’une virgule pour délimiter les champs), appelez la méthode DataFrameReader.option ou la méthode DataFrameReader.options.

    Saisissez le nom et la valeur de l’option que vous souhaitez définir. Vous pouvez définir les types d’options suivants :

    L’exemple suivant configure l’objet DataFrameReader pour interroger les données d’un fichier CSV qui n’est pas compressé et qui utilise un point-virgule comme délimiteur de champ.

    dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    
    Copy

    La méthode option renvoie un objet DataFrameReader qui est configuré avec l’option spécifiée.

    Pour définir plusieurs options, vous pouvez soit enchaîner les appels à la méthode option (comme dans l’exemple ci-dessus), soit appeler la méthode DataFrameReader.options, en transmettant un Map des noms et valeurs des options.

  5. Appelez la méthode correspondant au format des fichiers. Vous pouvez appeler l’une des méthodes suivantes :

    Lorsque vous appelez ces méthodes, transmettez l’emplacement de zone de préparation des fichiers à lire. Par exemple :

    val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
    
    Copy

    Pour spécifier plusieurs fichiers qui commencent par le même préfixe, spécifiez le préfixe après le nom de la zone de préparation. Par exemple, pour charger les fichiers qui ont le préfixe csv_ à partir de la zone de préparation @mystage :

    val df = dfReader.csv("@mystage/csv_")
    
    Copy

    Les méthodes correspondant au format d’un fichier renvoient un objet CopyableDataFrame pour ce fichier. CopyableDataFrame étend DataFrame et fournit des méthodes supplémentaires pour exploiter les données dans des fichiers en zone de préparation.

  6. Appelez une méthode d’action pour :

    Comme c’est le cas avec les DataFrames pour les tables, les données ne sont pas récupérées dans les DataFrame avant que vous n’appeliez une méthode d’action.

Chargement de données à partir de fichiers dans un DataFrame

Après avoir configuré un DataFrame pour des fichiers dans une zone de préparation, vous pouvez charger les données des fichiers dans DataFrame :

  1. Utilisez les méthodes de l’objet DataFrame pour effectuer toute transformation nécessaire sur l’ensemble de données (par exemple, sélection de champs spécifiques, filtrage des lignes, etc.)

    Par exemple, pour extraire l’élément color d’un fichier JSON nommé data.json dans la zone de préparation nommée mystage :

    val df = session.read.json("@mystage/data.json").select(col("$1")("color"))
    
    Copy

    Comme expliqué précédemment, pour les fichiers dans des formats autres que CSV (par exemple JSON), le DataFrameReader traite les données du fichier comme une seule colonne VARIANT portant le nom $1.

  2. Appelez la méthode DataFrame.collect pour charger les données. Par exemple :

    val results = df.collect()
    
    Copy

Copier les données d’un fichier dans une table

Après avoir configuré un DataFrame pour des fichiers dans une zone de préparation, vous pouvez appeler la méthode CopyableDataFrame.copyInto pour copier les données dans une table. Cette méthode exécute la commande COPY INTO <table>.

Note

Il n’est pas nécessaire d’appeler la méthode collect avant d’appeler copyInto. Les données des fichiers ne doivent pas nécessairement se trouver dans le DataFrame avant que vous n’appeliez copyInto.

Par exemple, le code suivant charge les données du fichier CSV spécifié par myFileStage dans la table mytable. Comme les données se trouvent dans un fichier CSV, le code doit également décrire les champs du fichier. Pour ce faire, l’exemple appelle la méthode DataFrameReader.schema et transmet un objet StructType (csvFileSchema) contenant une séquence d’objets StructField qui décrivent les champs.

val df = session.read.schema(csvFileSchema).csv(myFileStage)
df.copyInto("mytable")
Copy

Sauvegarde d’un DataFrame dans des fichiers sur une zone de préparation

Note

Cette fonctionnalité a été introduite dans Snowpark 1.5.0.

Si vous devez enregistrer un DataFrame dans des fichiers dans une zone de préparation, vous pouvez appeler la méthode DataFrameWriter correspondant au format du fichier (par exemple, la méthode csv pour écrire dans un fichier CSV), en indiquant l’emplacement de la zone de préparation où les fichiers doivent être enregistrés. Ces méthodes DataFrameWriter exécutent la commande COPY INTO <emplacement>.

Note

Il n’est pas nécessaire d’appeler la méthode collect avant d’appeler les méthodes DataFrameWriter. Les données du fichier ne doivent pas nécessairement se trouver dans le DataFrame avant que vous n’appeliez ces méthodes.

Pour enregistrer le contenu d’un DataFrame dans des fichiers dans une zone de préparation :

  1. Appelez la méthode DataFrame.write pour obtenir un objet DataFrameWriter. Par exemple, pour obtenir l’objet DataFrameWriter pour un DataFrame qui représente la table nommée sample_product_data :

    dfWriter = session.table("sample_product_data").write
    
    Copy
  2. Si vous voulez écraser le contenu du fichier (si le fichier existe), appelez la méthode DataFrameWriter.mode, en transmettant SaveMode.Overwrite.

    Sinon, par défaut, DataFrameWriter signale une erreur si le fichier spécifié dans la zone de préparation existe déjà.

    La méthode mode renvoie le même objet DataFrameWriter configuré avec le mode spécifié.

    Par exemple, pour spécifier que DataFrameWriter doit écraser le fichier dans la zone de préparation :

    dfWriter = dfWriter.mode(SaveMode.Overwrite)
    
    Copy
  3. Si vous devez spécifier des informations supplémentaires sur la manière dont les données doivent être enregistrées (par exemple, que les données doivent être compressées ou que vous souhaitez utiliser un point-virgule pour délimiter les champs dans un fichier CSV), appelez la méthode DataFrameWriter.option ou la méthode DataFrameWriter.options.

    Saisissez le nom et la valeur de l’option que vous souhaitez définir. Vous pouvez définir les types d’options suivants :

    Notez que vous ne pouvez pas utiliser la méthode option pour définir les options suivantes :

    • L’option de type de format TYPE.

    • Option de copie OVERWRITE. Pour définir cette option, appelez plutôt la méthode mode (comme indiqué à l’étape précédente).

    L’exemple suivant configure l’objet DataFrameWriter pour enregistrer des données dans un fichier CSV sous forme non compressée, en utilisant un point-virgule (plutôt qu’une virgule) comme délimiteur de champ.

    dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    
    Copy

    La méthode option renvoie un objet DataFrameWriter qui est configuré avec l’option spécifiée.

    Pour définir plusieurs options, vous pouvez enchaîner les appels à la méthode option (comme dans l’exemple ci-dessus), ou appeler la méthode DataFrameWriter.options, en transmettant un Map des noms et valeurs des options.

  4. Pour obtenir des détails sur chaque fichier sauvegardé, définissez DETAILED_OUTPUT l’option de copie sur TRUE.

    Par défaut, DETAILED_OUTPUT est FALSE, ce qui signifie que la méthode renvoie une seule ligne de sortie contenant les champs "rows_unloaded", "input_bytes" et "output_bytes".

    Lorsque vous définissez DETAILED_OUTPUT sur TRUE, la méthode renvoie une ligne de sortie pour chaque fichier enregistré. Chaque ligne contient les champs FILE_NAME, FILE_SIZE, et ROW_COUNT.

  5. Appelez la méthode correspondant au format du fichier pour enregistrer les données dans le fichier. Vous pouvez appeler l’une des méthodes suivantes :

    Lorsque vous appelez ces méthodes, transmettez l’emplacement de la zone de préparation du fichier où les données doivent être écrites (par exemple, @mystage).

    Par défaut, la méthode enregistre les données dans des noms de fichiers avec le préfixe data_ (par exemple, @mystage/data_0_0_0.csv). Si vous voulez que les fichiers soient nommés avec un préfixe différent, spécifiez le préfixe après le nom de la zone de préparation. Par exemple :

    val writeFileResult = dfWriter.csv("@mystage/saved_data")
    
    Copy

    Cet exemple enregistre le contenu du DataFrame dans des fichiers qui commencent par le préfixe saved_data (par exemple, @mystage/saved_data_0_0_0.csv).

  6. Vérifiez l’objet WriteFileResult retourné pour obtenir des informations sur la quantité de données écrites dans le fichier.

    À partir de l’objet WriteFileResult, vous pouvez accéder à la sortie produite par la commande COPY INTO <emplacement> :

    • Pour accéder aux lignes de sortie sous la forme d’un tableau d’objets Row, utilisez le membre valeur rows.

    • Pour déterminer les champs présents dans les lignes, utilisez le membre valeur schema, qui est un StructType décrivant les champs de la ligne.

    Par exemple, pour imprimer les noms des champs et des valeurs dans les lignes de sortie :

    val writeFileResult = dfWriter.csv("@mystage/saved_data")
    for ((row, index) <- writeFileResult.rows.zipWithIndex) {
      (writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach {
        (structField, element) => println(s"${structField.name}: $element")
      }
    }
    
    Copy

L’exemple suivant utilise un DataFrame pour enregistrer le contenu de la table nommée car_sales dans des fichiers JSON avec le préfixe saved_data dans la zone de préparation @mystage (par exemple @mystage/saved_data_0_0_0.json). L’exemple de code :

  • Écrase le fichier, si le fichier existe déjà dans la zone de préparation.

  • Renvoie une sortie détaillée sur l’opération de sauvegarde.

  • Enregistre les données non compressées.

Enfin, l’exemple de code imprime chaque champ et chaque valeur dans les lignes de sortie renvoyées :

val df = session.table("car_sales")
val writeFileResult = df.write.mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data")
for ((row, index) <- writeFileResult.rows.zipWithIndex) {
  println(s"Row: $index")
  (writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach {
    (structField, element) => println(s"${structField.name}: $element")
  }
}
Copy

Utilisation de données semi-structurées

En utilisant un DataFrame, vous pouvez interroger et accéder à des données semi-structurées (par exemple, des données JSON). Les sections suivantes expliquent comment travailler avec des données semi-structurées dans un DataFrame.

Note

Les exemples de ces sections utilisent les données d’exemple dans Échantillon de données utilisé dans des exemples.

Parcours de données semi-structurées

Pour faire référence à un champ ou à un élément spécifique dans des données semi-structurées, utilisez les méthodes suivantes de l’objet Column :

  • Utilisez Column.apply(« <field_name> ») pour retourner un objet Column pour un champ dans un OBJECT (ou un VARIANT qui contient un OBJECT).

  • Utilisez Column.apply(<index>) pour renvoyer un objet Column pour un élément dans un ARRAY (ou un VARIANT qui contient un ARRAY).

Note

Si le nom du champ ou les éléments du chemin sont irréguliers et rendent difficile l’utilisation des méthodes Column.apply, vous pouvez utiliser les fonctions get, get_ignore_case ou get_path comme alternative.

Comme mentionné dans Utilisation de la méthode apply pour faire référence à une colonne, vous pouvez omettre le nom de la méthode apply :

col("column_name")("field_name")
col("column_name")(index)
Copy

Par exemple, le code suivant sélectionne le champ dealership dans les objets de la colonne src des données d’exemple :

val df = session.table("car_sales")
df.select(col("src")("dealership")).show()
Copy

Le code imprime la sortie suivante :

----------------------------
|"""SRC""['DEALERSHIP']"   |
----------------------------
|"Valley View Auto Sales"  |
|"Tindel Toyota"           |
----------------------------
Copy

Note

Les valeurs contenues dans le DataFrame sont délimitées par des guillemets doubles car elles sont renvoyées sous forme de littéraux de chaînes. Pour convertir ces valeurs en un type spécifique, voir Conversion explicite des valeurs en données semi-structurées.

Vous pouvez également enchaîner les appels de méthode pour parcourir un chemin vers un champ ou un élément spécifique.

Par exemple, le code suivant sélectionne le champ name dans l’objet salesperson :

val df = session.table("car_sales")
df.select(col("src")("salesperson")("name")).show()
Copy

Le code imprime la sortie suivante :

------------------------------------
|"""SRC""['SALESPERSON']['NAME']"  |
------------------------------------
|"Frank Beasley"                   |
|"Greg Northrup"                   |
------------------------------------
Copy

Autre exemple, le code suivant sélectionne le premier élément du champ vehicle , qui contient un tableau de véhicules. L’exemple sélectionne également le champ price du premier élément.

val df = session.table("car_sales")
df.select(col("src")("vehicle")(0)).show()
df.select(col("src")("vehicle")(0)("price")).show()
Copy

Le code imprime la sortie suivante :

---------------------------
|"""SRC""['VEHICLE'][0]"  |
---------------------------
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "paint protection"   |
|  ],                     |
|  "make": "Honda",       |
|  "model": "Civic",      |
|  "price": "20275",      |
|  "year": "2017"         |
|}                        |
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "rust proofing",     |
|    "fabric protection"  |
|  ],                     |
|  "make": "Toyota",      |
|  "model": "Camry",      |
|  "price": "23500",      |
|  "year": "2017"         |
|}                        |
---------------------------

------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']"  |
------------------------------------
|"20275"                           |
|"23500"                           |
------------------------------------
Copy

Comme alternative à la méthode apply, vous pouvez utiliser les fonctions get, get_ignore_case ou get_path si le nom du champ ou les éléments du chemin sont irréguliers et rendent difficile l’utilisation des méthodes Column.apply.

Par exemple, les lignes de code suivantes impriment toutes deux la valeur d’un champ spécifié dans un objet :

df.select(get(col("src"), lit("dealership"))).show()
df.select(col("src")("dealership")).show()
Copy

De même, les lignes de code suivantes impriment toutes deux la valeur d’un champ à un chemin spécifié dans un objet :

df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
df.select(col("src")("vehicle")(0)("make")).show()
Copy

Conversion explicite des valeurs en données semi-structurées

Par défaut, les valeurs des champs et des éléments sont renvoyées sous forme de littéraux de chaînes (y compris les guillemets), comme le montrent les exemples ci-dessus.

Pour éviter les résultats inattendus, appelez la méthode cast pour convertir la valeur en un type spécifique. Par exemple, le code suivant imprime les valeurs sans et avec conversion :

// Import the objects for the data types, including StringType.
import com.snowflake.snowpark.types._
...
val df = session.table("car_sales")
df.select(col("src")("salesperson")("id")).show()
df.select(col("src")("salesperson")("id").cast(StringType)).show()
Copy

Le code imprime la sortie suivante :

----------------------------------
|"""SRC""['SALESPERSON']['ID']"  |
----------------------------------
|"55"                            |
|"274"                           |
----------------------------------

---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)"  |
---------------------------------------------------
|55                                               |
|274                                              |
---------------------------------------------------
Copy

Aplatissement d’un tableau d’objets en lignes

Si vous devez « aplatir » des données semi-structurées dans un DataFrame (par exemple, produire une ligne pour chaque objet d’un tableau), appelez la méthode DataFrame.flatten. Cette méthode est équivalente à la fonction FLATTEN SQL. Si vous transmettez un chemin d’accès à un objet ou à un tableau, la méthode renvoie un DataFrame qui contient une ligne pour chaque champ ou élément de l’objet ou du tableau.

Par exemple, dans les données d’exemple, src:customer est un tableau d’objets qui contient des informations sur un client. Chaque objet contient un champ name et address.

Si vous transmettez ce chemin à la fonction flatten :

val df = session.table("car_sales")
df.flatten(col("src")("customer")).show()
Copy

cette méthode renvoie un DataFrame :

----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC"                                      |"SEQ"  |"KEY"  |"PATH"  |"INDEX"  |"VALUE"                            |"THIS"                               |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{                                          |1      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "San Francisco, CA",  |  {                                  |
|    {                                      |       |       |        |         |  "name": "Joyce Ridgely",         |    "address": "San Francisco, CA",  |
|      "address": "San Francisco, CA",      |       |       |        |         |  "phone": "16504378889"           |    "name": "Joyce Ridgely",         |
|      "name": "Joyce Ridgely",             |       |       |        |         |}                                  |    "phone": "16504378889"           |
|      "phone": "16504378889"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Valley View Auto Sales",  |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "55",                            |       |       |        |         |                                   |                                     |
|    "name": "Frank Beasley"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "paint protection"                 |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Honda",                     |       |       |        |         |                                   |                                     |
|      "model": "Civic",                    |       |       |        |         |                                   |                                     |
|      "price": "20275",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
|{                                          |2      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "New York, NY",       |  {                                  |
|    {                                      |       |       |        |         |  "name": "Bradley Greenbloom",    |    "address": "New York, NY",       |
|      "address": "New York, NY",           |       |       |        |         |  "phone": "12127593751"           |    "name": "Bradley Greenbloom",    |
|      "name": "Bradley Greenbloom",        |       |       |        |         |}                                  |    "phone": "12127593751"           |
|      "phone": "12127593751"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Tindel Toyota",           |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "274",                           |       |       |        |         |                                   |                                     |
|    "name": "Greg Northrup"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "rust proofing",                   |       |       |        |         |                                   |                                     |
|        "fabric protection"                |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Toyota",                    |       |       |        |         |                                   |                                     |
|      "model": "Camry",                    |       |       |        |         |                                   |                                     |
|      "price": "23500",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
----------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

À partir de ce DataFrame, vous pouvez sélectionner les champs name et address de chaque objet dans le champ VALUE :

df.flatten(col("src")("customer")).select(col("value")("name"), col("value")("address")).show()
Copy
-------------------------------------------------
|"""VALUE""['NAME']"   |"""VALUE""['ADDRESS']"  |
-------------------------------------------------
|"Joyce Ridgely"       |"San Francisco, CA"     |
|"Bradley Greenbloom"  |"New York, NY"          |
-------------------------------------------------
Copy

Le code suivant complète l’exemple précédent en convertissant les valeurs en un type spécifique et en modifiant les noms des colonnes :

df.flatten(col("src")("customer")).select(col("value")("name").cast(StringType).as("Customer Name"), col("value")("address").cast(StringType).as("Customer Address")).show()
Copy
-------------------------------------------
|"Customer Name"     |"Customer Address"  |
-------------------------------------------
|Joyce Ridgely       |San Francisco, CA   |
|Bradley Greenbloom  |New York, NY        |
-------------------------------------------
Copy

Exécution d’instructions SQL

Pour exécuter une instruction SQL que vous spécifiez, appelez la méthode sql de la classe Session, et transmettez l’instruction à exécuter. Cette méthode renvoie un DataFrame.

Notez que l’instruction SQL ne sera pas exécutée tant que vous n’aurez pas appelé une méthode d’action.

// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
val dfStageFiles = session.sql("ls @myStage")
val files = dfStageFiles.collect()
files.foreach(println)

// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()

val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()
println("Count: " + numRows);
Copy

Si vous voulez appeler des méthodes pour transformer les DataFrame (par exemple, filtrer, sélectionner, etc.), notez que ces méthodes ne fonctionnent que si l’instruction SQL sous-jacente est une instruction SELECT. Les méthodes de transformation ne sont pas prises en charge pour les autres types d’instructions SQL.

val df = session.sql("select id, category_id, name from sample_product_data where id > 10")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("category_id") < 10).select(col("id")).collect()
results.foreach(println)

// In this example, the underlying SQL statement is not a SELECT statement.
val dfStageFiles = session.sql("ls @myStage")
// Calling the filter method results in an error.
dfStageFiles.filter(...)
Copy