Verwenden von DataFrames in Snowpark

In Snowpark erfolgt die Abfrage und Verarbeitung von Daten hauptsächlich über einen DataFrame. Unter diesem Thema wird erklärt, wie Sie DataFrames verwenden.

Unter diesem Thema:

Um Daten abrufen und ändern zu können, verwenden Sie die Klasse DataFrame. Ein DataFrame repräsentiert ein relationales Dataset, das im Lazy-Modus ausgewertet wird: Es wird nur ausgeführt, wenn eine bestimmte Aktion ausgelöst wird. In gewissem Sinne ist ein DataFrame wie eine Abfrage, die ausgewertet werden muss, um Daten abzurufen.

So rufen Sie Daten in einem DataFrame ab:

  1. Konstruieren Sie ein DataFrame, das die Datenquelle für das Dataset angibt.

    Sie können z. B. ein DataFrame erstellen, das Daten aus einer Tabelle, Daten einer externen CSV-Datei oder die Ausführung einer SQL-Anweisung enthält.

  2. Geben Sie an, wie das Dataset in den DataFrame transformiert werden soll.

    Sie können z. B. angeben, welche Spalten ausgewählt werden sollen, wie die Zeilen gefiltert werden sollen, wie die Ergebnisse sortiert und gruppiert werden sollen usw.

  3. Führen Sie die Anweisung aus, um die Daten in den DataFrame abzurufen.

    Um die Daten in den DataFrame abzurufen, müssen Sie eine Methode aufrufen, die eine Aktion ausführt (z. B. die collect()-Methode).

In den nächsten Abschnitten werden diese Schritte näher erläutert.

Erstellen eines DataFrame

Um ein DataFrame zu erstellen, können Sie Methoden der Session-Klasse verwenden. Jede der folgenden Methoden erstellt einen DataFrame aus einem anderen Typ von Datenquelle:

  • Um einen DataFrame aus Daten in einer Tabelle, in einer Ansicht oder eines Streams zu erstellen, rufen Sie die table-Methode auf:

    // Create a DataFrame from the data in the "products" table.
    val dfTable = session.table("products")
    
  • Um einen DataFrame aus einer Sequenz von Werten zu erstellen, rufen Sie die createDataFrame-Methode auf:

    // Create a DataFrame containing a sequence of values.
    // In the DataFrame, name the columns "i" and "s".
    val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
    
  • Um einen DataFrame zu erstellen, der einen Wertebereich enthält, rufen Sie die range-Methode auf:

    // Create a DataFrame from a range
    val dfRange = session.range(1, 10, 2)
    
  • Um einen DataFrame zu erstellen, der die Daten aus einer in einem Stagingbereich befindlichen Datei enthält, verwenden Sie read zum Abrufen eines DataFrameReader-Objekts. Im DataFrameReader-Objekt rufen Sie die Methode auf, die dem Format der Daten in der Datei entspricht:

    // Create a DataFrame from data in a stage.
    val dfJson = session.read.json("@mystage2/data1.json")
    
  • Um einen DataFrame zu erstellen, der die Ergebnisse einer SQL-Abfrage enthält, rufen Sie die sql-Methode auf:

    // Create a DataFrame from a SQL query
    val dfSql = session.sql("SELECT name from products")
    

    Hinweis: Obwohl Sie mit dieser Methode SELECT-Anweisungen ausführen können, die Daten aus Tabellen und Stagingdateien abrufen, sollten Sie dafür eher die Methoden table und read verwenden. Methoden wie table und read sorgen in Entwicklungstools für eine bessere Hervorhebung der Syntax und von Fehlern sowie eine intelligente Codevervollständigung.

Festlegen der Transformation des Datasets

Um festzulegen, welche Spalten ausgewählt und wie die Ergebnisse gefiltert, sortiert, gruppiert usw. werden sollen, rufen Sie die DataFrame-Methoden auf, die das Dataset transformieren. Um die Spalten in diesen Methoden zu identifizieren, verwenden Sie die Funktion col oder einen Ausdruck, der eine Spalte ergibt (siehe Angeben von Spalten und Ausdrücken).

Beispiel:

  • Zur Angabe der Zeilen, die zurückgegeben werden sollen, rufen Sie die filter-Methode auf:

    // Create a DataFrame for the rows with the ID 1
    // in the "products" table.
    //
    // This example uses the === operator of the Column object to perform an
    // equality check.
    val dfProductIdOne = dfProductInfo.filter(col("id") === 1)
    
  • Zur Angabe der Spalten, die ausgewählt werden sollen, rufen Sie die select-Methode auf:

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame object for the "products" table.
    val dfProductInfo = session.table("products")
    
    // Create a DataFrame that contains the id, name, and serial_number
    // columns.
    val dfProductSerialNo =
        dfProductInfo.select(col("id"), col("name"), col("serial_number"))
    

Jede Methode gibt ein neues DataFrame-Objekt zurück, das transformiert wurde. (Die Methode hat keine Auswirkungen auf das ursprüngliche DataFrame-Objekt.) Wenn Sie also mehrere Transformationen anwenden möchten, können Sie Methodenaufrufe verketten, sodass jede nachfolgende Transformationsmethode auf dem neuen DataFrame-Objekt ausgeführt wird, das vom vorherigen Methodenaufruf zurückgegeben wurde.

Beachten Sie, dass mit diesen Transformationsmethoden keine Daten aus der Snowflake-Datenbank abgerufen werden können. (Die unter Ausführen einer Aktion zum Auswerten eines DataFrame beschriebenen Aktionsmethoden führen den Datenabruf durch.) Die Transformationsmethoden geben einfach an, wie die SQL-Anweisung aufgebaut sein soll.

Verknüpfen von DataFrames

Zum Verknüpfen von DataFrame-Objekten rufen Sie die join-Methode auf:

// Create a DataFrame that joins two other DataFrames
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key"))

Beachten Sie, dass im Beispiel die Methode DataFrame.col verwendet wird, um die in der Verknüpfung zu verwendenden Spalten anzugeben. Weitere Informationen zu dieser Methode finden Sie unter Angeben von Spalten und Ausdrücken.

Wenn Sie eine Tabelle mit sich selbst über verschiedene Spalten verknüpfen müssen, können Sie die Selbstverknüpfung (Self-Join) nicht mit einem einzelnen DataFrame durchführen. Im folgenden Beispiel wird ein einzelnes DataFrame in einer Selbstverknüpfung verwendet. Diese Ausführung schlägt fehl, weil die Spaltenausdrücke für "id" sowohl auf der linken als auch der rechten Seite der Verknüpfung vorhanden sind:

val dfJoined = df.join(df, col("id") === col("parent_id"))

val dfJoined = df.join(df, df("id") === df("parent_id"))

Beide Beispiele scheitern mit der folgenden Ausnahme:

Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
  Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
  Instead, join this DataFrame to a clone() of itself.

Verwenden Sie stattdessen die Methode DataFrame.clone(), um einen Klon des DataFrame-Objekts zu erstellen, und verwenden Sie dann die beiden DataFrame-Objekte für die Verknüpfung:

// Create a DataFrame object for the "products" table for the left-hand side of the join.
val dfLhs = session.table("products")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()

// Create a DataFrame that joins the two DataFrames
// for the "products" table on the "key" column.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))

Wenn Sie eine Selbstverknüpfung auf derselben Spalte ausführen möchten, rufen Sie die join-Methode auf, die eine Seq von Spaltenausdrücken für die USING-Klausel übergibt:

// Create a DataFrame that performs a self-join on
// the DataFrame for the "products" table using the "key" column.
val dfJoined = df.join(df, Seq("key"))

Angeben von Spalten und Ausdrücken

Wenn Sie diese Transformationsmethoden aufrufen, müssen Sie möglicherweise Spalten angeben oder Ausdrücke, die Spalten verwenden. Wenn Sie z. B. die select-Methode aufrufen, müssen Sie die Spalten angeben, die ausgewählt werden sollen.

Um auf eine Spalte zu verweisen, erstellen Sie ein Column-Objekt durch Aufruf der Funktion col im com.snowflake.snowpark.functions-Objekt.

// Import the col function from the functions object.
import com.snowflake.snowpark.functions._

val dfProductInfo = session.table("products").select(col("id"), col("name"))

Wenn Sie einen Filter, eine Projektion, eine Verknüpfungsbedingung usw. angeben müssen, können Sie Column-Objekte in einem Ausdruck verwenden. Im folgenden Beispiel werden Column-Objekte in Ausdrücken für folgende Aufgaben verwendet:

  • Abrufen der Zeilen, in denen der Wert in der Spalte id gleich 20 ist und die Summe der Werte in den Spalten a und b kleiner als 10 ist.

  • Rückgabe des Wertes von b multipliziert mit 10 in der Spalte c. c ist ein Spaltenalias, der in der nächsten Anweisung zum Verknüpfen des DataFrame verwendet wird.

  • Verknüpfen von DataFrame df mit dem berechneten DataFrame dfCompute.

val dfCompute = session.table("T").filter(col("id") === 20).filter((col("a") + col("b")) < 10).select((col("b") * 10) as "c")
val df2 = df.join(dfCompute, col("a") === col("c") && col("a") === col("d"))

Wenn Sie auf Spalten in zwei verschiedenen DataFrame-Objekten verweisen müssen, die denselben Namen haben (z. B. beim Verknüpfen von DataFrames über diese Spalte), können Sie die DataFrame.col-Methode in einem der beiden DataFrame-Objekte verwenden, um sich auf eine Spalte in diesem Objekt zu beziehen (z. B. df1.col("name") und df2.col("name")).

Im folgenden Beispiel wird gezeigt, wie Sie mit der DataFrame.col-Methode auf eine Spalte in einem bestimmten DataFrame verweisen. Im Beispiel werden zwei DataFrame-Objekte verknüpft, die beide eine Spalte mit dem Namen key haben. Dabei wird im Beispiel die Column.as-Methode verwendet, um die Namen der Spalten im neu erstellten DataFrame zu ändern.

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))

Alternativ zur Methode DataFrame.col können Sie die Methode DataFrame.apply verwenden, um auf eine Spalte in einem bestimmten DataFrame zu verweisen. Wie die Methode DataFrame.col akzeptiert auch die Methode DataFrame.apply einen Spaltennamen als Eingabe und gibt ein Column-Objekt zurück.

Beachten Sie, dass bei einem Objekt, das eine apply-Methode in Scala hat, die apply-Methode verwendet werden kann, indem das Objekt wie eine Funktion aufgerufen wird. Um z. B. df.apply("column_name") aufzurufen, können Sie einfach df("column_name") schreiben. Die folgenden Aufrufe sind gleichwertig:

  • df.col("<Spaltenname>")

  • df.apply("<Spaltenname>")

  • df("<Spaltenname>")

Das folgende Beispiel entspricht dem vorherigen Beispiel, verwendet aber die Methode DataFrame.apply, um auf die Spalten in einer Verknüpfungsoperation zu verweisen:

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))

Verwenden von Kurzschrift für Spaltenobjekte

Alternativ zur Funktion col können Sie auf eine Spalte auch wie folgt verweisen:

  • Verwenden Sie vor dem Spaltennamen in Anführungszeichen ein Dollarzeichen ($"column_name").

  • Verwenden Sie vor dem nicht in Anführungszeichen gesetzten Spaltennamen ein Hochkomma (ein einfaches Anführungszeichen) ('column_name).

Führen Sie dazu nach dem Erstellen eines Session-Objekts einen Import der Namen aus dem implicits-Objekt durch:

val session = Session.builder.configFile("/path/to/properties").create

// Import this after you create the session.
import session.implicits._

// Use the $ (dollar sign) shorthand.
val df = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)

// Use ' (apostrophe) shorthand.
val df = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)

Verwenden von doppelten Anführungszeichen um Objektbezeichner (Tabellennamen, Spaltennamen usw.)

Die Namen von Datenbanken, Schemas, Tabellen und Stagingbereichen, die Sie angeben, müssen den Snowflake-Anforderungen an Bezeichner entsprechen. Wenn Sie einen Namen angeben, geht Snowflake davon aus, dass der Name in Großbuchstaben geschrieben ist. Daher sind z. B. die folgenden Aufrufe gleichwertig:

// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))

Wenn der Name nicht den Anforderungen für Bezeichner entspricht, müssen Sie den Namen in doppelte Anführungszeichen (") setzen. Verwenden Sie einen Backslash (\) als Escape-Zeichen zum Umschließen von doppelten Anführungszeichen innerhalb von Scala-Zeichenfolgenliteralen. Der folgende Tabellenname beginnt z. B. nicht mit einem Buchstaben oder einem Unterstrich, sodass Sie den Namen in doppelte Anführungszeichen setzen müssen:

val df = session.table("\"10tablename\"")

Beachten Sie, dass Sie bei der Angabe des Namens einer Spalte keine doppelten Anführungszeichen um den Namen verwenden müssen. Die Snowpark-Bibliothek schließt Spaltennamen automatisch in doppelte Anführungszeichen ein, wenn der Name nicht den Bezeichneranforderungen entspricht:

// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\"))

// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))

Wenn Sie bereits doppelte Anführungszeichen um einen Spaltennamen hinzugefügt haben, fügt die Bibliothek keine weiteren doppelten Anführungszeichen um den Namen ein.

In einigen Fällen kann der Spaltenname doppelte Anführungszeichen enthalten:

describe table quoted;
+------------------------+ ...
| name                   | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted"   | ...
+------------------------+ ...

Wie unter Anforderungen an Bezeichner erläutert, müssen Sie für jedes doppelte Anführungszeichen innerhalb eines in Anführungszeichen geschriebenen Bezeichners zwei doppelte Anführungszeichen verwenden (z. B. "name_with_""air""_quotes" und """column_name_quoted"""):

val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()

Wenn ein Bezeichner in doppelte Anführungszeichen eingeschlossen ist (unabhängig davon, ob Sie die Anführungszeichen explizit hinzugefügt haben oder die Bibliothek die Anführungszeichen für Sie hinzugefügt hat), berücksichtigt Snowflake die Groß- und Kleinschreibung des Bezeichners:

// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))

Verketten von Methodenaufrufen

Da jede Methode, die ein DataFrame-Objekt transformiert, ein neues DataFrame-Objekt zurückgibt, auf das die Transformation angewendet wurde, können Sie Methodenaufrufe verketten, um ein neues DataFrame zu erstellen, das auf eine andere Weise transformiert wird.

Im folgenden Beispiel wird ein DataFrame zurückgegeben, der für folgende Aufgaben konfiguriert ist:

  • Abfragen der Tabelle products

  • Rückgabe der Zeile mit id = 1

  • Auswahl der Spalten name und serial_number

val dfProductInfo = session.table("products").filter(col("id") === 1).select(col("name"), col("serial_number"))

In diesem Beispiel:

  • session.table("products") gibt einen DataFrame für die Tabelle products zurück.

    Der DataFrame enthält zwar noch nicht die Daten aus der Tabelle, aber das Objekt enthält die Definitionen der Spalten in der Tabelle.

  • filter(col("id") === 1) gibt ein DataFrame für die Tabelle products zurück, die so eingerichtet ist, dass sie die Zeile mit id = 1 zurückgibt.

    Beachten Sie erneut, dass DataFrame noch nicht die übereinstimmende Zeile aus der Tabelle enthält. Die übereinstimmende Zeile wird erst abgerufen, wenn Sie eine Aktionsmethode aufrufen.

  • select(col("name"), col("serial_number")) gibt ein DataFrame zurück, das die Spalten name und serial_number für die Zeile in der Tabelle products enthält, die id = 1 hat.

Beachten Sie beim Verketten von Methodenaufrufen, dass die Reihenfolge der Aufrufe wichtig ist. Jeder Methodenaufruf gibt ein DataFrame zurück, das transformiert wurde. Stellen Sie sicher, dass nachfolgende Aufrufe das transformierte DataFrame verwenden.

Im folgenden Beispiel gibt die Methode select ein DataFrame zurück, das nur zwei Spalten enthält: name und serial_number. Der Aufruf der filter-Methode auf diesem DataFrame schlägt fehl, weil dieser die Spalte id verwendet, die im transformierten DataFrame nicht vorhanden ist.

// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("products").select(col("name"), col("serial_number")).filter(col("id") === 1)

Im Gegensatz dazu wird der folgende Code erfolgreich ausgeführt, weil die filter()-Methode auf einem DataFrame aufgerufen wird, der alle Spalten der Tabelle products enthält (einschließlich der Spalte id):

// This succeeds because the DataFrame returned by the table() method
// includes the id column.
val dfProductInfo = session.table("products").filter(col("id") === 1).select(col("name"), col("serial_number"))

Beachten Sie, dass Sie die Methodenaufrufe select und filter möglicherweise in einer anderen Reihenfolge ausführen müssen, als Sie die entsprechenden Schlüsselwörter (SELECT und WHERE) in einer SQL-Anweisung verwenden würden.

Abrufen von Spaltendefinitionen

Um die Definition der Spalten im Dataset für DataFrame abzurufen, verwenden Sie die schema-Methode. Diese Methode gibt ein StructType-Objekt zurück, das ein Array von StructField-Objekten enthält. Jedes StructField-Objekt enthält die Definition einer Spalte.

// Get the StructType object that describes the columns in the
// underlying rowset.
val dfDefinition = session.table("products").schema

Im zurückgegebenen StructType-Objekt sind die Spaltennamen immer normalisiert. Bezeichner, die nicht mit Anführungszeichen umschlossen sind, werden in Großbuchstaben zurückgegeben. Bezeichner in Anführungszeichen werden genau in der Schreibweise zurückgegeben, in der sie definiert wurden.

Das folgende Beispiel gibt ein DataFrame zurück, das die Spalten mit den Namen ID und 3rd enthält. Für den Spaltennamen 3rd schließt die Snowpark-Bibliothek den Namen automatisch in doppelte Anführungszeichen ein ("3rd"), da der Name nicht den Anforderungen an einen Bezeichner entspricht.

Im Beispiel wird erst die schema-Methode aufgerufen und dann die names-Methode auf dem zurückgegebenen Objekt StructType, um eine Seq von Spaltennamen zu erhalten. Die Namen werden in dem von der schema-Methode zurückgegebenen StructType normalisiert.

// This returns Seq("ID", "\"3rd\"")
df.select(col("id"), col("3rd")).schema.names.toSeq

Ausführen einer Aktion zum Auswerten eines DataFrame

Wie bereits erwähnt, wird der DataFrame im Lazy-Modus ausgewertet, d. h. die SQL-Anweisung wird erst dann zur Ausführung an den Server gesendet, wenn Sie eine Aktion ausführen. Eine Aktion löst die Auswertung des DataFrame aus und sendet die entsprechende SQL-Anweisung zur Ausführung an den Server.

Ab diesem Release führen die folgenden DataFrame-Methoden eine Aktion aus:

Methode

Beschreibung

collect

Wertet den DataFrame aus und gibt das resultierende Dataset als ein Array von Row-Objekten zurück.

count

Wertet den DataFrame aus und gibt die Anzahl der Zeilen zurück.

show

Wertet den DataFrame aus und gibt die Zeilen auf der Konsole aus. Beachten Sie, dass bei dieser Methode die Anzahl der Zeilen (standardmäßig) auf 10 eingeschränkt ist.

write.saveAsTable . (DataFrameWriter-Methode)

Speichert die im DataFrame enthaltenen Daten in die angegebene Tabelle.

Beispiel: Um eine Abfrage auf einer Tabelle auszuführen und die Ergebnisse zurückzugeben, rufen Sie die collect-Methode auf:

// Create a DataFrame for the row in the "products" table with the id 1.
// This does not execute the query.
val dfProductIdOne = session.table("products").filter(col("id") === 1)

// Send the query to the server for execution and
// return an Array of Rows containing the results.
val results = dfProductIdOne.collect()

Um die Abfrage auszuführen und die Anzahl der Ergebnisse zurückzugeben, rufen Sie die count-Methode auf:

// Create a DataFrame for the "products" table.
val dfProducts = session.table("products")

// Send the query to the server for execution and
// return the count of rows in the table.
val resultCount = dfProducts.count()

Um eine Abfrage auszuführen und die Ergebnisse über die Konsole auszugeben, rufen Sie die show-Methode auf:

// Create a DataFrame for the "products" table.
val dfProducts = session.table("products")

// Send the query to the server for execution and
// print the results to the console.
// The query limits the number of rows to 10 by default.
dfProducts.show()

// Limit the number of rows to 20, rather than 10.
dfProducts.show(20)

Hinweis: Wenn Sie die schema-Methode aufrufen, um die Definitionen der Spalten im DataFrame zu erhalten, müssen Sie keine Aktionsmethode aufrufen.

Speichern von Daten in einer Tabelle

So speichern Sie den Inhalt eines DataFrame in einer Tabelle:

  1. Rufen Sie die Methode write auf, um ein DataFrameWriter-Objekt zu erhalten.

  2. Rufen Sie die Methode mode im DataFrameWriter-Objekt auf, und geben Sie an, ob Sie Zeilen in die Tabelle einfügen oder Zeilen in der Tabelle aktualisieren möchten. Diese Methode gibt ein neues DataFrameWriter-Objekt zurück, das mit dem angegebenen Modus konfiguriert ist.

  3. Rufen Sie die Methode saveToTable im DataFrameWriter-Objekt auf, um den Inhalt des DataFrame in der angegebenen Tabelle zu speichern.

Beachten Sie, dass Sie keine separate Methode (z. B. collect) aufrufen müssen, um die SQL-Anweisung auszuführen, mit der Daten in der Tabelle gespeichert werden.

Beispiel:

df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)

Erstellen einer Ansicht aus einem DataFrame

Um eine Ansicht aus einem DataFrame zu erstellen, rufen Sie die createOrReplaceView-Methode auf:

df.createOrReplaceView("db.schema.viewName")

Beachten Sie, dass bei Aufruf von createOrReplaceView die neue Ansicht sofort erstellt wird. Noch wichtiger ist, dass der Aufruf nicht dazu führt, dass der DataFrame ausgewertet wird. (Der DataFrame selbst wird erst ausgewertet, wenn Sie eine Aktion ausführen.)

Ansichten, die Sie durch Aufruf von createOrReplaceView erstellen, sind persistent. Wenn Sie diese Ansicht nicht mehr benötigen, können Sie die Ansicht manuell löschen.

Verwenden von Dateien in Stagingbereichen

In diesem Abschnitt wird erläutert, wie Sie Daten in einer Datei abfragen, die sich in einem Snowflake-Stagingbereich befindet. Für andere Operationen auf Dateien verwenden Sie SQL-Anweisungen.

Um Daten in Dateien abzufragen, die sich in einem Snowflake-Stagingbereich befinden, verwenden Sie die Klasse DataFrameReader:

  1. Rufen Sie die Methode read der Session-Klasse auf, um auf ein DataFrameReader-Objekt zuzugreifen.

  2. Wenn die Dateien im CSV-Format sind, beschreiben Sie die Felder in der Datei. Gehen Sie dabei wie folgt vor:

    1. Erstellen Sie ein StructType-Objekt, das aus einer Sequenz von StructField-Objekten besteht, die die Felder in der Datei beschreiben.

    2. Geben Sie für jedes StructField-Objekt Folgendes an:

      • Name des Feldes

      • Datentyp des Feldes (angegeben als Objekt im com.snowflake.snowpark.types-Paket)

      • Ob das Feld nullwertfähig ist oder nicht

      Beispiel:

      import com.snowflake.snowpark.types._
      
      val schemaForDataFile = StructType(
          Seq(
              StructField("id", StringType, true),
              StructField("name", StringType, true)))
      
    3. Rufen Sie die Methode schema im Objekt DataFrameReader-Objekt auf, und übergeben Sie dabei das StructType-Objekt.

      Beispiel:

      var dfReader = session.read.schema(schemaForDataFile)
      

      Die Methode schema gibt ein DataFrameReader-Objekt zurück, das so konfiguriert ist, dass es Dateien liest, die die angegebenen Felder enthalten.

      Beachten Sie, dass Sie dies für Dateien in anderen Formaten (z. B. JSON) nicht tun müssen. Für solche Dateien behandelt der DataFrameReader die Daten als ein einzelnes Feld vom Typ VARIANT mit dem Feldnamen $1.

  3. Wenn Sie zusätzliche Informationen darüber angeben müssen, wie die Daten gelesen werden sollen (z. B. dass die Daten komprimiert sind oder dass eine CSV-Datei ein Semikolon statt eines Kommas zum Begrenzen von Feldern verwendet), rufen Sie die Methode options des DataFrameReader-Objekts auf.

    Geben Sie den Namen und den Wert der Option ein, die Sie einstellen möchten. Die Namen und Werte der Dateiformatoptionen finden Sie in der Dokumentation zu CREATE FILE FORMAT.

    Sie können auch die in der COPY INTO TABLE-Dokumentation beschriebenen Kopieroptionen einstellen. Beachten Sie, dass die Einstellung von Kopieroptionen zu einer teureren Ausführungsstrategie führen kann, wenn Sie die Daten in den DataFrame abrufen.

    Im folgenden Beispiel wird das DataFrameReader-Objekt für die Abfrage von Daten einer CSV-Datei eingerichtet, die nicht komprimiert ist und die ein Semikolon als Feldbegrenzer verwendet.

    dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    

    Die Methode options gibt ein DataFrameReader-Objekt zurück, das mit den angegebenen Optionen konfiguriert ist.

  4. Rufen Sie die Methode auf, die dem Format der Datei entspricht (z. B. die Methode csv), und übergeben Sie dabei den Speicherort der Datei.

    val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
    

    Die Methoden, die dem Format einer Datei entsprechen, geben ein DataFrame-Objekt zurück, das so konfiguriert ist, dass es die Daten aus dieser Datei enthält.

  5. Verwenden Sie die DataFrame-Objektmethoden, um alle Transformationen auszuführen, die für das Dataset benötigt werden (Auswahl bestimmter Felder, Filtern von Zeilen usw.).

    Im folgenden Beispiel wird das Element color aus einer JSON-Datei extrahiert, die sich im Stagingbereich mystage befindet:

    // Import the sqlExpr function from the functions object.
    import com.snowflake.snowpark.functions._
    
    val df = session.read.json("@mystage").select(sqlExpr("$1:color"))
    

    Wie bereits erläutert, werden Daten in Dateien, die nicht im CSV-Format sind (z. B. JSON), von DataFrameReader als eine einzelne VARIANT-Spalte mit dem Namen $1 behandelt.

    In diesem Beispiel wird die Funktion sqlExpr im com.snowflake.snowpark.functions-Objekt verwendet, um den Pfad zum Element color anzugeben.

    Beachten Sie, dass die Funktion sqlExpr das Eingangsargument weder interpretiert noch verändert. Mit der Funktion können Sie lediglich Ausdrücke und Snippets in SQL konstruieren, die noch nicht von der Snowpark-API unterstützt werden.

  6. Rufen Sie eine Aktionsmethode auf, um die Daten in der Datei abzufragen.

    Wie bei DataFrames für Tabellen werden die Daten erst dann in den DataFrame abgerufen, wenn Sie eine Aktionsmethode aufrufen.

Ausführen von SQL-Anweisungen

Um eine von Ihnen angegebene SQL-Anweisung auszuführen, rufen Sie die Methode sql in der Klasse Session auf und übergeben die auszuführende Anweisung. Diese Methode gibt einen DataFrame zurück.

Beachten Sie, dass die SQL-Anweisung erst ausgeführt wird, wenn Sie eine Aktionsmethode aufrufen.

// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
val stageFilesDf = session.sql("ls @myStage").collect()

// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()

val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()

// Set up a SQL statement to copy data from a stage to a table.
val copyDf = session.sql("copy into myTable from @myStage file_format=(type = csv)").collect()

Wenn Sie Methoden zur Transformation des DataFrame aufrufen möchten (z. B. Filtern, Auswahl usw.), beachten Sie, dass diese Methoden nur funktionieren, wenn die zugrunde liegende SQL-Anweisung eine SELECT-Anweisung ist. Die Transformationsmethoden werden für andere Arten von SQL-Anweisungen nicht unterstützt.

val df = session.sql("select a, c from table where b < 1")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("c") < 10).select(col("a")).collect()

// In this example, the underlying SQL statement is not a SELECT statement.
val df = session.sql("ls @myStage")
// Calling the filter method results in an error.
df.filter(...)