Verwenden von DataFrames in Snowpark

In Snowpark erfolgt die Abfrage und Verarbeitung von Daten hauptsächlich über einen DataFrame. Unter diesem Thema wird erklärt, wie Sie DataFrames verwenden.

Unter diesem Thema:

Um Daten abrufen und ändern zu können, verwenden Sie die Klasse DataFrame. Ein DataFrame repräsentiert ein relationales Dataset, das im Lazy-Modus ausgewertet wird: Es wird nur ausgeführt, wenn eine bestimmte Aktion ausgelöst wird. In gewissem Sinne ist ein DataFrame wie eine Abfrage, die ausgewertet werden muss, um Daten abzurufen.

So rufen Sie Daten in einem DataFrame ab:

  1. Konstruieren Sie ein DataFrame, das die Datenquelle für das Dataset angibt.

    Sie können z. B. ein DataFrame erstellen, das Daten aus einer Tabelle, Daten einer externen CSV-Datei oder die Ausführung einer SQL-Anweisung enthält.

  2. Geben Sie an, wie das Dataset in den DataFrame transformiert werden soll.

    Sie können z. B. angeben, welche Spalten ausgewählt werden sollen, wie die Zeilen gefiltert werden sollen, wie die Ergebnisse sortiert und gruppiert werden sollen usw.

  3. Führen Sie die Anweisung aus, um die Daten in den DataFrame abzurufen.

    Um die Daten in den DataFrame abzurufen, müssen Sie eine Methode aufrufen, die eine Aktion ausführt (z. B. die collect()-Methode).

In den nächsten Abschnitten werden diese Schritte näher erläutert.

Erstellen eines DataFrame

Um ein DataFrame zu erstellen, können Sie Methoden der Session-Klasse verwenden. Jede der folgenden Methoden erstellt einen DataFrame aus einem anderen Typ von Datenquelle:

  • Um einen DataFrame aus Daten in einer Tabelle, in einer Ansicht oder eines Streams zu erstellen, rufen Sie die table-Methode auf:

    // Create a DataFrame from the data in the "products" table.
    val dfTable = session.table("products")
    

    Bemerkung

    Die Methode session.table gibt ein Updatable-Objekt zurück. Updatable erweitert DataFrame und bietet zusätzliche Methoden für die Verwendung der Daten in der Tabelle (z. B. Methoden zum Aktualisieren und Löschen von Daten). Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle.

  • Um einen DataFrame aus einer Sequenz von Werten zu erstellen, rufen Sie die createDataFrame-Methode auf:

    // Create a DataFrame containing a sequence of values.
    // In the DataFrame, name the columns "i" and "s".
    val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
    
  • Um einen DataFrame zu erstellen, der einen Wertebereich enthält, rufen Sie die range-Methode auf:

    // Create a DataFrame from a range
    val dfRange = session.range(1, 10, 2)
    
  • Um einen DataFrame zu erstellen, der die Daten aus einer in einem Stagingbereich befindlichen Datei enthält, verwenden Sie read zum Abrufen eines DataFrameReader-Objekts. Im DataFrameReader-Objekt rufen Sie die Methode auf, die dem Format der Daten in der Datei entspricht:

    // Create a DataFrame from data in a stage.
    val dfJson = session.read.json("@mystage2/data1.json")
    
  • Um einen DataFrame zu erstellen, der die Ergebnisse einer SQL-Abfrage enthält, rufen Sie die sql-Methode auf:

    // Create a DataFrame from a SQL query
    val dfSql = session.sql("SELECT name from products")
    

    Hinweis: Obwohl Sie mit dieser Methode SELECT-Anweisungen ausführen können, die Daten aus Tabellen und Stagingdateien abrufen, sollten Sie dafür eher die Methoden table und read verwenden. Methoden wie table und read sorgen in Entwicklungstools für eine bessere Hervorhebung der Syntax und von Fehlern sowie eine intelligente Codevervollständigung.

Festlegen der Transformation des Datasets

Um festzulegen, welche Spalten ausgewählt und wie die Ergebnisse gefiltert, sortiert, gruppiert usw. werden sollen, rufen Sie die DataFrame-Methoden auf, die das Dataset transformieren. Um die Spalten in diesen Methoden zu identifizieren, verwenden Sie die Funktion col oder einen Ausdruck, der eine Spalte ergibt (siehe Angeben von Spalten und Ausdrücken).

Beispiel:

  • Zur Angabe der Zeilen, die zurückgegeben werden sollen, rufen Sie die filter-Methode auf:

    // Create a DataFrame for the rows with the ID 1
    // in the "products" table.
    //
    // This example uses the === operator of the Column object to perform an
    // equality check.
    val dfProductIdOne = dfProductInfo.filter(col("id") === 1)
    
  • Zur Angabe der Spalten, die ausgewählt werden sollen, rufen Sie die select-Methode auf:

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame object for the "products" table.
    val dfProductInfo = session.table("products")
    
    // Create a DataFrame that contains the id, name, and serial_number
    // columns.
    val dfProductSerialNo =
        dfProductInfo.select(col("id"), col("name"), col("serial_number"))
    

Jede Methode gibt ein neues DataFrame-Objekt zurück, das transformiert wurde. (Die Methode hat keine Auswirkungen auf das ursprüngliche DataFrame-Objekt.) Wenn Sie also mehrere Transformationen anwenden möchten, können Sie Methodenaufrufe verketten, sodass jede nachfolgende Transformationsmethode auf dem neuen DataFrame-Objekt ausgeführt wird, das vom vorherigen Methodenaufruf zurückgegeben wurde.

Beachten Sie, dass mit diesen Transformationsmethoden keine Daten aus der Snowflake-Datenbank abgerufen werden können. (Die unter Ausführen einer Aktion zum Auswerten eines DataFrame beschriebenen Aktionsmethoden führen den Datenabruf durch.) Die Transformationsmethoden geben einfach an, wie die SQL-Anweisung aufgebaut sein soll.

Verknüpfen von DataFrames

Zum Verknüpfen von DataFrame-Objekten rufen Sie die join-Methode auf:

// Create a DataFrame that joins two other DataFrames
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key"))

Beachten Sie, dass im Beispiel die Methode DataFrame.col verwendet wird, um die in der Verknüpfung zu verwendenden Spalten anzugeben. Weitere Informationen zu dieser Methode finden Sie unter Angeben von Spalten und Ausdrücken.

Wenn Sie eine Tabelle mit sich selbst über verschiedene Spalten verknüpfen müssen, können Sie die Selbstverknüpfung (Self-Join) nicht mit einem einzelnen DataFrame durchführen. Im folgenden Beispiel wird ein einzelnes DataFrame in einer Selbstverknüpfung verwendet. Diese Ausführung schlägt fehl, weil die Spaltenausdrücke für "id" sowohl auf der linken als auch der rechten Seite der Verknüpfung vorhanden sind:

val dfJoined = df.join(df, col("id") === col("parent_id"))

val dfJoined = df.join(df, df("id") === df("parent_id"))

Beide Beispiele scheitern mit der folgenden Ausnahme:

Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
  Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
  Instead, join this DataFrame to a clone() of itself.

Verwenden Sie stattdessen die Methode DataFrame.clone(), um einen Klon des DataFrame-Objekts zu erstellen, und verwenden Sie dann die beiden DataFrame-Objekte für die Verknüpfung:

// Create a DataFrame object for the "products" table for the left-hand side of the join.
val dfLhs = session.table("products")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()

// Create a DataFrame that joins the two DataFrames
// for the "products" table on the "key" column.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))

Wenn Sie eine Selbstverknüpfung auf derselben Spalte ausführen möchten, rufen Sie die join-Methode auf, die eine Seq von Spaltenausdrücken für die USING-Klausel übergibt:

// Create a DataFrame that performs a self-join on
// the DataFrame for the "products" table using the "key" column.
val dfJoined = df.join(df, Seq("key"))

Angeben von Spalten und Ausdrücken

Wenn Sie diese Transformationsmethoden aufrufen, müssen Sie möglicherweise Spalten angeben oder Ausdrücke, die Spalten verwenden. Wenn Sie z. B. die select-Methode aufrufen, müssen Sie die Spalten angeben, die ausgewählt werden sollen.

Um auf eine Spalte zu verweisen, erstellen Sie ein Column-Objekt durch Aufruf der Funktion col im com.snowflake.snowpark.functions-Objekt.

// Import the col function from the functions object.
import com.snowflake.snowpark.functions._

val dfProductInfo = session.table("products").select(col("id"), col("name"))

Bemerkung

Informationen zum Erstellen eines Column-Objekts für ein Literal finden Sie unter Verwenden von Literalen als Spaltenobjekte.

Wenn Sie einen Filter, eine Projektion, eine Verknüpfungsbedingung usw. angeben müssen, können Sie Column-Objekte in einem Ausdruck verwenden. Im folgenden Beispiel werden Column-Objekte in Ausdrücken für folgende Aufgaben verwendet:

  • Abrufen der Zeilen, in denen der Wert in der Spalte id gleich 20 ist und die Summe der Werte in den Spalten a und b kleiner als 10 ist.

  • Rückgabe des Wertes von b multipliziert mit 10 in der Spalte c. c ist ein Spaltenalias, der in der nächsten Anweisung zum Verknüpfen des DataFrame verwendet wird.

  • Verknüpfen von DataFrame df mit dem berechneten DataFrame dfCompute.

val dfCompute = session.table("T").filter(col("id") === 20).filter((col("a") + col("b")) < 10).select((col("b") * 10) as "c")
val df2 = df.join(dfCompute, col("a") === col("c") && col("a") === col("d"))

Wenn Sie auf Spalten in zwei verschiedenen DataFrame-Objekten verweisen müssen, die denselben Namen haben (z. B. beim Verknüpfen von DataFrames über diese Spalte), können Sie die DataFrame.col-Methode in einem der beiden DataFrame-Objekte verwenden, um sich auf eine Spalte in diesem Objekt zu beziehen (z. B. df1.col("name") und df2.col("name")).

Im folgenden Beispiel wird gezeigt, wie Sie mit der DataFrame.col-Methode auf eine Spalte in einem bestimmten DataFrame verweisen. Im Beispiel werden zwei DataFrame-Objekte verknüpft, die beide eine Spalte mit dem Namen key haben. Dabei wird im Beispiel die Column.as-Methode verwendet, um die Namen der Spalten im neu erstellten DataFrame zu ändern.

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))

Alternativ zur Methode DataFrame.col können Sie die Methode DataFrame.apply verwenden, um auf eine Spalte in einem bestimmten DataFrame zu verweisen. Wie die Methode DataFrame.col akzeptiert auch die Methode DataFrame.apply einen Spaltennamen als Eingabe und gibt ein Column-Objekt zurück.

Beachten Sie, dass bei einem Objekt, das eine apply-Methode in Scala hat, die apply-Methode verwendet werden kann, indem das Objekt wie eine Funktion aufgerufen wird. Um z. B. df.apply("column_name") aufzurufen, können Sie einfach df("column_name") schreiben. Die folgenden Aufrufe sind gleichwertig:

  • df.col("<Spaltenname>")

  • df.apply("<Spaltenname>")

  • df("<Spaltenname>")

Das folgende Beispiel entspricht dem vorherigen Beispiel, verwendet aber die Methode DataFrame.apply, um auf die Spalten in einer Verknüpfungsoperation zu verweisen:

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))

Verwenden von Kurzschrift für Spaltenobjekte

Alternativ zur Funktion col können Sie auf eine Spalte auch wie folgt verweisen:

  • Verwenden Sie vor dem Spaltennamen in Anführungszeichen ein Dollarzeichen ($"column_name").

  • Verwenden Sie vor dem nicht in Anführungszeichen gesetzten Spaltennamen ein Hochkomma (ein einfaches Anführungszeichen) ('column_name).

Führen Sie dazu nach dem Erstellen eines Session-Objekts einen Import der Namen aus dem implicits-Objekt durch:

val session = Session.builder.configFile("/path/to/properties").create

// Import this after you create the session.
import session.implicits._

// Use the $ (dollar sign) shorthand.
val df = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)

// Use ' (apostrophe) shorthand.
val df = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)

Verwenden von doppelten Anführungszeichen um Objektbezeichner (Tabellennamen, Spaltennamen usw.)

Die Namen von Datenbanken, Schemas, Tabellen und Stagingbereichen, die Sie angeben, müssen den Snowflake-Anforderungen an Bezeichner entsprechen. Wenn Sie einen Namen angeben, geht Snowflake davon aus, dass der Name in Großbuchstaben geschrieben ist. Daher sind z. B. die folgenden Aufrufe gleichwertig:

// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))

Wenn der Name nicht den Anforderungen für Bezeichner entspricht, müssen Sie den Namen in doppelte Anführungszeichen (") setzen. Verwenden Sie einen Backslash (\) als Escape-Zeichen zum Umschließen von doppelten Anführungszeichen innerhalb von Scala-Zeichenfolgenliteralen. Der folgende Tabellenname beginnt z. B. nicht mit einem Buchstaben oder einem Unterstrich, sodass Sie den Namen in doppelte Anführungszeichen setzen müssen:

val df = session.table("\"10tablename\"")

Beachten Sie, dass Sie bei der Angabe des Namens einer Spalte keine doppelten Anführungszeichen um den Namen verwenden müssen. Die Snowpark-Bibliothek schließt Spaltennamen automatisch in doppelte Anführungszeichen ein, wenn der Name nicht den Bezeichneranforderungen entspricht:

// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\"))

// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))

Wenn Sie bereits doppelte Anführungszeichen um einen Spaltennamen hinzugefügt haben, fügt die Bibliothek keine weiteren doppelten Anführungszeichen um den Namen ein.

In einigen Fällen kann der Spaltenname doppelte Anführungszeichen enthalten:

describe table quoted;
+------------------------+ ...
| name                   | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted"   | ...
+------------------------+ ...

Wie unter Anforderungen an Bezeichner erläutert, müssen Sie für jedes doppelte Anführungszeichen innerhalb eines in Anführungszeichen geschriebenen Bezeichners zwei doppelte Anführungszeichen verwenden (z. B. "name_with_""air""_quotes" und """column_name_quoted"""):

val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()

Wenn ein Bezeichner in doppelte Anführungszeichen eingeschlossen ist (unabhängig davon, ob Sie die Anführungszeichen explizit hinzugefügt haben oder die Bibliothek die Anführungszeichen für Sie hinzugefügt hat), berücksichtigt Snowflake die Groß-/Kleinschreibung des Bezeichners:

// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))

Verwenden von Literalen als Spaltenobjekte

Um ein Literal in einer Methode zu verwenden, die ein Column-Objekt übergibt, erstellen Sie ein Column-Objekt für das Literal, indem Sie das Literal an die lit-Funktion im com.snowflake.snowpark.functions-Objekt übergeben. Beispiel:

// Import for the lit and col functions.
import com.snowflake.snowpark.functions._

// Show the first 10 rows in which num_items is greater than 5.
// Use `lit(5)` to create a Column object for the literal 5.
df.filter(col("num_items").gt(lit(5))).show()

Wenn das Literal in Scala ein Gleitkomma- oder Double-Wert ist (z. B. 0.05 wird standardmäßig als Double verarbeitet), generiert die Snowpark-Bibliothek SQL-Code, der den Wert implizit in den entsprechenden Snowpark-Datentyp umwandelt (z. B. 0.05::DOUBLE). Dies kann zu einem Näherungswert führen, der von der genau angegebenen Zahl abweicht.

Der Code im folgenden Beispiel zeigt keine übereinstimmenden Zeilen an, obwohl der Filter (der für Werte größer oder gleich 0.05 gilt) mit den Zeilen in DataFrame übereinstimmen sollte:

// Create a DataFrame that contains the value 0.05.
val df = session.sql("select 0.05 :: Numeric(5, 2) as a")

// Applying this filter results in no matching rows in the DataFrame.
df.filter(col("a") <= lit(0.06) - lit(0.01)).show()

Das Problem ist, dass lit(0.06) und lit(0.01) Näherungswerte für 0.06 und 0.01 liefern und nicht die genauen Werte.

Um dieses Problem zu vermeiden, können Sie eine der folgenden Methoden anwenden:

  • Option 1: Wandeln Sie das Literal in den Snowpark-Typ um, den Sie verwenden möchten. Verwenden Sie zum Beispiel einen NUMBER mit einer Genauigkeit von 5 und einem Maßstab von 2 zu verwenden:

    df.filter(col("a") <= lit(0.06).cast(new DecimalType(5, 2)) - lit(0.01).cast(new DecimalType(5, 2))).show()
    
  • Option 2: Wandeln Sie den Wert in den gewünschten Typ umwandeln, bevor der Wert an die Funktion lit übergeben wird. So können Sie zum Beispiel den Typ BigDecimal verwenden:

    df.filter(col("a") <= lit(BigDecimal(0.06)) - lit(BigDecimal(0.01))).show()
    

Umwandeln eines Spaltenobjekts in einen bestimmten Typ

Um ein column-Objekt in einen bestimmten Typ umzuwandeln, rufen Sie die cast-Methode auf und übergeben ein Typobjekt aus dem com.snowflake.snowpark.types-Paket. So können Sie zum Beispiel ein Literal als NUMBER mit einer Genauigkeit von 5 und einer Skala von 2 darzustellen:

// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._

val decimalValue = lit(0.05).cast(new DecimalType(5,2))

Verketten von Methodenaufrufen

Da jede Methode, die ein DataFrame-Objekt transformiert, ein neues DataFrame-Objekt zurückgibt, auf das die Transformation angewendet wurde, können Sie Methodenaufrufe verketten, um ein neues DataFrame zu erstellen, das auf eine andere Weise transformiert wird.

Im folgenden Beispiel wird ein DataFrame zurückgegeben, der für folgende Aufgaben konfiguriert ist:

  • Abfragen der Tabelle products

  • Rückgabe der Zeile mit id = 1

  • Auswahl der Spalten name und serial_number

val dfProductInfo = session.table("products").filter(col("id") === 1).select(col("name"), col("serial_number"))

In diesem Beispiel:

  • session.table("products") gibt einen DataFrame für die Tabelle products zurück.

    Der DataFrame enthält zwar noch nicht die Daten aus der Tabelle, aber das Objekt enthält die Definitionen der Spalten in der Tabelle.

  • filter(col("id") === 1) gibt ein DataFrame für die Tabelle products zurück, die so eingerichtet ist, dass sie die Zeile mit id = 1 zurückgibt.

    Beachten Sie erneut, dass DataFrame noch nicht die übereinstimmende Zeile aus der Tabelle enthält. Die übereinstimmende Zeile wird erst abgerufen, wenn Sie eine Aktionsmethode aufrufen.

  • select(col("name"), col("serial_number")) gibt ein DataFrame zurück, das die Spalten name und serial_number für die Zeile in der Tabelle products enthält, die id = 1 hat.

Beachten Sie beim Verketten von Methodenaufrufen, dass die Reihenfolge der Aufrufe wichtig ist. Jeder Methodenaufruf gibt ein DataFrame zurück, das transformiert wurde. Stellen Sie sicher, dass nachfolgende Aufrufe das transformierte DataFrame verwenden.

Im folgenden Beispiel gibt die Methode select ein DataFrame zurück, das nur zwei Spalten enthält: name und serial_number. Der Aufruf der filter-Methode auf diesem DataFrame schlägt fehl, weil dieser die Spalte id verwendet, die im transformierten DataFrame nicht vorhanden ist.

// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("products").select(col("name"), col("serial_number")).filter(col("id") === 1)

Im Gegensatz dazu wird der folgende Code erfolgreich ausgeführt, weil die filter()-Methode auf einem DataFrame aufgerufen wird, der alle Spalten der Tabelle products enthält (einschließlich der Spalte id):

// This succeeds because the DataFrame returned by the table() method
// includes the id column.
val dfProductInfo = session.table("products").filter(col("id") === 1).select(col("name"), col("serial_number"))

Beachten Sie, dass Sie die Methodenaufrufe select und filter möglicherweise in einer anderen Reihenfolge ausführen müssen, als Sie die entsprechenden Schlüsselwörter (SELECT und WHERE) in einer SQL-Anweisung verwenden würden.

Abrufen von Spaltendefinitionen

Um die Definition der Spalten im Dataset für DataFrame abzurufen, verwenden Sie die schema-Methode. Diese Methode gibt ein StructType-Objekt zurück, das ein Array von StructField-Objekten enthält. Jedes StructField-Objekt enthält die Definition einer Spalte.

// Get the StructType object that describes the columns in the
// underlying rowset.
val dfDefinition = session.table("products").schema

Im zurückgegebenen StructType-Objekt sind die Spaltennamen immer normalisiert. Bezeichner, die nicht mit Anführungszeichen umschlossen sind, werden in Großbuchstaben zurückgegeben. Bezeichner in Anführungszeichen werden genau in der Schreibweise zurückgegeben, in der sie definiert wurden.

Das folgende Beispiel gibt ein DataFrame zurück, das die Spalten mit den Namen ID und 3rd enthält. Für den Spaltennamen 3rd schließt die Snowpark-Bibliothek den Namen automatisch in doppelte Anführungszeichen ein ("3rd"), da der Name nicht den Anforderungen an einen Bezeichner entspricht.

Im Beispiel wird erst die schema-Methode aufgerufen und dann die names-Methode auf dem zurückgegebenen Objekt StructType, um eine Seq von Spaltennamen zu erhalten. Die Namen werden in dem von der schema-Methode zurückgegebenen StructType normalisiert.

// This returns Seq("ID", "\"3rd\"")
df.select(col("id"), col("3rd")).schema.names.toSeq

Ausführen einer Aktion zum Auswerten eines DataFrame

Wie bereits erwähnt, wird der DataFrame im Lazy-Modus ausgewertet, d. h. die SQL-Anweisung wird erst dann zur Ausführung an den Server gesendet, wenn Sie eine Aktion ausführen. Eine Aktion löst die Auswertung des DataFrame aus und sendet die entsprechende SQL-Anweisung zur Ausführung an den Server.

Ab diesem Release führen die folgenden Methoden eine Aktion aus:

Klasse

Methode

Beschreibung

DataFrame

collect

Wertet den DataFrame aus und gibt das resultierende Dataset als ein Array von Row-Objekten zurück.

DataFrame

count

Wertet den DataFrame aus und gibt die Anzahl der Zeilen zurück.

DataFrame

show

Wertet den DataFrame aus und gibt die Zeilen auf der Konsole aus. Beachten Sie, dass bei dieser Methode die Anzahl der Zeilen (standardmäßig) auf 10 eingeschränkt ist.

DataFrameWriter

saveAsTable

Speichert die im DataFrame enthaltenen Daten in die angegebene Tabelle. Siehe Speichern von Daten in eine Tabelle.

Updatable

delete

Löscht Zeilen aus der angegebenen Tabelle. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle.

Updatable

update

Aktualisiert Zeilen in der angegebenen Tabelle. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle.

MergeBuilder

collect

Führt Zeilen in der angegebenen Tabelle zusammen. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle.

Beispiel: Um eine Abfrage auf einer Tabelle auszuführen und die Ergebnisse zurückzugeben, rufen Sie die collect-Methode auf:

// Create a DataFrame for the row in the "products" table with the id 1.
// This does not execute the query.
val dfProductIdOne = session.table("products").filter(col("id") === 1)

// Send the query to the server for execution and
// return an Array of Rows containing the results.
val results = dfProductIdOne.collect()

Um die Abfrage auszuführen und die Anzahl der Ergebnisse zurückzugeben, rufen Sie die count-Methode auf:

// Create a DataFrame for the "products" table.
val dfProducts = session.table("products")

// Send the query to the server for execution and
// return the count of rows in the table.
val resultCount = dfProducts.count()

Um eine Abfrage auszuführen und die Ergebnisse über die Konsole auszugeben, rufen Sie die show-Methode auf:

// Create a DataFrame for the "products" table.
val dfProducts = session.table("products")

// Send the query to the server for execution and
// print the results to the console.
// The query limits the number of rows to 10 by default.
dfProducts.show()

// Limit the number of rows to 20, rather than 10.
dfProducts.show(20)

Hinweis: Wenn Sie die schema-Methode aufrufen, um die Definitionen der Spalten im DataFrame zu erhalten, müssen Sie keine Aktionsmethode aufrufen.

Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle

Bemerkung

Diese Funktion wurde in Snowpark 0.7.0 eingeführt.

Wenn Sie Session.table aufrufen, um ein DataFrame-Objekt für eine Tabelle zu erstellen, gibt die Methode ein Updatable-Objekt zurück, das DataFrame um zusätzliche Methoden zum Aktualisieren und Löschen von Daten in der Tabelle erweitert. (Siehe Aktualisierbar.)

Wenn Sie Zeilen in einer Tabelle aktualisieren oder löschen müssen, können Sie die folgenden Methoden der Klasse Updatable verwenden:

Aktualisieren von Zeilen in einer Tabelle

Übergeben Sie der Methode update ein Map-Objekt, das die zu aktualisierenden Spalten und die entsprechenden Werte assoziiert, die diesen Spalten zugewiesen werden sollen. update gibt ein UpdateResult-Objekt zurück, das die Anzahl der aktualisierten Zeilen enthält. (siehe UpdateResult).

Bemerkung

update ist eine Aktionsmethode, was bedeutet, dass der Aufruf der Methode SQL-Anweisungen zur Ausführung an den Server sendet.

Im folgenden Beispiel werden die Werte in der Spalte mit dem Namen count durch den Wert 1 ersetzt:

val updatableDf = session.table("products")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")

Im obigen Beispiel wird der Name der Spalte verwendet, um die Spalte zu identifizieren. Sie können auch einen Spaltenausdruck verwenden:

val updateResult = updatableDf.update(Map(col("count") -> lit(1)))

Wenn die Aktualisierung nur erfolgen soll, wenn eine Bedingung erfüllt ist, können Sie diese Bedingung als Argument angeben. Im folgenden Beispiel werden die Werte in der Spalte mit dem Namen count für Zeilen ersetzt, in denen die Spalte category_id den Wert 20 hat:

val updateResult = updatableDf.update(Map(col("count") -> lit(1)), col("category_id") === 20)

Wenn die Bedingung auf einer Verknüpfung mit einem anderen DataFrame-Objekt basieren soll, können Sie dieses DataFrame als Argument übergeben und dieses DataFrame in der Bedingung verwenden. Im folgenden Beispiel werden die Werte in der Spalte mit dem Namen count für Zeilen ersetzt, in denen die Spalte category_id mit der category_id in der DataFrame-Spalte dfPart übereinstimmt:

val updatableDf = session.table("products")
val dfParts = session.table("parts")
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), updatableDf("category_id") === dfParts("category_id"))

Löschen von Zeilen aus einer Tabelle

Sie können für die Methode delete eine Bedingung angeben, die die zu löschenden Zeilen identifiziert, und diese Bedingung kann auf einer Verknüpfung (Join) mit einem anderen DataFrame basieren. delete gibt ein DeleteResult-Objekt zurück, das die Anzahl der gelöschten Zeilen enthält. (siehe DeleteResult).

Bemerkung

delete ist eine Aktionsmethode, was bedeutet, dass der Aufruf der Methode SQL-Anweisungen zur Ausführung an den Server sendet.

Im folgenden Beispiel werden die Zeilen gelöscht, in denen die Spalte category_id mit der category_id in der DataFrame-Spalte dfPart übereinstimmt:

val updatableDf = session.table("products")
val deleteResult = updatableDf.delete(updatableDf("category_id") === dfParts("category_id"))
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")

Zusammenführen (Merge) von Zeilen in einer Tabelle

Wenn Sie Zeilen einer Tabelle einfügen, aktualisieren oder löschen möchten, die auf Werten einer zweiten Tabelle oder einer Unterabfrage basieren (das Äquivalent des MERGE-Befehls in SQL), gehen Sie wie folgt vor:

  1. Rufen Sie im Updatable-Objekt der Tabelle, in der die Daten zusammengeführt werden sollen, die Methode merge auf, und übergeben Sie dabei das DataFrame-Objekt für die andere Tabelle und den Spaltenausdruck für die Verknüpfungsbedingung.

    Dies gibt ein MergeBuilder-Objekt zurück, das Sie verwenden können, um die Aktionen (z. B. Einfügen, Aktualisieren, Löschen) für die übereinstimmenden und die nicht übereinstimmenden Zeilen anzugeben (siehe MergeBuilder).

  2. Verwendung des Objekts MergeBuilder:

    • Um die Aktualisierung oder Löschung von übereinstimmenden Zeilen anzugeben, rufen Sie die Methode whenMatched auf.

      Wenn Sie eine zusätzliche Bedingung angeben müssen, ob Zeilen aktualisiert oder gelöscht werden sollen, können Sie einen Spaltenausdruck für diese Bedingung übergeben.

      Diese Methode gibt ein MatchedClauseBuilder-Objekt zurück, das Sie zur Angabe der auszuführenden Aktion verwenden können (siehe MatchedClauseBuilder).

      Rufen Sie die Methode update oder delete im Objekt MatchedClauseBuilder auf, um die Aktualisierungs- oder Löschaktion anzugeben, die für die übereinstimmenden Zeilen durchgeführt werden soll. Diese Methoden geben ein MergeBuilder-Objekt zurück, das Sie zur Angabe zusätzlicher Klauseln verwenden können.

    • Um die Einfügung anzugeben, die ausgeführt werden soll, wenn Zeilen nicht übereinstimmen, rufen Sie die Methode whenNotMatched auf.

      Wenn Sie für das Einfügen einer Zeile eine zusätzliche Bedingung angeben müssen, können Sie einen Spaltenausdruck für diese Bedingung übergeben.

      Diese Methode gibt ein NotMatchedClauseBuilder-Objekt zurück, das Sie zur Angabe der auszuführenden Aktion verwenden können (siehe NotMatchedClauseBuilder).

      Rufen Sie die Methode insert im Objekt NotMatchedClauseBuilder auf, um die Einfügeaktion festzulegen, die durchgeführt werden soll, wenn Zeilen nicht übereinstimmen. Diese Methoden geben ein MergeBuilder-Objekt zurück, das Sie zur Angabe zusätzlicher Klauseln verwenden können.

  3. Wenn Sie die auszuführenden Einfügungen, Aktualisierungen und Löschungen angegeben haben, rufen Sie die Methode collect des Objekts MergeBuilder auf, um die angegebenen Einfügungen, Aktualisierungen und Löschungen in der Tabelle auszuführen.

    collect gibt ein MergeResult-Objekt zurück, das die Anzahl der Zeilen enthält, die eingefügt, aktualisiert und gelöscht wurden (siehe MergeResult).

Das folgende Beispiel fügt eine Zeile mit den Spalten id und value aus der Tabelle source in die Tabelle target ein, wenn die Tabelle target keine Zeile mit einer übereinstimmenden ID enthält:

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenNotMatched.insert(Seq(source("id"), source("value")))
                      .collect()

Im folgenden Beispiel wird eine Zeile in der Tabelle target mit dem Wert der Spalte value aus der Zeile in der Tabelle source aktualisiert, die dieselbe ID hat:

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenMatched.update(Map("value" -> source("value")))
                      .collect()

Speichern von Daten in eine Tabelle

So speichern Sie den Inhalt eines DataFrame in einer Tabelle:

  1. Rufen Sie die Methode write auf, um ein DataFrameWriter-Objekt zu erhalten.

  2. Rufen Sie die Methode mode im DataFrameWriter-Objekt auf, und geben Sie an, ob Sie Zeilen in die Tabelle einfügen oder Zeilen in der Tabelle aktualisieren möchten. Diese Methode gibt ein neues DataFrameWriter-Objekt zurück, das mit dem angegebenen Modus konfiguriert ist.

  3. Rufen Sie die Methode saveToTable im DataFrameWriter-Objekt auf, um den Inhalt des DataFrame in der angegebenen Tabelle zu speichern.

Beachten Sie, dass Sie keine separate Methode (z. B. collect) aufrufen müssen, um die SQL-Anweisung auszuführen, mit der Daten in der Tabelle gespeichert werden.

Beispiel:

df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)

Erstellen einer Ansicht aus einem DataFrame

Um eine Ansicht aus einem DataFrame zu erstellen, rufen Sie die createOrReplaceView-Methode auf:

df.createOrReplaceView("db.schema.viewName")

Beachten Sie, dass bei Aufruf von createOrReplaceView die neue Ansicht sofort erstellt wird. Noch wichtiger ist, dass der Aufruf nicht dazu führt, dass der DataFrame ausgewertet wird. (Der DataFrame selbst wird erst ausgewertet, wenn Sie eine Aktion ausführen.)

Ansichten, die Sie durch Aufruf von createOrReplaceView erstellen, sind persistent. Wenn Sie diese Ansicht nicht mehr benötigen, können Sie die Ansicht manuell löschen.

Verwenden von Dateien in Stagingbereichen

In diesem Abschnitt wird erläutert, wie Sie Daten in einer Datei abfragen, die sich in einem Snowflake-Stagingbereich befindet. Für andere Operationen auf Dateien verwenden Sie SQL-Anweisungen.

Um Daten in Dateien abzufragen, die sich in einem Snowflake-Stagingbereich befinden, verwenden Sie die Klasse DataFrameReader:

  1. Rufen Sie die Methode read der Session-Klasse auf, um auf ein DataFrameReader-Objekt zuzugreifen.

  2. Wenn die Dateien im CSV-Format sind, beschreiben Sie die Felder in der Datei. Gehen Sie dabei wie folgt vor:

    1. Erstellen Sie ein StructType-Objekt, das aus einer Sequenz von StructField-Objekten besteht, die die Felder in der Datei beschreiben.

    2. Geben Sie für jedes StructField-Objekt Folgendes an:

      • Name des Feldes

      • Datentyp des Feldes (angegeben als Objekt im com.snowflake.snowpark.types-Paket)

      • Ob das Feld nullwertfähig ist oder nicht

      Beispiel:

      import com.snowflake.snowpark.types._
      
      val schemaForDataFile = StructType(
          Seq(
              StructField("id", StringType, true),
              StructField("name", StringType, true)))
      
    3. Rufen Sie die Methode schema im Objekt DataFrameReader-Objekt auf, und übergeben Sie dabei das StructType-Objekt.

      Beispiel:

      var dfReader = session.read.schema(schemaForDataFile)
      

      Die Methode schema gibt ein DataFrameReader-Objekt zurück, das so konfiguriert ist, dass es Dateien liest, die die angegebenen Felder enthalten.

      Beachten Sie, dass Sie dies für Dateien in anderen Formaten (z. B. JSON) nicht tun müssen. Für solche Dateien behandelt der DataFrameReader die Daten als ein einzelnes Feld vom Typ VARIANT mit dem Feldnamen $1.

  3. Wenn Sie zusätzliche Informationen darüber angeben müssen, wie die Daten gelesen werden sollen (z. B. dass die Daten komprimiert sind oder dass eine CSV-Datei ein Semikolon statt eines Kommas zum Begrenzen von Feldern verwendet), rufen Sie die Methode options des DataFrameReader-Objekts auf.

    Geben Sie den Namen und den Wert der Option ein, die Sie einstellen möchten. Die Namen und Werte der Dateiformatoptionen finden Sie in der Dokumentation zu CREATE FILE FORMAT.

    Sie können auch die in der COPY INTO TABLE-Dokumentation beschriebenen Kopieroptionen einstellen. Beachten Sie, dass die Einstellung von Kopieroptionen zu einer teureren Ausführungsstrategie führen kann, wenn Sie die Daten in den DataFrame abrufen.

    Im folgenden Beispiel wird das DataFrameReader-Objekt für die Abfrage von Daten einer CSV-Datei eingerichtet, die nicht komprimiert ist und die ein Semikolon als Feldbegrenzer verwendet.

    dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    

    Die Methode options gibt ein DataFrameReader-Objekt zurück, das mit den angegebenen Optionen konfiguriert ist.

  4. Rufen Sie die Methode auf, die dem Format der Datei entspricht (z. B. die Methode csv), und übergeben Sie dabei den Speicherort der Datei.

    val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
    

    Die Methoden, die dem Format einer Datei entsprechen, geben ein DataFrame-Objekt zurück, das so konfiguriert ist, dass es die Daten aus dieser Datei enthält.

  5. Verwenden Sie die DataFrame-Objektmethoden, um alle Transformationen auszuführen, die für das Dataset benötigt werden (Auswahl bestimmter Felder, Filtern von Zeilen usw.).

    Im folgenden Beispiel wird das Element color aus einer JSON-Datei extrahiert, die sich im Stagingbereich mystage befindet:

    // Import the sqlExpr function from the functions object.
    import com.snowflake.snowpark.functions._
    
    val df = session.read.json("@mystage").select(sqlExpr("$1:color"))
    

    Wie bereits erläutert, werden Daten in Dateien, die nicht im CSV-Format sind (z. B. JSON), von DataFrameReader als eine einzelne VARIANT-Spalte mit dem Namen $1 behandelt.

    In diesem Beispiel wird die Funktion sqlExpr im com.snowflake.snowpark.functions-Objekt verwendet, um den Pfad zum Element color anzugeben.

    Beachten Sie, dass die Funktion sqlExpr das Eingangsargument weder interpretiert noch verändert. Mit der Funktion können Sie lediglich Ausdrücke und Snippets in SQL konstruieren, die noch nicht von der Snowpark-API unterstützt werden.

  6. Rufen Sie eine Aktionsmethode auf, um die Daten in der Datei abzufragen.

    Wie bei DataFrames für Tabellen werden die Daten erst dann in den DataFrame abgerufen, wenn Sie eine Aktionsmethode aufrufen.

Ausführen von SQL-Anweisungen

Um eine von Ihnen angegebene SQL-Anweisung auszuführen, rufen Sie die Methode sql in der Klasse Session auf und übergeben die auszuführende Anweisung. Diese Methode gibt einen DataFrame zurück.

Beachten Sie, dass die SQL-Anweisung erst ausgeführt wird, wenn Sie eine Aktionsmethode aufrufen.

// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
val stageFilesDf = session.sql("ls @myStage").collect()

// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()

val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()

// Set up a SQL statement to copy data from a stage to a table.
val copyDf = session.sql("copy into myTable from @myStage file_format=(type = csv)").collect()

Wenn Sie Methoden zur Transformation des DataFrame aufrufen möchten (z. B. Filtern, Auswahl usw.), beachten Sie, dass diese Methoden nur funktionieren, wenn die zugrunde liegende SQL-Anweisung eine SELECT-Anweisung ist. Die Transformationsmethoden werden für andere Arten von SQL-Anweisungen nicht unterstützt.

val df = session.sql("select a, c from table where b < 1")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("c") < 10).select(col("a")).collect()

// In this example, the underlying SQL statement is not a SELECT statement.
val df = session.sql("ls @myStage")
// Calling the filter method results in an error.
df.filter(...)