Verwenden von DataFrames in Snowpark Scala

In Snowpark erfolgt die Abfrage und Verarbeitung von Daten hauptsächlich über einen DataFrame. Unter diesem Thema wird erklärt, wie Sie DataFrames verwenden.

Unter diesem Thema:

Um Daten abrufen und bearbeiten zu können, verwenden Sie die Klasse DataFrame. Ein DataFrame repräsentiert ein relationales Dataset, das im Lazy-Modus ausgewertet wird: Es wird nur ausgeführt, wenn eine bestimmte Aktion ausgelöst wird. In gewissem Sinne ist ein DataFrame wie eine Abfrage, die ausgewertet werden muss, um Daten abzurufen.

So rufen Sie Daten in einem DataFrame ab:

  1. Konstruieren Sie ein DataFrame, das die Datenquelle für das Dataset angibt.

    Sie können z. B. ein DataFrame erstellen, das Daten aus einer Tabelle, Daten einer externen CSV-Datei oder die Ausführung einer SQL-Anweisung enthält.

  2. Geben Sie an, wie das Dataset in den DataFrame transformiert werden soll.

    Sie können z. B. angeben, welche Spalten ausgewählt werden sollen, wie die Zeilen gefiltert werden sollen, wie die Ergebnisse sortiert und gruppiert werden sollen usw.

  3. Führen Sie die Anweisung aus, um die Daten in den DataFrame abzurufen.

    Um die Daten in den DataFrame abzurufen, müssen Sie eine Methode aufrufen, die eine Aktion ausführt (z. B. die collect()-Methode).

In den nächsten Abschnitten werden diese Schritte näher erläutert.

Einrichten der Beispiele für diesen Abschnitt

Einige der Beispiele in diesem Abschnitt verwenden einen DataFrame, um eine Tabelle namens sample_product_data abzufragen. Wenn Sie diese Beispiele ausführen möchten, können Sie diese Tabelle erstellen und mit einigen Daten füllen, indem Sie die folgenden SQL-Anweisungen ausführen:

CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
INSERT INTO sample_product_data VALUES
    (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
    (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
    (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
    (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
    (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
    (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
    (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
    (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
    (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
    (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
    (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
    (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);
Copy

Um zu überprüfen, ob die Tabelle erstellt wurde, führen Sie folgende Anweisung aus:

SELECT * FROM sample_product_data;
Copy

Erstellen eines DataFrame

Um einen DataFrame zu erstellen, können Sie Methoden der Session-Klasse verwenden. Jede der folgenden Methoden erstellt einen DataFrame aus einem anderen Typ von Datenquelle:

  • Um einen DataFrame aus Daten einer Tabelle, einer Ansicht oder eines Streams zu erstellen, rufen Sie die table-Methode auf:

    // Create a DataFrame from the data in the "sample_product_data" table.
    val dfTable = session.table("sample_product_data")
    
    // To print out the first 10 rows, call:
    //   dfTable.show()
    
    Copy

    Bemerkung

    Die Methode session.table gibt ein Updatable-Objekt zurück. Updatable erweitert DataFrame und bietet zusätzliche Methoden für die Verwendung der Daten in der Tabelle (z. B. Methoden zum Aktualisieren und Löschen von Daten). Weitere Informationen dazu finden Sie unter Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle.

  • Um einen DataFrame aus einer Sequenz von Werten zu erstellen, rufen Sie die createDataFrame-Methode auf:

    // Create a DataFrame containing a sequence of values.
    // In the DataFrame, name the columns "i" and "s".
    val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
    
    Copy

    Bemerkung

    Von Snowflake reservierte Wörter können bei der Konstruktion eines DataFrame nicht als Spaltennamen verwendet werden. Eine Liste der reservierten Wörter finden Sie unter Reservierte und begrenzte Schlüsselwörter.

  • Um einen DataFrame zu erstellen, der einen Wertebereich enthält, rufen Sie die range-Methode auf:

    // Create a DataFrame from a range
    val dfRange = session.range(1, 10, 2)
    
    Copy
  • Um einen DataFrame zu erstellen, der die Daten aus einer in einem Stagingbereich befindlichen Datei enthält, verwenden Sie read zum Abrufen eines DataFrameReader-Objekts. Im DataFrameReader-Objekt rufen Sie die Methode auf, die dem Format der Daten in der Datei entspricht:

    // Create a DataFrame from data in a stage.
    val dfJson = session.read.json("@mystage2/data1.json")
    
    Copy
  • Um einen DataFrame zu erstellen, der die Ergebnisse einer SQL-Abfrage enthält, rufen Sie die sql-Methode auf:

    // Create a DataFrame from a SQL query
    val dfSql = session.sql("SELECT name from products")
    
    Copy

    Hinweis: Obwohl Sie mit dieser Methode SELECT-Anweisungen ausführen können, die Daten aus Tabellen und Stagingdateien abrufen, sollten Sie dafür eher die Methoden table und read verwenden. Methoden wie table und read bieten eine bessere Syntax- und Fehlerhervorhebung sowie intelligente Codevervollständigung in Entwicklungstools.

Festlegen der Transformation des Datasets

Um festzulegen, welche Spalten ausgewählt und wie die Ergebnisse gefiltert, sortiert, gruppiert usw. werden sollen, rufen Sie die DataFrame-Methoden auf, die das Dataset transformieren. Um die Spalten in diesen Methoden zu identifizieren, verwenden Sie die Funktion col oder einen Ausdruck, der eine Spalte ergibt. (Siehe Angeben von Spalten und Ausdrücken.)

Beispiel:

  • Zur Angabe der Zeilen, die zurückgegeben werden sollen, rufen Sie die filter-Methode auf:

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame for the rows with the ID 1
    // in the "sample_product_data" table.
    //
    // This example uses the === operator of the Column object to perform an
    // equality check.
    val df = session.table("sample_product_data").filter(col("id") === 1)
    df.show()
    
    Copy
  • Zur Angabe der Spalten, die ausgewählt werden sollen, rufen Sie die select-Methode auf:

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame that contains the id, name, and serial_number
    // columns in te "sample_product_data" table.
    val df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number"))
    df.show()
    
    Copy

Jede Methode gibt ein neues DataFrame-Objekt zurück, das transformiert wurde. (Die Methode hat keine Auswirkungen auf das ursprüngliche DataFrame-Objekt.) Wenn Sie also mehrere Transformationen anwenden möchten, können Sie Methodenaufrufe verketten, sodass jede nachfolgende Transformationsmethode auf dem neuen DataFrame-Objekt ausgeführt wird, das vom vorherigen Methodenaufruf zurückgegeben wurde.

Beachten Sie, dass mit diesen Transformationsmethoden keine Daten aus der Snowflake-Datenbank abgerufen werden können. (Die unter Ausführen einer Aktion zum Auswerten eines DataFrame beschriebenen Aktionsmethoden führen den Datenabruf durch.) Die Transformationsmethoden geben einfach an, wie die SQL-Anweisung aufgebaut sein soll.

Angeben von Spalten und Ausdrücken

Wenn Sie diese Transformationsmethoden aufrufen, müssen Sie möglicherweise Spalten angeben oder Ausdrücke, die Spalten verwenden. Wenn Sie z. B. die select-Methode aufrufen, müssen Sie die Spalten angeben, die ausgewählt werden sollen.

Um auf eine Spalte zu verweisen, erstellen Sie ein Column-Objekt durch Aufruf der col-Funktion im com.snowflake.snowpark.functions-Objekt.

// Import the col function from the functions object.
import com.snowflake.snowpark.functions._

val dfProductInfo = session.table("sample_product_data").select(col("id"), col("name"))
dfProductInfo.show()
Copy

Bemerkung

Weitere Informationen zum Erstellen eines Column-Objekts für ein Literal finden Sie unter Verwenden von Literalen als Spaltenobjekte.

Wenn Sie einen Filter, eine Projektion, eine Verknüpfungsbedingung usw. angeben müssen, können Sie Column-Objekte in einem Ausdruck verwenden. Beispiel:

  • Sie können Column-Objekte mit der Methode filter verwenden, um eine Filterbedingung anzugeben:

    // Specify the equivalent of "WHERE id = 20"
    // in an SQL SELECT statement.
    df.filter(col("id") === 20)
    
    Copy
    // Specify the equivalent of "WHERE a + b < 10"
    // in an SQL SELECT statement.
    df.filter((col("a") + col("b")) < 10)
    
    Copy
  • Sie können Column-Objekte mit der Methode select verwenden, um einen Alias zu definieren:

    // Specify the equivalent of "SELECT b * 10 AS c"
    // in an SQL SELECT statement.
    df.select((col("b") * 10) as "c")
    
    Copy
  • Sie können Column-Objekte mit der Methode join verwenden, um eine Verknüpfungsbedingung zu definieren:

    // Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y"
    // in an SQL SELECT statement.
    dfX.join(dfY, col("a_in_X") === col("b_in_Y"))
    
    Copy

Verweisen auf Spalten in verschiedenen DataFrames

Wenn Sie auf Spalten in zwei verschiedenen DataFrame-Objekten verweisen müssen, die denselben Namen haben (z. B. beim Verknüpfen von DataFrames über diese Spalte), können Sie die DataFrame.col-Methode in einem der beiden DataFrame-Objekte verwenden, um sich auf eine Spalte in diesem Objekt zu beziehen (z. B. df1.col("name") und df2.col("name")).

Im folgenden Beispiel wird gezeigt, wie Sie mit der DataFrame.col-Methode auf eine Spalte in einem bestimmten DataFrame verweisen. Im Beispiel werden zwei DataFrame-Objekte verknüpft, die beide eine Spalte mit dem Namen key haben. Dabei wird im Beispiel die Column.as-Methode verwendet, um die Namen der Spalten im neu erstellten DataFrame zu ändern.

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))
Copy

Verwenden der apply-Methode zum Verweisen auf eine Spalte

Alternativ zur Methode DataFrame.col können Sie die Methode DataFrame.apply verwenden, um auf eine Spalte in einem bestimmten DataFrame zu verweisen. Wie die Methode DataFrame.col akzeptiert auch die Methode DataFrame.apply einen Spaltennamen als Eingabe und gibt ein Column-Objekt zurück.

Beachten Sie, dass bei einem Objekt, das eine apply-Methode in Scala hat, die apply-Methode verwendet werden kann, indem das Objekt wie eine Funktion aufgerufen wird. Um z. B. df.apply("column_name") aufzurufen, können Sie einfach df("column_name") schreiben. Die folgenden Aufrufe sind gleichwertig:

  • df.col("<Spaltenname>")

  • df.apply("<Spaltenname>")

  • df("<Spaltenname>")

Das folgende Beispiel entspricht dem vorherigen Beispiel, verwendet aber die Methode DataFrame.apply, um auf die Spalten in einer Verknüpfungsoperation zu verweisen:

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))
Copy

Verwenden von Kurzschrift für Spaltenobjekte

Alternativ zur Funktion col können Sie auf eine Spalte auch wie folgt verweisen:

  • Verwenden Sie vor dem Spaltennamen in Anführungszeichen ein Dollarzeichen ($"column_name").

  • Verwenden Sie vor dem nicht in Anführungszeichen gesetzten Spaltennamen ein Hochkomma (ein einfaches Anführungszeichen) ('column_name).

Führen Sie dazu nach dem Erstellen eines Session-Objekts einen Import der Namen aus dem implicits-Objekt durch:

val session = Session.builder.configFile("/path/to/properties").create

// Import this after you create the session.
import session.implicits._

// Use the $ (dollar sign) shorthand.
val df = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)

// Use ' (apostrophe) shorthand.
val df = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)
Copy

Verwenden von doppelten Anführungszeichen um Objektbezeichner (Tabellennamen, Spaltennamen usw.)

Die Namen von Datenbanken, Schemas, Tabellen und Stagingbereichen, die Sie angeben, müssen den Snowflake-Anforderungen an Bezeichner entsprechen. Wenn Sie einen Namen angeben, geht Snowflake davon aus, dass der Name in Großbuchstaben geschrieben ist. Daher sind z. B. die folgenden Aufrufe gleichwertig:

// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))
Copy

Wenn der Name nicht den Anforderungen für Bezeichner entspricht, müssen Sie den Namen in doppelte Anführungszeichen (") setzen. Verwenden Sie einen Backslash (\), um die doppelten Anführungszeichen innerhalb von Scala-Zeichenfolgenliteralen zu maskieren. Der folgende Tabellenname beginnt z. B. nicht mit einem Buchstaben oder einem Unterstrich, sodass Sie den Namen in doppelte Anführungszeichen setzen müssen:

val df = session.table("\"10tablename\"")
Copy

Beachten Sie, dass Sie bei der Angabe des Namens einer Spalte keine doppelten Anführungszeichen um den Namen verwenden müssen. Die Snowpark-Bibliothek schließt Spaltennamen automatisch in doppelte Anführungszeichen ein, wenn der Name nicht den Bezeichneranforderungen entspricht:

// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\""))

// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))
Copy

Wenn Sie bereits doppelte Anführungszeichen um einen Spaltennamen hinzugefügt haben, fügt die Bibliothek keine weiteren doppelten Anführungszeichen um den Namen ein.

In einigen Fällen kann der Spaltenname doppelte Anführungszeichen enthalten:

describe table quoted;
+------------------------+ ...
| name                   | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted"   | ...
+------------------------+ ...
Copy

Wie unter Anforderungen an Bezeichner erläutert, müssen Sie für jedes doppelte Anführungszeichen innerhalb eines in Anführungszeichen geschriebenen Bezeichners zwei doppelte Anführungszeichen verwenden (z. B. "name_with_""air""_quotes" und """column_name_quoted"""):

val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()
Copy

Wenn ein Bezeichner in doppelte Anführungszeichen eingeschlossen ist (unabhängig davon, ob Sie die Anführungszeichen explizit hinzugefügt haben oder die Bibliothek die Anführungszeichen für Sie hinzugefügt hat), berücksichtigt Snowflake die Groß-/Kleinschreibung des Bezeichners:

// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))
Copy

Verwenden von Literalen als Spaltenobjekte

Um ein Literal in einer Methode zu verwenden, die ein Column-Objekt übergibt, erstellen Sie ein Column-Objekt für das Literal, indem Sie das Literal an die lit-Funktion im com.snowflake.snowpark.functions-Objekt übergeben. Beispiel:

// Import for the lit and col functions.
import com.snowflake.snowpark.functions._

// Show the first 10 rows in which num_items is greater than 5.
// Use `lit(5)` to create a Column object for the literal 5.
df.filter(col("num_items").gt(lit(5))).show()
Copy

Wenn das Literal in Scala ein Gleitkomma- oder Double-Wert ist (z. B. 0.05 wird standardmäßig als Double verarbeitet), generiert die Snowpark-Bibliothek SQL-Code, der den Wert implizit in den entsprechenden Snowpark-Datentyp umwandelt (z. B. 0.05::DOUBLE). Dies kann zu einem Näherungswert führen, der von der genau angegebenen Zahl abweicht.

Der Code im folgenden Beispiel zeigt keine übereinstimmenden Zeilen an, obwohl der Filter (der für Werte größer oder gleich 0.05 gilt) mit den Zeilen in DataFrame übereinstimmen sollte:

// Create a DataFrame that contains the value 0.05.
val df = session.sql("select 0.05 :: Numeric(5, 2) as a")

// Applying this filter results in no matching rows in the DataFrame.
df.filter(col("a") <= lit(0.06) - lit(0.01)).show()
Copy

Das Problem ist, dass lit(0.06) und lit(0.01) Näherungswerte für 0.06 und 0.01 liefern und nicht die genauen Werte.

Um dieses Problem zu vermeiden, können Sie eine der folgenden Methoden anwenden:

  • Option 1: Wandeln Sie das Literal in den Snowpark-Typ um, den Sie verwenden möchten. Verwenden Sie zum Beispiel einen Datentyp NUMBER mit einer Gesamtstellenzahl (precision) von 5 und einer Dezimalstellenzahl (scale) von 2:

    df.filter(col("a") <= lit(0.06).cast(new DecimalType(5, 2)) - lit(0.01).cast(new DecimalType(5, 2))).show()
    
    Copy
  • Option 2: Wandeln Sie den Wert in den gewünschten Typ umwandeln, bevor der Wert an die Funktion lit übergeben wird. So können Sie zum Beispiel den Typ BigDecimal verwenden:

    df.filter(col("a") <= lit(BigDecimal(0.06)) - lit(BigDecimal(0.01))).show()
    
    Copy

Umwandeln eines Spaltenobjekts in einen bestimmten Typ

Um ein Column-Objekt in einen bestimmten Typ umzuwandeln, rufen Sie die Column.cast-Methode auf, und übergeben Sie ein Typobjekt aus dem com.snowflake.snowpark.types package. So können Sie zum Beispiel ein Literal als NUMBER mit einer Gesamtstellenzahl (precision) von 5 und einer Dezimalstellenzahl (scale) von 2 darstellen:

// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._

val decimalValue = lit(0.05).cast(new DecimalType(5,2))
Copy

Verketten von Methodenaufrufen

Da jede Methode, die ein DataFrame-Objekt transformiert, ein neues DataFrame-Objekt zurückgibt, auf das die Transformation angewendet wurde, können Sie Methodenaufrufe verketten, um ein neues DataFrame zu erstellen, das auf eine andere Weise transformiert wird.

Im folgenden Beispiel wird ein DataFrame zurückgegeben, der für folgende Aufgaben konfiguriert ist:

  • Abfragen der Tabelle sample_product_data

  • Rückgabe der Zeile mit id = 1

  • Auswahl der Spalten name und serial_number

val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
Copy

In diesem Beispiel:

  • session.table("sample_product_data") gibt einen DataFrame für die Tabelle sample_product_data zurück.

    Der DataFrame enthält zwar noch nicht die Daten aus der Tabelle, aber das Objekt enthält die Definitionen der Spalten in der Tabelle.

  • filter(col("id") === 1) gibt ein DataFrame für die Tabelle sample_product_data zurück, die so eingerichtet ist, dass sie die Zeile mit id = 1 zurückgibt.

    Beachten Sie erneut, dass der DataFrame noch nicht die übereinstimmende Zeile aus der Tabelle enthält. Die übereinstimmende Zeile wird erst abgerufen, wenn Sie eine Aktionsmethode aufrufen.

  • select(col("name"), col("serial_number")) gibt ein DataFrame zurück, das die Spalten name und serial_number für die Zeile in der Tabelle sample_product_data enthält, die id = 1 hat.

Beachten Sie beim Verketten von Methodenaufrufen, dass die Reihenfolge der Aufrufe wichtig ist. Jeder Methodenaufruf gibt ein DataFrame zurück, das transformiert wurde. Stellen Sie sicher, dass nachfolgende Aufrufe das transformierte DataFrame verwenden.

Im folgenden Beispiel gibt die Methode select ein DataFrame zurück, das nur zwei Spalten enthält: name und serial_number. Der Aufruf der filter-Methode auf diesem DataFrame schlägt fehl, weil dieser die Spalte id verwendet, die im transformierten DataFrame nicht vorhanden ist.

// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") === 1)
Copy

Im Gegensatz dazu wird der folgende Code erfolgreich ausgeführt, weil die filter()-Methode auf einem DataFrame aufgerufen wird, der alle Spalten der Tabelle sample_product_data enthält (einschließlich der Spalte id):

// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
Copy

Beachten Sie, dass Sie die Methodenaufrufe select und filter möglicherweise in einer anderen Reihenfolge ausführen müssen als bei Verwendung der entsprechenden Schlüsselwörter (SELECT und WHERE) in einer SQL-Anweisung.

Begrenzen der Anzahl von Zeilen in einem DataFrame

Um die Anzahl der Zeilen in einem DataFrame zu begrenzen, können Sie die Transformationsmethode DataFrame.limit verwenden.

Die Snowpark-API bietet auch folgende Aktionsmethoden zum Abrufen und Ausdrucken einer begrenzten Anzahl von Zeilen:

  • Aktionsmethode DataFrame.first (zum Ausführen von Abfragen und zum Zurückgeben der ersten n Zeilen)

  • Aktionsmethode DataFrame.show (zum Ausführen von Abfragen und zum Drucken der ersten n Zeilen)

Diese Methoden fügen der auszuführenden SQL-Anweisung im Endeffekt eine LIMIT-Klausel hinzu.

Wie in den Nutzungshinweisen zu LIMIT erläutert, sind die Ergebnisse nicht deterministisch, es sei denn, Sie geben in Verbindung mit LIMIT eine Sortierreihenfolge (ORDER BY) an.

Um die ORDER BY-Klausel zusammen mit der LIMIT-Klausel zu verwenden (z. B. damit ORDER BY nicht in einer separaten Unterabfrage steht), müssen Sie die Methode aufrufen, die die Ergebnisse auf den von der sort-Methode zurückgegebenen DataFrame beschränkt.

Wenn Sie beispielsweise Methodenaufrufe verketten:

// Limit the number of rows to 5, sorted by parent_id.
var dfSubset = df.sort(col("parent_id")).limit(5);

// Return the first 5 rows, sorted by parent_id.
var arrayOfRows = df.sort(col("parent_id")).first(5)

// Print the first 5 rows, sorted by parent_id.
df.sort(col("parent_id")).show(5)
Copy

Abrufen von Spaltendefinitionen

Um die Definition der Spalten im Dataset für DataFrame abzurufen, rufen Sie die Methode schema auf. Diese Methode gibt ein StructType-Objekt zurück, das ein Array von StructField-Objekten enthält. Jedes StructField-Objekt enthält die Definition einer Spalte.

// Get the StructType object that describes the columns in the
// underlying rowset.
val tableSchema = session.table("sample_product_data").schema
println("Schema for sample_product_data: " + tableSchema);
Copy

Im zurückgegebenen StructType-Objekt sind die Spaltennamen immer normalisiert. Bezeichner, die nicht mit Anführungszeichen umschlossen sind, werden in Großbuchstaben zurückgegeben. Bezeichner in Anführungszeichen werden genau in der Schreibweise zurückgegeben, in der sie definiert wurden.

Im folgenden Beispiel wird ein DataFrame erstellt, das die Spalten ID und 3rd enthält. Für den Spaltennamen 3rd schließt die Snowpark-Bibliothek den Namen automatisch in doppelte Anführungszeichen ein ("3rd"), da der Name nicht den Anforderungen an einen Bezeichner entspricht.

Im Beispiel wird erst die schema-Methode aufgerufen und dann die names-Methode auf dem zurückgegebenen Objekt StructType, um ein ArraySeq von Spaltennamen zu erhalten. Die Namen werden in dem von der schema-Methode zurückgegebenen StructType normalisiert.

// Create a DataFrame containing the "id" and "3rd" columns.
val dfSelectedColumns = session.table("sample_product_data").select(col("id"), col("3rd"))
// Print out the names of the columns in the schema. This prints out:
//   ArraySeq(ID, "3rd")
println(dfSelectedColumns.schema.names.toSeq)
Copy

Verknüpfen von DataFrames

Zum Verknüpfen (Join) von DataFrame-Objekten rufen Sie die DataFrame-Methode auf:

In den folgenden Abschnitten wird erläutert, wie DataFrames zur Durchführung einer Verknüpfung verwendet werden:

Einrichten der Beispieldaten für die Verknüpfungen

Die Beispiele in den nächsten Abschnitten verwenden Beispieldaten, die Sie durch Ausführen der folgenden SQL-Anweisungen einrichten können:

create or replace table sample_a (
  id_a integer,
  name_a varchar,
  value integer
);
insert into sample_a (id_a, name_a, value) values
  (10, 'A1', 5),
  (40, 'A2', 10),
  (80, 'A3', 15),
  (90, 'A4', 20)
;
create or replace table sample_b (
  id_b integer,
  name_b varchar,
  id_a integer,
  value integer
);
insert into sample_b (id_b, name_b, id_a, value) values
  (4000, 'B1', 40, 10),
  (4001, 'B2', 10, 5),
  (9000, 'B3', 80, 15),
  (9099, 'B4', null, 200)
;
create or replace table sample_c (
  id_c integer,
  name_c varchar,
  id_a integer,
  id_b integer
);
insert into sample_c (id_c, name_c, id_a, id_b) values
  (1012, 'C1', 10, null),
  (1040, 'C2', 40, 4000),
  (1041, 'C3', 40, 4001)
;
Copy

Angeben der Spalten für den Join

Mit der Methode DataFrame.join können Sie die zu verwendenden Spalten auf eine der folgenden Arten angeben:

  • Geben Sie einen Spaltenausdruck an, der die Verknüpfungsbedingung beschreibt.

  • Geben Sie eine oder mehrere Spalten an, die als gemeinsame Spalten im Join verwendet werden sollen.

Im folgenden Beispiel wird eine innere Verknüpfung (Inner Join) für die Spalte mit dem Namen id_a ausgeführt:

// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
Copy

Beachten Sie, dass im Beispiel die Methode DataFrame.col verwendet wird, um die im Join zu verwendenden Spalten anzugeben. Weitere Informationen zu dieser Methode finden Sie unter Angeben von Spalten und Ausdrücken.

Dies gibt die folgende Ausgabe aus:

----------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |
----------------------------------------------------------------------
|10      |A1        |5        |4001    |B2        |10      |5        |
|40      |A2        |10       |4000    |B1        |40      |10       |
|80      |A3        |15       |9000    |B3        |80      |15       |
----------------------------------------------------------------------
Copy
Identische Spaltennamen, die im Join-Ergebnis dupliziert sind

In dem DataFrame, der aus einer Join-Verknüpfung resultiert, verwendet die Snowpark-Bibliothek die Spaltennamen, die in den verknüpften Tabellen gefunden wurden, selbst wenn die Spaltennamen in den Tabellen identisch sind. In diesem Fall werden diese Spaltennamen in dem aus der Verknüpfung resultierenden DataFrame dupliziert. Um auf eine duplizierte Spalte über den Namen zuzugreifen, rufen Sie die col-Methode auf dem DataFrame auf, der die Originaltabelle der Spalte repräsentiert. (Weitere Informationen zum Spezifizieren von Spalten finden Sie unter Verweisen auf Spalten in verschiedenen DataFrames.)

Der Code im folgenden Beispiel verknüpft zwei DataFrames und ruft dann die select-Methode auf dem verknüpften DataFrame auf. Die auszuwählenden Spalten werden angegeben, indem die Methode col über die Variable aufgerufen wird, die die entsprechenden DataFrame-Objekte dfRhs und dfLhs repräsentiert. Die as-Methode wird verwendet, um den von der select-Methode erstellten Spalten im DataFrame neue Namen zu geben.

val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
val dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"))
dfSelected.show()
Copy

Dies gibt die folgende Ausgabe aus:

------------------------------
|"LEFTVALUE"  |"RIGHTVALUE"  |
------------------------------
|5            |5             |
|10           |10            |
|15           |15            |
------------------------------
Copy
Spalten vor dem Speichern oder Zwischenspeichern deduplizieren

Beachten Sie, dass Sie bei einem DataFrame, der aus einer Join-Verknüpfung resultiert und doppelte Spaltennamen enthält, die Spalten deduplizieren oder umbenennen müssen, um die Duplikate aus dem DataFrame zu entfernen, bevor Sie das Ergebnis in einer Tabelle speichern oder den DataFrame zwischenspeichern. Bei doppelten Spaltennamen in einem DataFrame, den Sie in einer Tabelle oder im Cache speichern, ersetzt die Snowpark-Bibliothek die doppelten Spaltennamen durch Aliasnamen, sodass diese nicht mehr doppelt vorkommen.

Das folgende Beispiel veranschaulicht, wie die Ausgabe eines zwischengespeicherten DataFrame aussehen könnte, wenn die Spaltennamen ID_A und VALUE in einer Join-Verknüpfung von zwei Tabellen dupliziert und dann vor dem Zwischenspeichern des Ergebnisses nicht dedupliziert oder umbenannt wurden.

--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A"  |"NAME_A"  |"l_ZSz7_VALUE"  |"ID_B"  |"NAME_B"  |"r_heec_ID_A"  |"r_heec_VALUE"  |
--------------------------------------------------------------------------------------------------
|10             |A1        |5               |4001    |B2        |10             |5               |
|40             |A2        |10              |4000    |B1        |40             |10              |
|80             |A3        |15              |9000    |B3        |80             |15              |
--------------------------------------------------------------------------------------------------
Copy

Ausführen einer natürlichen Verknüpfung (Natural Join)

Um eine natürliche Verknüpfung (Natural Join) auszuführen, bei der DataFrames über Spalten mit demselben Namen verknüpft werden, rufen Sie die Methode DataFrame.naturalJoin auf.

Im folgenden Beispiel werden die DataFrames für die Tabellen sample_a und sample_b über ihre gemeinsamen Spalten (die Spalte id_a) verknüpft:

val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
Copy

Dies gibt die folgende Ausgabe aus:

---------------------------------------------------
|"ID_A"  |"VALUE"  |"NAME_A"  |"ID_B"  |"NAME_B"  |
---------------------------------------------------
|10      |5        |A1        |4001    |B2        |
|40      |10       |A2        |4000    |B1        |
|80      |15       |A3        |9000    |B3        |
---------------------------------------------------
Copy

Geben Sie den Typ der Verknüpfung an:

Standardmäßig erstellt die Methode DataFrame.join eine innere Verknüpfung (Inner Join). Um einen anderen Verknüpfungstyp anzugeben, verwenden Sie für das Argument joinType einen der folgenden Werte:

Typ der Verknüpfung (Join)

joinType

Innere Verknüpfung (Inner Join)

inner (Standard)

Linke äußere Verknüpfung:

left

Rechte äußere Verknüpfung:

right

Vollständige äußere Verknüpfung:

full

Kreuzverknüpfung (Cross Join)

cross

Beispiel:

// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"), "left")
dfLeftOuterJoin.show()
Copy

Dies gibt die folgende Ausgabe aus:

----------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |
----------------------------------------------------------------------
|40      |A2        |10       |4000    |B1        |40      |10       |
|10      |A1        |5        |4001    |B2        |10      |5        |
|80      |A3        |15       |9000    |B3        |80      |15       |
|90      |A4        |20       |NULL    |NULL      |NULL    |NULL     |
----------------------------------------------------------------------
Copy

Mehrere Tabellen verknüpfen

So verknüpfen Sie mehrere Tabellen:

  1. Erstellen Sie einen DataFrame für jede Tabelle.

  2. Rufen Sie die Methode DataFrame.join für den ersten DataFrame auf, und übergeben Sie den zweiten DataFrame.

  3. Rufen Sie mit den von der Methode join zurückgegebenen DataFrame die Methode join auf und übergeben Sie den dritten DataFrame.

Sie können die join-Aufrufe wie unten gezeigt verketten:

val dfFirst = session.table("sample_a")
val dfSecond  = session.table("sample_b")
val dfThird = session.table("sample_c")
val dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a") === dfSecond.col("id_a")).join(dfThird, dfFirst.col("id_a") === dfThird.col("id_a"))
dfJoinThreeTables.show()
Copy

Dies gibt die folgende Ausgabe aus:

------------------------------------------------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |"ID_C"  |"NAME_C"  |"ID_A"  |"ID_B"  |
------------------------------------------------------------------------------------------------------------
|10      |A1        |5        |4001    |B2        |10      |5        |1012    |C1        |10      |NULL    |
|40      |A2        |10       |4000    |B1        |40      |10       |1040    |C2        |40      |4000    |
|40      |A2        |10       |4000    |B1        |40      |10       |1041    |C3        |40      |4001    |
------------------------------------------------------------------------------------------------------------
Copy

Ausführen einer Selbstverknüpfung (Self-Join)

Wenn Sie eine Tabelle mit sich selbst über verschiedene Spalten verknüpfen müssen, können Sie die Selbstverknüpfung (Self-Join) nicht mit einem einzelnen DataFrame durchführen. Im folgenden Beispiel wird ein einzelnes DataFrame in einer Selbstverknüpfung verwendet. Diese Ausführung schlägt fehl, weil die Spaltenausdrücke für "id" sowohl auf der linken als auch der rechten Seite der Verknüpfung vorhanden sind:

// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, col("id") === col("parent_id"))
Copy
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, df("id") === df("parent_id"))
Copy

Beide Beispiele scheitern mit der folgenden Ausnahme:

Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
  Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
  Instead, join this DataFrame to a clone() of itself.
Copy

Verwenden Sie stattdessen die Methode DataFrame.clone, um einen Klon des DataFrame-Objekts zu erstellen, und verwenden Sie dann die beiden DataFrame-Objekte für die Verknüpfung:

// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
val dfLhs = session.table("sample_product_data")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()

// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))
dfJoined.show()
Copy

Wenn Sie eine Selbstverknüpfung auf derselben Spalte ausführen möchten, rufen Sie die join-Methode auf, die eine Seq von Spaltenausdrücken für die USING-Klausel übergibt:

// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
val df = session.table("sample_product_data");
val dfJoined = df.join(df, Seq("key"))
Copy

Ausführen einer Aktion zum Auswerten eines DataFrame

Wie bereits erwähnt, wird der DataFrame im Lazy-Modus ausgewertet, d. h. die SQL-Anweisung wird erst dann zur Ausführung an den Server gesendet, wenn Sie eine Aktion ausführen. Eine Aktion löst die Auswertung des DataFrame aus und sendet die entsprechende SQL-Anweisung zur Ausführung an den Server.

In den folgenden Abschnitten wird erläutert, wie eine Aktion auf einem DataFrame synchron und asynchron ausgeführt werden kann:

Synchrones Ausführen einer Aktion

Um eine Aktion synchron auszuführen, rufen Sie eine der folgenden Aktionsmethoden auf:

Methode zur synchronen Ausführung einer Aktion

Beschreibung

DataFrame.collect

Wertet den DataFrame aus und gibt das resultierende Dataset als ein Array von Row-Objekten zurück. Siehe Alle Zeilen zurückgeben.

DataFrame.toLocalIterator

Wertet den DataFrame aus und gibt einen Iterator für Row-Objekte zurück. Verwenden Sie diese Methode bei einem großen Resultset, um zu vermeiden, dass alle Ergebnisse auf einmal in den Arbeitsspeicher geladen werden. Siehe Zurückgeben eines Iterators für die Zeilen.

DataFrame.count

Wertet den DataFrame aus und gibt die Anzahl der Zeilen zurück.

DataFrame.show

Wertet den DataFrame aus und gibt die Zeilen auf der Konsole aus. Beachten Sie, dass bei dieser Methode die Anzahl der Zeilen (standardmäßig) auf 10 beschränkt ist. Siehe Ausgeben der Zeilen eines DataFrame.

DataFrame.cacheResult

Führt die Abfrage aus, erstellt eine temporäre Tabelle und füllt die Tabelle mit den Ergebnissen. Die Methode gibt ein HasCachedResult-Objekt zurück, das Sie für den Zugriff auf die Daten in dieser temporären Tabelle verwenden können. Siehe Zwischenspeichern eines DataFrame.

DataFrame.write.saveAsTable

Speichert die im DataFrame enthaltenen Daten in die angegebene Tabelle. Siehe Speichern von Daten in eine Tabelle.

DataFrame.write.(csv |json| parquet)

Speichert einen DataFrame in einer angegebenen Datei in einem Stagingbereich. Siehe Speichern eines DataFrame in Dateien in einem Stagingbereich.

DataFrame.read.fileformat.copyInto('tableName')

Kopiert die im DataFrame enthaltenen Daten in die angegebene Tabelle. Siehe Kopieren von Daten aus Dateien in eine Tabelle.

Session.table('tableName').delete

Löscht Zeilen aus der angegebenen Tabelle. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle.

Session.table('tableName').update

Aktualisiert Zeilen in der angegebenen Tabelle. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle.

Session.table('tableName').merge.methods.collect

Führt Zeilen in der angegebenen Tabelle zusammen. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle.

Um die Abfrage auszuführen und die Anzahl der Ergebnisse zurückzugeben, rufen Sie die count-Methode auf:

// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")

// Send the query to the server for execution and
// print the count of rows in the table.
println("Rows returned: " + dfProducts.count())
Copy

Sie können auch Aktionsmethoden aufrufen, um Folgendes zu tun:

Hinweis: Wenn Sie die schema-Methode aufrufen, um die Definitionen der Spalten im DataFrame zu erhalten, müssen Sie keine Aktionsmethode aufrufen.

Asynchrones Ausführen einer Aktion

Bemerkung

Dieses Feature wurde in Snowpark 0.11.0 eingeführt.

Zur asynchronen Ausführung einer Aktion müssen Sie die Methode async aufrufen, um ein Objekt für den „asynchronen Akteur“ (z. B. DataFrameAsyncActor) zurückzugeben, und dann eine asynchrone Aktionsmethode in diesem Objekt aufrufen.

Diese Aktionsmethoden eines asynchronen Akteursobjekts geben ein TypedAsyncJob-Objekt zurück, das Sie verwenden können, um den Status der asynchronen Aktion zu überprüfen und die Ergebnisse der Aktion abzurufen.

In den nächsten Abschnitten wird erläutert, wie Sie Aktionen asynchron ausführen und die Ergebnisse überprüfen können.

Erläuterungen zum grundlegenden Ablauf asynchroner Aktionen

Sie können die folgenden Methoden verwenden, um eine Aktion asynchron auszuführen:

Methode zur asynchronen Ausführung einer Aktion

Beschreibung

DataFrame.async.collect

Wertet den DataFrame asynchron aus, um das resultierende Dataset als Array von Row-Objekten abzurufen. Siehe Alle Zeilen zurückgeben.

DataFrame.async.toLocalIterator

Wertet den DataFrame asynchron aus, um einen Iterator für Row-Objekte abzurufen. Verwenden Sie diese Methode bei einem großen Resultset, um zu vermeiden, dass alle Ergebnisse auf einmal in den Arbeitsspeicher geladen werden. Siehe Zurückgeben eines Iterators für die Zeilen.

DataFrame.async.count

Wertet den DataFrame asynchron aus, um die Anzahl der Zeilen abzurufen.

DataFrame.write.async.saveAsTable

Speichert die im DataFrame enthaltenen Daten asynchron in die angegebene Tabelle. Siehe Speichern von Daten in eine Tabelle.

DataFrame.write.async.(csv |json| parquet)

Speichert einen DataFrame in einer angegebenen Datei in einem Stagingbereich. Siehe Speichern eines DataFrame in Dateien in einem Stagingbereich.

DataFrame.read.fileformat.async.copyInto('tableName')

Kopiert die im DataFrame enthaltenen Daten asynchron in die angegebene Tabelle. Siehe Kopieren von Daten aus Dateien in eine Tabelle.

Session.table('tableName').async.delete

Löscht Zeilen asynchron aus der angegebenen Tabelle. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle.

Session.table('tableName').async.update

Aktualisiert Zeilen asynchron in der angegebenen Tabelle. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle.

Session.table('tableName').merge.methods.async.collect

Führt Zeilen in der angegebenen Tabelle asynchron zusammen. Unterstützt in Version 1.3.0 oder höher. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle.

Mit dem zurückgegebenen Objekt TypedAsyncJob können Sie Folgendes tun:

  • Um festzustellen, ob die Aktion abgeschlossen ist, rufen Sie die Methode isDone auf.

  • Um die Abfrage-ID der zugehörigen Aktion zu erhalten, rufen Sie die Methode getQueryId auf.

  • Um die Ergebnisse der Aktion zurückzugeben (z. B. das Array von Row-Objekten für die Methode collect oder die Anzahl der Zeilen für die Methode count), rufen Sie die Methode getResult auf.

    Beachten Sie, dass getResult ein blockierender Aufruf ist.

  • Um die Aktion abzubrechen, rufen Sie die Methode cancel auf.

Um zum Beispiel eine Abfrage asynchron auszuführen und die Ergebnisse als Array von Row-Objekten abzurufen, rufen Sie DataFrame.async.collect auf:

// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
val df = session.table("sample_product_data").select(col("id"), col("name"))

// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.collect()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
val results = asyncJob.getResult()
results.foreach(println)
Copy

Um die Abfrage asynchron auszuführen und die Anzahl der Ergebnisse abzurufen, rufen Sie DataFrame.async.count auf:

// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")

// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.count()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Print the count of rows in the table.
// Note that getResult is a blocking call.
println("Rows returned: " + asyncJob.getResult())
Copy

Festlegen der maximalen Anzahl der zu wartenden Sekunden

Beim Aufruf der Methode getResult können Sie mit dem Argument maxWaitTimeInSeconds die maximale Anzahl von Sekunden angeben, die auf den Abschluss der Abfrage gewartet werden soll, bevor versucht wird, die Ergebnisse abzurufen. Beispiel:

// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
val results = asyncJob.getResult(10)
Copy

Wenn Sie dieses Argument weglassen, wartet die Methode auf die maximale Anzahl von Sekunden, die in der Konfigurationseigenschaft snowpark_request_timeout_in_seconds angegeben ist. (Dies ist eine Eigenschaft, die Sie beim Erstellen des Sitzungsobjekts festlegen können.)

Zugriff auf eine asynchrone Abfrage über die ID

Mithilfe der Abfrage-ID einer zuvor übermittelten asynchronen Abfrage können Sie die Methode Session.createAsyncJob aufrufen, um ein AsyncJob-Objekt zu erstellen. Mithilfe dieses Objekts können Sie dann den Status der Abfrage überprüfen, die Abfrageergebnisse abrufen oder die Abfrage abbrechen.

Beachten Sie, dass AsyncJob im Gegensatz zu TypedAsyncJob keine getResult-Methode zum Abrufen der Ergebnisse bereitstellt. Wenn Sie die Ergebnisse abrufen müssen, verwenden Sie stattdessen die Methode getRows oder getIterator.

Beispiel:

val asyncJob = session.createAsyncJob(myQueryId)
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
val rows = asyncJob.getRows()
rows.foreach(println)
Copy

Abrufen von Zeilen in einem DataFrame

Nachdem Sie angegeben haben, wie der DataFrame transformiert werden soll, können Sie eine Aktionsmethode aufrufen, um eine Abfrage auszuführen und die Ergebnisse zurückzugeben. Sie können alle Zeilen in einem Array zurückgeben, oder Sie können einen Iterator zurückgeben, mit dem Sie die Ergebnisse Zeile für Zeile durchlaufen können. In letzterem Fall werden bei großen Datenmengen die Zeilen blockweise in den Arbeitsspeicher geladen, um das Laden einer großen Datenmenge in den Arbeitsspeicher zu vermeiden.

Alle Zeilen zurückgeben

Um alle Zeilen auf einmal zurückzugeben, rufen Sie die Methode DataFrame.collect auf. Diese Methode gibt ein Array von Row-Objekten zurück. Um die Werte aus der Zeile abzurufen, rufen Sie die Methode getType auf, wobei „Type“ der Datentyp ist (z. B. getString, getInt usw.).

Beispiel:

import com.snowflake.snowpark.functions_

val rows = session.table("sample_product_data").select(col("name"), col("category_id")).sort(col("name")).collect()
for (row <- rows) {
  println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
Copy

Zurückgeben eines Iterators für die Zeilen

Wenn Sie einen Iterator verwenden möchten, um die Row-Objekte in den Ergebnissen zeilenweise zu durchlaufen, rufen Sie DataFrame.toLocalIterator auf. Wenn die Datenmenge in den Ergebnissen groß ist, lädt die Methode die Zeilen blockweise, um zu vermeiden, dass alle Zeilen auf einmal in den Arbeitsspeicher geladen werden.

Beispiel:

import com.snowflake.snowpark.functions_

while (rowIterator.hasNext) {
  val row = rowIterator.next()
  println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
Copy

Zurückgeben der ersten n Zeilen

Um die ersten n Zeilen zurückzugeben, rufen Sie die Methode DataFrame.first auf und übergeben die Anzahl der zurückzugebenden Zeilen.

Wie unter Begrenzen der Anzahl von Zeilen in einem DataFrame erläutert, sind die Ergebnisse nicht deterministisch. Wenn die Ergebnisse deterministisch sein sollen, müssen Sie diese Methode auf einem sortierten DataFrame (df.sort().first()) aufrufen.

Beispiel:

import com.snowflake.snowpark.functions_

val df = session.table("sample_product_data")
val rows = df.sort(col("name")).first(5)
rows.foreach(println)
Copy

Ausgeben der Zeilen eines DataFrame

Um die ersten 10 Zeilen des DataFrame auf der Konsole auszugeben, rufen Sie die Methode DataFrame.show auf. Um eine andere Anzahl von Zeilen auszugeben, übergeben Sie die Anzahl der auszugebenden Zeilen.

Wie unter Begrenzen der Anzahl von Zeilen in einem DataFrame erläutert, sind die Ergebnisse nicht deterministisch. Wenn die Ergebnisse deterministisch sein sollen, müssen Sie diese Methode auf einem sortierten DataFrame (df.sort().show()) aufrufen.

Beispiel:

import com.snowflake.snowpark.functions_

val df = session.table("sample_product_data")
df.sort(col("name")).show()
Copy

Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle

Bemerkung

Dieses Feature wurde in Snowpark 0.7.0 eingeführt.

Wenn Sie Session.table aufrufen, um ein DataFrame-Objekt für eine Tabelle zu erstellen, gibt die Methode ein Updatable-Objekt zurück, das DataFrame um zusätzliche Methoden zum Aktualisieren und Löschen von Daten in der Tabelle erweitert. (Siehe Updatable.)

Wenn Sie Zeilen in einer Tabelle aktualisieren oder löschen müssen, können Sie die folgenden Methoden der Klasse Updatable verwenden:

Aktualisieren von Zeilen in einer Tabelle

Übergeben Sie der Methode update ein Map-Objekt, das die zu aktualisierenden Spalten und die entsprechenden Werte verknüpft, die diesen Spalten zugewiesen werden sollen. update gibt ein UpdateResult-Objekt zurück, das die Anzahl der aktualisierten Zeilen enthält. (Siehe UpdateResult.)

Bemerkung

update ist eine Aktionsmethode, was bedeutet, dass bei Aufruf der Methode die SQL-Anweisungen zur Ausführung an den Server gesendet werden.

Im folgenden Beispiel werden die Werte in der Spalte mit dem Namen count durch den Wert 1 ersetzt:

val updatableDf = session.table("sample_product_data")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")
Copy

Im obigen Beispiel wird der Name der Spalte verwendet, um die Spalte zu identifizieren. Sie können auch einen Spaltenausdruck verwenden:

val updateResult = updatableDf.update(Map(col("count") -> lit(1)))
Copy

Wenn die Aktualisierung nur erfolgen werden soll, wenn eine Bedingung erfüllt ist, können Sie diese Bedingung als Argument angeben. Im folgenden Beispiel werden die Werte in der Spalte mit dem Namen count für Zeilen ersetzt, in denen die Spalte category_id den Wert 20 hat:

val updateResult = updatableDf.update(Map(col("count") -> lit(1)), col("category_id") === 20)
Copy

Wenn die Bedingung auf einer Verknüpfung (Join) mit einem anderen DataFrame-Objekt basieren soll, können Sie diesen DataFrame als Argument übergeben und diesen DataFrame in der Bedingung verwenden. Im folgenden Beispiel werden die Werte in der Spalte mit dem Namen count bei Zeilen ersetzt, in denen die Spalte category_id mit der category_id in der DataFrame-Spalte dfParts übereinstimmt:

val updatableDf = session.table("sample_product_data")
val dfParts = session.table("parts")
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), updatableDf("category_id") === dfParts("category_id"), dfParts)
Copy

Löschen von Zeilen aus einer Tabelle

Sie können für die Methode delete eine Bedingung angeben, die die zu löschenden Zeilen identifiziert, und diese Bedingung kann auf einer Verknüpfung (Join) mit einem anderen DataFrame basieren. delete gibt ein DeleteResult-Objekt zurück, das die Anzahl der gelöschten Zeilen enthält. (Siehe DeleteResult.)

Bemerkung

delete ist eine Aktionsmethode, was bedeutet, dass bei Aufruf der Methode die SQL-Anweisungen zur Ausführung an den Server gesendet werden.

Im folgenden Beispiel werden Zeilen gelöscht, die den Wert 1 in der Spalte category_id haben:

val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === 1)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
Copy

Wenn sich die Bedingung auf Spalten in einem anderen DataFrame bezieht, übergeben Sie dieses DataFrame als zweites Argument. Im folgenden Beispiel werden die Zeilen gelöscht, in denen die Spalte category_id mit der Spalte category_id des DataFrame mit dem Namen dfParts übereinstimmt, wobei dfParts als zweites Argument übergeben wird:

val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === dfParts("category_id"), dfParts)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
Copy

Zusammenführen (Merge) von Zeilen in einer Tabelle

Wenn Sie Zeilen einer Tabelle einfügen, aktualisieren oder löschen möchten, die auf Werten einer zweiten Tabelle oder einer Unterabfrage basieren (das Äquivalent des MERGE-Befehls in SQL), gehen Sie wie folgt vor:

  1. Rufen Sie im Updatable-Objekt der Tabelle, in der die Daten zusammengeführt werden sollen, die Methode merge auf, und übergeben Sie dabei das DataFrame-Objekt für die andere Tabelle und den Spaltenausdruck für die Verknüpfungsbedingung.

    Dies gibt ein MergeBuilder-Objekt zurück, das Sie verwenden können, um die Aktionen (z. B. Einfügen, Aktualisieren, Löschen) für die übereinstimmenden und die nicht übereinstimmenden Zeilen anzugeben. (Siehe MergeBuilder.)

  2. Verwendung des Objekts MergeBuilder:

    • Um die Aktualisierung oder Löschung von übereinstimmenden Zeilen anzugeben, rufen Sie die Methode whenMatched auf.

      Wenn Sie eine zusätzliche Bedingung angeben müssen, ob Zeilen aktualisiert oder gelöscht werden sollen, können Sie einen Spaltenausdruck für diese Bedingung übergeben.

      Diese Methode gibt ein MatchedClauseBuilder-Objekt zurück, das Sie zur Angabe der auszuführenden Aktion verwenden können. (Siehe MatchedClauseBuilder.)

      Rufen Sie die Methode update oder delete im Objekt MatchedClauseBuilder auf, um die Aktualisierungs- oder Löschaktion anzugeben, die für die übereinstimmenden Zeilen durchgeführt werden soll. Diese Methoden geben ein MergeBuilder-Objekt zurück, das Sie zur Angabe zusätzlicher Klauseln verwenden können.

    • Um die Einfügung anzugeben, die ausgeführt werden soll, wenn Zeilen nicht übereinstimmen, rufen Sie die Methode whenNotMatched auf.

      Wenn Sie für das Einfügen einer Zeile eine zusätzliche Bedingung angeben müssen, können Sie einen Spaltenausdruck für diese Bedingung übergeben.

      Diese Methode gibt ein NotMatchedClauseBuilder-Objekt zurück, das Sie zur Angabe der auszuführenden Aktion verwenden können. (Siehe NotMatchedClauseBuilder.)

      Rufen Sie die Methode insert im Objekt NotMatchedClauseBuilder auf, um die Einfügeaktion festzulegen, die durchgeführt werden soll, wenn Zeilen nicht übereinstimmen. Diese Methode gibt ein MergeBuilder-Objekt zurück, das Sie zur Angabe zusätzlicher Klauseln verwenden können.

  3. Wenn Sie die auszuführenden Einfügungen, Aktualisierungen und Löschungen angegeben haben, rufen Sie die Methode collect des Objekts MergeBuilder auf, um die angegebenen Einfügungen, Aktualisierungen und Löschungen in der Tabelle auszuführen.

    collect gibt ein MergeResult-Objekt zurück, das die Anzahl der Zeilen enthält, die eingefügt, aktualisiert und gelöscht wurden. (Siehe MergeResult.)

Das folgende Beispiel fügt eine Zeile mit den Spalten id und value aus der Tabelle source in die Tabelle target ein, wenn die Tabelle target keine Zeile mit einer übereinstimmenden ID enthält:

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenNotMatched.insert(Seq(source("id"), source("value")))
                      .collect()
Copy

Im folgenden Beispiel wird eine Zeile in der Tabelle target mit dem Wert der Spalte value aus der Zeile in der Tabelle source aktualisiert, die dieselbe ID hat:

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenMatched.update(Map("value" -> source("value")))
                      .collect()
Copy

Speichern von Daten in eine Tabelle

Sie können den Inhalt eines DataFrame in eine neue oder bestehende Tabelle speichern. Dazu müssen Sie über die folgenden Berechtigungen verfügen:

  • CREATE TABLE-Berechtigungen für das Schema, wenn die Tabelle nicht vorhanden ist.

  • INSERT-Berechtigungen für die Tabelle.

So speichern Sie den Inhalt eines DataFrame in eine Tabelle:

  1. Rufen Sie die Methode DataFrame.write auf, um ein DataFrameWriter-Objekt zu erhalten.

  2. Rufen Sie die Methode DataFrameWriter.mode auf, und übergeben Sie dabei ein SaveMode-Objekt, das Ihre Präferenzen für das Schreiben in die Tabelle angibt:

    • Um Zeilen einzufügen, übergeben Sie SaveMode.Append.

    • Um die vorhandene Tabelle zu überschreiben, übergeben Sie SaveMode.Overwrite.

    Diese Methode gibt das gleiche DataFrameWriter-Objekt zurück, das mit dem angegebenen Modus konfiguriert wurde.

  3. Wenn Sie Zeilen in eine bestehende Tabelle einfügen (SaveMode.Append) und die Spaltennamen im DataFrame mit den Spaltennamen in der Tabelle übereinstimmen, rufen Sie die DataFrameWriter.option-Methode auf und geben Sie "columnOrder" und "name" als Argumente an.

    Bemerkung

    Diese Methode wurde in Snowpark 1.4.0 eingeführt.

    Standardmäßig ist die Option columnOrder auf "index" gesetzt, was bedeutet, dass DataFrameWriter die Werte in der Reihenfolge einfügt, in der die Spalten erscheinen. So fügt DataFrameWriter beispielsweise den Wert aus der ersten Spalte des DataFrame in die erste Spalte der Tabelle ein, die zweite Spalte des DataFrame in die zweite Spalte der Tabelle usw.

    Diese Methode gibt das gleiche DataFrameWriter-Objekt zurück, das mit der angegebenen Option konfiguriert wurde.

  4. Rufen Sie die Methode DataFrameWriter.saveAsTable auf, um den Inhalt des DataFrame in die angegebene Tabelle zu speichern.

    Sie müssen keine separate Methode (z. B. collect) aufrufen, um die SQL-Anweisung auszuführen, mit der Daten in die Tabelle gespeichert werden. saveAsTable ist eine Aktionsmethode, die SQL-Anweisungen ausführt.

Im folgenden Beispiel wird eine bestehende Tabelle (identifiziert durch die Variable tableName) mit dem Inhalt von DataFrame df überschrieben:

df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
Copy

Im folgenden Beispiel wird eine Zeile aus dem DataFrame mit dem Namen df in die bestehende Tabelle (identifiziert durch die Variable tableName) eingefügt. In diesem Beispiel enthalten die Tabelle und das DataFrame beide die Spalten c1 und c2.

Das Beispiel zeigt den Unterschied zwischen der Einstellung der Option columnOrder auf "name" (fügt Werte in die Tabellenspalten mit denselben Namen wie die DataFrame-Spalten ein) und der Verwendung der Standardoption columnOrder (fügt Werte in die Tabellenspalten basierend auf der Reihenfolge der Spalten in DataFrame ein).

val df = session.sql("SELECT 1 AS c2, 2 as c1")
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write.mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName)
// With the default value of the columnOrder option ("index"), the DataFrameWriter the uses column positions
// and inserts a row with the values (1, 2).
df.write.mode(SaveMode.Append).saveAsTable(tableName)
Copy

Erstellen einer Ansicht aus einem DataFrame

Um eine Ansicht aus einem DataFrame zu erstellen, rufen Sie die Methode DataFrame.createOrReplaceView auf:

df.createOrReplaceView("db.schema.viewName")
Copy

Beachten Sie, dass bei Aufruf von createOrReplaceView die neue Ansicht sofort erstellt wird. Noch wichtiger ist, dass der Aufruf nicht dazu führt, dass der DataFrame ausgewertet wird. (Der DataFrame selbst wird erst ausgewertet, wenn Sie eine Aktion ausführen.)

Ansichten, die Sie durch Aufruf von createOrReplaceView erstellen, sind persistent. Wenn Sie diese Ansicht nicht mehr benötigen, können Sie die Ansicht manuell löschen.

Wenn Sie eine temporäre Ansicht nur für die Sitzung erstellen müssen, rufen Sie stattdessen die Methode DataFrame.createOrReplaceTempView auf:

df.createOrReplaceTempView("db.schema.viewName")
Copy

Zwischenspeichern eines DataFrame

In einigen Fällen müssen Sie eine komplexe Abfrage ausführen und die Ergebnisse für spätere Operationen aufbewahren (um nicht dieselbe Abfrage erneut ausführen zu müssen). In diesen Fällen können Sie den Inhalt eines DataFrame zwischenspeichern, indem Sie die Methode DataFrame.cacheResult aufrufen.

Diese Methode ermöglicht Folgendes:

  • Ausführen der Abfrage.

    Es ist nicht erforderlich, vor dem Aufruf von cacheResult eine separate Aktionsmethode aufzurufen, um die Ergebnisse abzurufen. cacheResult ist eine Aktionsmethode, die die Abfrage ausführt.

  • Speichern der Ergebnisse in einer temporären Tabelle.

    Da cacheResult eine temporäre Tabelle erstellt, müssen Sie über die CREATE TABLE-Berechtigung für das verwendete Schema verfügen.

  • Zurückgeben eines HasCachedResult-Objekts, das den Zugriff auf die Ergebnisse in der temporären Tabelle ermöglicht.

    Da HasCachedResult den DataFrame erweitert, können Sie auf diesen zwischengespeicherten Daten einige der Operationen ausführen, die auch auf dem DataFrame ausgeführt werden können.

Bemerkung

Da cacheResult die Abfrage ausführt und die Ergebnisse in einer Tabelle speichert, kann diese Methode zu erhöhten Compute- und Speicherkosten führen.

Beispiel:

import com.snowflake.snowpark.functions_

// Set up a DataFrame to query a table.
val df = session.table("sample_product_data").filter(col("category_id") > 10)
// Retrieve the results and cache the data.
val cachedDf = df.cacheResult()
// Create a DataFrame containing a subset of the cached data.
val dfSubset = cachedDf.filter(col("category_id") === lit(20)).select(col("name"), col("category_id"))
dfSubset.show()
Copy

Beachten Sie, dass der ursprüngliche DataFrame durch den Aufruf dieser Methode nicht beeinflusst wird. Angenommen, dfTable ist ein DataFrame der Tabelle sample_product_data:

val dfTempTable = dfTable.cacheResult()
Copy

Nach Aufruf von cacheResult verweist dfTable immer noch auf die Tabelle sample_product_data, und Sie können dfTable weiterhin zur Abfrage und Aktualisierung dieser Tabelle verwenden.

Für die Weiterverarbeitung der in der temporären Tabelle zwischengespeicherten Daten verwenden Sie dfTempTable (das von cacheResult zurückgegebene HasCachedResult-Objekt).

Verwenden von Dateien in Stagingbereichen

Die Snowpark-Bibliothek bietet Klassen und Methoden, mit denen Sie Daten in Snowflake laden und Daten aus Snowflake entladen können, indem Sie Dateien in Stagingbereichen verwenden.

Bemerkung

Um diese Klassen und Methoden auf einem Stagingbereich anwenden zu können, müssen Sie über die erforderlichen Berechtigungen für die Verwendung des Stagingbereichs verfügen.

In den nächsten Abschnitten wird erläutert, wie diese Klassen und Methoden verwendet werden:

Hochladen und Herunterladen von Dateien in Stagingbereichen

Zum Hoch- und Herunterladen von Dateien in Stagingbereichen verwenden Sie das Objekt FileOperation:

Hochladen von Dateien in einen Stagingbereich

So laden Sie Dateien in einen Stagingbereich hoch:

  1. Überprüfen Sie, ob Sie über die erforderlichen Berechtigungen zum Hochladen von Dateien in den Stagingbereich verfügen.

  2. Verwenden Sie Session.file, um auf das FileOperation-Objekt für die Sitzung zuzugreifen.

  3. Rufen Sie die Methode FileOperation.put auf, um die Dateien in den Stagingbereich hochzuladen.

    Diese Methode führt den SQL-Befehl PUT aus.

    • Um alle optionalen Parameter für den PUT-Befehl anzugeben, erstellen Sie eine Map der Parameter und Werte und übergeben die Map als options-Argument. Beispiel:

      // Upload a file to a stage without compressing the file.
      val putOptions = Map("AUTO_COMPRESS" -> "FALSE")
      val putResults = session.file.put("file:///tmp/myfile.csv", "@myStage", putOptions)
      
      Copy
    • Im Argument localFilePath können Sie Platzhalter (* und ?) verwenden, um einen Satz von Dateien zu identifizieren, die hochgeladen werden sollen. Beispiel:

      // Upload the CSV files in /tmp with names that start with "file".
      // You can use the wildcard characters "*" and "?" to match multiple files.
      val putResults = session.file.put("file:///tmp/file*.csv", "@myStage/prefix2")
      
      Copy
  4. Überprüfen Sie das Array von PutResult-Objekten, die von der put-Methode zurückgegeben werden, um festzustellen, ob die Dateien erfolgreich hochgeladen wurden. Geben Sie zum Beispiel den Dateinamen und den Status der PUT-Operation für diese Datei aus:

    // Print the filename and the status of the PUT operation.
    putResults.foreach(r => println(s"  ${r.sourceFileName}: ${r.status}"))
    
    Copy

Herunterladen von Dateien aus Stagingbereichen

So laden Sie Dateien aus einem Stagingbereich herunter:

  1. Überprüfen Sie, ob Sie über die erforderlichen Berechtigungen zum Herunterladen von Dateien aus dem Stagingbereich verfügen.

  2. Verwenden Sie Session.file, um auf das FileOperation-Objekt für die Sitzung zuzugreifen.

  3. Rufen Sie die Methode FileOperation.get auf, um die Dateien aus einem Stagingbereich herunterzuladen.

    Diese Methode führt den SQL-Befehl GET aus.

    Um alle optionalen Parameter für den GET-Befehl anzugeben, erstellen Sie eine Map der Parameter und Werte und übergeben die Map als options-Argument. Beispiel:

    // Download files with names that match a regular expression pattern.
    val getOptions = Map("PATTERN" -> s"'.*file_.*.csv.gz'")
    val getResults = session.file.get("@myStage", "file:///tmp", getOptions)
    
    Copy
  4. Überprüfen Sie das Array von GetResult-Objekten, die von der get-Methode zurückgegeben werden, um festzustellen, ob die Dateien erfolgreich heruntergeladen wurden. Geben Sie zum Beispiel den Dateinamen und den Status der GET-Operation für diese Datei aus:

    // Print the filename and the status of the GET operation.
    getResults.foreach(r => println(s"  ${r.fileName}: ${r.status}"))
    
    Copy

Verwenden von Eingabestreams zum Hoch- und Herunterladen von Daten eines Stagingbereichs

Bemerkung

Dieses Feature wurde in Snowpark 1.4.0 eingeführt.

Um Eingabestreams zum Hochladen von Daten in eine Stagingdatei und zum Herunterladen von Daten aus einer Stagingdatei zu verwenden, verwenden Sie die Methoden uploadStream und downloadStream des Objekts FileOperation:

Verwenden eines Eingabestreams zum Hochladen von Daten in eine Datei in einem Stagingbereich

So laden Sie die Daten aus einem java.io.InputStream-Objekt in eine Datei in einem Stagingbereich hoch:

  1. Überprüfen Sie, ob Sie über die erforderlichen Berechtigungen zum Hochladen von Dateien in den Stagingbereich verfügen.

  2. Verwenden Sie Session.file, um auf das FileOperation-Objekt für die Sitzung zuzugreifen.

  3. Rufen Sie die Methode FileOperation.uploadStream auf.

    Übergeben Sie den vollständigen Pfad zu der Datei im Stagingbereich, in die die Daten geschrieben werden sollen, sowie das Objekt InputStream. Verwenden Sie außerdem das Argument compress, um anzugeben, ob die Daten vor dem Hochladen komprimiert werden sollen oder nicht.

Beispiel:

import java.io.InputStream
...
val compressData = true
val pathToFileOnStage = "@myStage/path/file"
session.file.uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData)
Copy

Verwenden eines Eingabestreams zum Herunterladen von Daten aus einer Datei in einem Stagingbereich

So laden Sie die Daten aus einer Datei in einem Stagingbereich in ein java.io.InputStream-Objekt herunter:

  1. Überprüfen Sie, ob Sie über die erforderlichen Berechtigungen zum Herunterladen von Dateien aus dem Stagingbereich verfügen.

  2. Verwenden Sie Session.file, um auf das FileOperation-Objekt für die Sitzung zuzugreifen.

  3. Rufen Sie die Methode FileOperationdownloadStream auf.

    Übergeben Sie den vollständigen Pfad zu der Datei im Stagingbereich, die die herunterzuladenden Daten enthält. Verwenden Sie das Argument decompress, um anzugeben, ob die Daten in der Datei komprimiert sind oder nicht.

Beispiel:

import java.io.InputStream
...
val isDataCompressed = true
val pathToFileOnStage = "@myStage/path/file"
val is = session.file.downloadStream(pathToFileOnStage, isDataCompressed)
Copy

Einrichten eines DataFrame für Dateien in einem Stagingbereich

In diesem Abschnitt wird erläutert, wie Sie ein DataFrame-Objekt für Dateien einrichten, die sich in einem Snowflake-Stagingbereich befinden. Sobald Sie den DataFrame erstellt haben, können Sie den DataFrame für Folgendes verwenden:

Um einen DataFrame für eine Datei einzurichten, die sich ein einem Snowflake-Stagingbereich befindet, verwenden Sie die Klasse DataFrameReader:

  1. Überprüfen Sie, ob Sie über die folgenden Berechtigungen verfügen:

  2. Rufen Sie die Methode read der Session-Klasse auf, um auf ein DataFrameReader-Objekt zuzugreifen.

  3. Wenn die Dateien im CSV-Format sind, beschreiben Sie die Felder in der Datei. Gehen Sie dabei wie folgt vor:

    1. Erstellen Sie ein StructType-Objekt, das aus einer Sequenz von StructField-Objekten besteht, die die Felder in der Datei beschreiben.

    2. Geben Sie für jedes StructField-Objekt Folgendes an:

      • Name des Feldes

      • Datentyp des Feldes (angegeben als Objekt im com.snowflake.snowpark.types-Paket)

      • Ob das Feld nullwertfähig ist oder nicht

      Beispiel:

      import com.snowflake.snowpark.types._
      
      val schemaForDataFile = StructType(
          Seq(
              StructField("id", StringType, true),
              StructField("name", StringType, true)))
      
      Copy
    3. Rufen Sie die Methode schema im Objekt DataFrameReader-Objekt auf, und übergeben Sie dabei das StructType-Objekt.

      Beispiel:

      var dfReader = session.read.schema(schemaForDataFile)
      
      Copy

      Die Methode schema gibt ein DataFrameReader-Objekt zurück, das so konfiguriert ist, dass es Dateien liest, die die angegebenen Felder enthalten.

      Beachten Sie, dass Sie dies für Dateien in anderen Formaten (z. B. JSON) nicht tun müssen. Bei solchen Dateien behandelt der DataFrameReader die Daten als ein einzelnes Feld vom Typ VARIANT mit dem Feldnamen $1.

  4. Wenn Sie zusätzliche Informationen darüber angeben müssen, wie die Daten gelesen werden sollen (z. B. dass die Daten komprimiert sind oder dass eine CSV-Datei ein Semikolon statt eines Kommas zum Begrenzen von Feldern verwendet), rufen Sie die Methode DataFrameReader.option oder DataFrameReader.options auf.

    Übergeben Sie den Namen und den Wert der Option, die Sie einstellen möchten. Sie können die folgenden Typen von Optionen einstellen:

    Im folgenden Beispiel wird das DataFrameReader-Objekt für die Abfrage von Daten einer CSV-Datei eingerichtet, die nicht komprimiert ist und die ein Semikolon als Feldbegrenzer verwendet.

    dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    
    Copy

    Die Methode option gibt ein DataFrameReader-Objekt zurück, das mit den angegebenen Optionen konfiguriert ist.

    Um mehrere Optionen festzulegen, können Sie entweder verkettete Aufrufe mit der option-Methode verwenden (wie im obigen Beispiel gezeigt) oder die DataFrameReader.options-Methode aufrufen, wobei Sie ein Map-Objekt der Namen und Werte der Optionen übergeben.

  5. Rufen Sie die Methode auf, die dem Format der Dateien entspricht. Sie können eine der folgenden Methoden aufrufen:

    Beim Aufrufen dieser Methoden übergeben Sie den Speicherort des Stagingbereichs, in dem sich die zu lesenden Dateien befinden.

    val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
    
    Copy

    Wenn Sie mehrere Dateien angeben möchten, die mit demselben Präfix beginnen, geben Sie das Präfix nach dem Namen des Stagingbereichs an. So laden Sie beispielsweise Dateien mit dem Präfix csv_ aus dem Stagingbereich @mystage:

    val df = dfReader.csv("@mystage/csv_")
    
    Copy

    Die Methoden, die dem Format einer Datei entsprechen, geben ein CopyableDataFrame-Objekt für diese Datei zurück. CopyableDataFrame erweitert DataFrame und bietet zusätzliche Methoden für die Verarbeitung der Daten in den Stagingdateien.

  6. Rufen Sie eine Aktionsmethode auf, um Folgendes auszuführen:

    Wie bei DataFrames für Tabellen werden die Daten erst dann in den DataFrame abgerufen, wenn Sie eine Aktionsmethode aufrufen.

Laden von Daten aus Dateien in einen DataFrame

Nachdem Sie einen DataFrame für Dateien in einem Stagingbereich eingerichtet haben, können Sie Daten aus den Dateien in den DataFrame laden:

  1. Verwenden Sie die DataFrame-Objektmethoden, um alle Transformationen auszuführen, die für das Dataset benötigt werden (Auswahl bestimmter Felder, Filtern von Zeilen usw.).

    Im folgenden Beispiel wird das Element color aus einer JSON-Datei namens data.json extrahiert, die sich im Stagingbereich mystage befindet:

    val df = session.read.json("@mystage/data.json").select(col("$1")("color"))
    
    Copy

    Wie bereits erläutert, werden Daten in Dateien, die nicht im CSV-Format sind (z. B. JSON), von DataFrameReader als eine einzelne VARIANT-Spalte mit dem Namen $1 behandelt.

  2. Rufen Sie die Methode DataFrame.collect auf, um die Daten zu laden. Beispiel:

    val results = df.collect()
    
    Copy

Kopieren von Daten aus Dateien in eine Tabelle

Nachdem das Einrichten eines DataFrame für Dateien in einem Stagingbereich abgeschlossen ist, können Sie die Methode CopyableDataFrame.copyInto aufrufen, um die Daten in eine Tabelle zu kopieren. Diese Methode führt den Befehl COPY INTO <Tabelle> aus.

Bemerkung

Sie dürfen die collect-Methode erst aufrufen, nachdem Sie copyInto aufgerufen haben. Die Daten aus der Datei dürfen sich erst im DataFrame befinden, nachdem Sie copyInto aufgerufen haben.

Mit dem folgenden Code werden beispielsweise Daten aus der durch myFileStage angegebenen CSV-Datei in die Tabelle mytable geladen. Da sich die Daten in einer CSV-Datei befinden, muss der Code auch die Felder in der Datei beschreiben. Im Beispiel wird dazu die Methode DataFrameReader.schema aufgerufen, und es wird ein StructType-Objekt (csvFileSchema) mit einer Sequenz von StructField-Objekten übergeben, die die Felder beschreiben.

val df = session.read.schema(csvFileSchema).csv(myFileStage)
df.copyInto("mytable")
Copy

Speichern eines DataFrame in Dateien in einem Stagingbereich

Bemerkung

Dieses Feature wurde in Snowpark 1.5.0 eingeführt.

Wenn Sie einen DataFrame in Dateien in einem Stagingbereich speichern müssen, können Sie die DataFrameWriter-Methode aufrufen, die dem Format der Datei entspricht (z. B. die csv-Methode, um in eine CSV-Datei zu schreiben), und dabei den Speichertort im Stagingbereich angeben, an dem die Dateien gespeichert werden sollen. Diese DataFrameWriter-Methoden führen den Befehl COPY INTO <Speicherort> aus.

Bemerkung

Sie dürfen die collect-Methode erst aufrufen, nachdem Sie DataFrameWriter aufgerufen haben. Die Daten aus der Datei dürfen sich erst im DataFrame befinden, nachdem Sie diese Methoden aufgerufen haben.

So speichern Sie den Inhalt eines DataFrame in Dateien in einem Stagingbereich:

  1. Rufen Sie die Methode DataFrame.write auf, um ein DataFrameWriter-Objekt zu erhalten. So erhalten Sie beispielsweise das DataFrameWriter-Objekt für einen DataFrame, der die Tabelle mit dem Namen sample_product_data repräsentiert:

    dfWriter = session.table("sample_product_data").write
    
    Copy
  2. Wenn Sie den Inhalt der Datei überschreiben möchten (falls die Datei existiert), rufen Sie die Methode DataFrameWriter.mode auf und übergeben SaveMode.Overwrite.

    Andernfalls meldet der DataFrameWriter-Objekt standardmäßig einen Fehler, wenn die angegebene Datei im Stagingbereich bereits vorhanden ist.

    Diese mode-Methode gibt das gleiche DataFrameWriter-Objekt zurück, das mit dem angegebenen Modus konfiguriert wurde.

    So geben Sie beispielsweise an, dass DataFrameWriter die im Stagingbereich befindliche Datei überschreiben soll:

    dfWriter = dfWriter.mode(SaveMode.Overwrite)
    
    Copy
  3. Wenn Sie zusätzliche Informationen darüber angeben müssen, wie die Daten gespeichert werden sollen (z. B. dass die Daten komprimiert werden sollen oder dass Sie ein Semikolon zur Abgrenzung von Feldern in einer CSV-Datei verwenden möchten), dann rufen Sie die Methode DataFrameWriter.option oder DataFrameWriter.options auf.

    Übergeben Sie den Namen und den Wert der Option, die Sie einstellen möchten. Sie können die folgenden Typen von Optionen einstellen:

    Beachten Sie, dass Sie die Methode option nicht zum Einstellen der folgenden Optionen verwenden können:

    • Die Option für den Formattyp TYPE.

    • Die Kopieroption OVERWRITE. Um diese Option festzulegen, rufen Sie stattdessen die Methode mode auf (wie im vorherigen Schritt erwähnt).

    Im folgenden Beispiel wird das DataFrameWriter-Objekt so eingerichtet, dass es Daten in einer CSV-Datei in unkomprimierter Form speichert, wobei ein Semikolon (statt eines Kommas) als Trennzeichen verwendet wird.

    dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    
    Copy

    Die Methode option gibt ein DataFrameWriter-Objekt zurück, das mit den angegebenen Optionen konfiguriert ist.

    Um mehrere Optionen zu setzen, können Sie verkettete Aufrufe mit der Methode option verwenden (wie im obigen Beispiel gezeigt) oder die Methode DataFrameWriter.options aufrufen und dabei ein Map-Objekt der Namen und Werte der Optionen übergeben.

  4. Um Details über jede gespeicherte Datei zu erhalten, setzen Sie die Kopieroption DETAILED_OUTPUT auf TRUE.

    Standardmäßig hat DETAILED_OUTPUT den Wert FALSE, was bedeutet, dass die Methode eine einzelne Ausgabezeile mit den Feldern "rows_unloaded", "input_bytes" und "output_bytes" zurückgibt.

    Wenn Sie DETAILED_OUTPUT auf TRUE setzen, gibt die Methode für jede gespeicherte Datei eine Zeile der Ausgabe zurück. Jede Zeile enthält die Felder FILE_NAME, FILE_SIZE und ROW_COUNT.

  5. Rufen Sie die Methode auf, die dem Format der Datei entspricht, um die Daten in der Datei zu speichern. Sie können eine der folgenden Methoden aufrufen:

    Beim Aufrufen dieser Methoden übergeben Sie den Speicherort des Stagingbereichs der Datei, in die die Daten geschrieben werden sollen (z. B. @mystage).

    Standardmäßig speichert die Methode die Daten in Dateinamen mit dem Präfix data_ (z. B. @mystage/data_0_0_0.csv). Wenn Sie möchten, dass die Dateien mit einem anderen Präfix benannt werden, geben Sie das Präfix nach dem Stagingbereichsnamen an. Beispiel:

    val writeFileResult = dfWriter.csv("@mystage/saved_data")
    
    Copy

    In diesem Beispiel wird der Inhalt des DataFrame-Objekts in Dateien gespeichert, die mit dem Präfix saved_data beginnen (z. B. @mystage/saved_data_0_0_0.csv).

  6. Überprüfen Sie das zurückgegebene WriteFileResult-Objekt auf Informationen zu der in die Datei geschriebene Datenmenge.

    Über das WriteFileResult-Objekt können Sie auf die vom Befehl COPY INTO <Speicherort> erzeugte Ausgabe zugreifen:

    • Um auf die Zeilen der Ausgabe als Array von Row-Objekten zuzugreifen, rufen Sie die Methode rows auf.

    • Um festzustellen, welche Felder in den Zeilen vorhanden sind, verwenden Sie das schema-Wertelement, das ein StructType-Objekt ist, das die Felder in der Zeile beschreibt.

    So drucken Sie beispielsweise die Namen der Felder und Werte in den Ausgabezeilen aus:

    val writeFileResult = dfWriter.csv("@mystage/saved_data")
    for ((row, index) <- writeFileResult.rows.zipWithIndex) {
      (writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach {
        (structField, element) => println(s"${structField.name}: $element")
      }
    }
    
    Copy

Im folgende Beispiel wird ein DataFrame-Objekt verwendet, um den Inhalt der Tabelle mit dem Namen car_sales in JSON-Dateien mit dem Präfix saved_data im Stagingbereich @mystage (z. B. @mystage/saved_data_0_0_0.json) zu speichern. Beispielcode:

  • Überschreibt die Datei, wenn die Datei bereits im Stagingbereich vorhanden ist.

  • Gibt eine detaillierte Ausgabe zur Speicheroperation zurück.

  • Speichert die Daten unkomprimiert.

Abschließend druckt der Beispielcode jedes Feld und jeden Wert in den zurückgegebenen Ausgabezeilen aus:

val df = session.table("car_sales")
val writeFileResult = df.write.mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data")
for ((row, index) <- writeFileResult.rows.zipWithIndex) {
  println(s"Row: $index")
  (writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach {
    (structField, element) => println(s"${structField.name}: $element")
  }
}
Copy

Verwenden von semistrukturierten Daten

Mit einem DataFrame können Sie semistrukturierte Daten (z. B. JSON-Daten) abfragen und darauf zugreifen. In den nächsten Abschnitten wird erläutert, wie semistrukturierte Daten in einem DataFrame verwendet werden:

Bemerkung

Die Beispiele in diesen Abschnitten verwenden die Beispieldaten aus In Beispielen verwendete Beispieldaten.

Durchsuchen semistrukturierter Daten

Um auf ein bestimmtes Feld oder Element in semistrukturierten Daten zu verweisen, verwenden Sie die folgenden Methoden des Column-Objekts:

  • Verwenden Sie Column.apply(„<Feldname>“), um ein Column-Objekt für ein Feld in einem OBJECT (oder ein VARIANT, das ein OBJECT enthält) zurückzugeben.

  • Verwenden Sie Column.apply(<Index>), um ein Column-Objekt für ein Element in einem ARRAY (oder ein VARIANT, das ein ARRAY enthält) zurückzugeben.

Bemerkung

Wenn der Feldname oder die Elemente im Pfad unregelmäßig sind und die Verwendung der Column.apply-Methoden erschweren, können Sie als Alternative die Funktionen get, get_ignore_case oder get_path verwenden.

Wie unter Verwenden der apply-Methode zum Verweisen auf eine Spalte erwähnt, können Sie den Methodennamen apply weglassen:

col("column_name")("field_name")
col("column_name")(index)
Copy

Im folgenden Code wird beispielsweise das Feld dealership in Objekten der Spalte src der Beispieldaten ausgewählt:

val df = session.table("car_sales")
df.select(col("src")("dealership")).show()
Copy

Der Code gibt Folgendes aus:

----------------------------
|"""SRC""['DEALERSHIP']"   |
----------------------------
|"Valley View Auto Sales"  |
|"Tindel Toyota"           |
----------------------------
Copy

Bemerkung

Die Werte im DataFrame sind in doppelte Anführungszeichen eingeschlossen, da diese Werte als Zeichenfolgenliterale zurückgegeben werden. Weitere Informationen zum Umwandeln dieser Werte in einen bestimmten Typ finden Sie unter Explizites Umwandeln von Werten semistrukturierter Daten.

Sie können auch Methodenaufrufe verketten, um einen Pfad zu einem bestimmten Feld oder Element zu durchlaufen.

Im folgende Code wird zum Beispiel das Feld name im Objekt salesperson ausgewählt:

val df = session.table("car_sales")
df.select(col("src")("salesperson")("name")).show()
Copy

Der Code gibt Folgendes aus:

------------------------------------
|"""SRC""['SALESPERSON']['NAME']"  |
------------------------------------
|"Frank Beasley"                   |
|"Greg Northrup"                   |
------------------------------------
Copy

In einem weiteren Codebeispiel wird das erste Element des Feldes vehicle ausgewählt, das eine Reihe von Fahrzeugen enthält. Im Beispiel wird auch das Feld price aus dem ersten Element ausgewählt.

val df = session.table("car_sales")
df.select(col("src")("vehicle")(0)).show()
df.select(col("src")("vehicle")(0)("price")).show()
Copy

Der Code gibt Folgendes aus:

---------------------------
|"""SRC""['VEHICLE'][0]"  |
---------------------------
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "paint protection"   |
|  ],                     |
|  "make": "Honda",       |
|  "model": "Civic",      |
|  "price": "20275",      |
|  "year": "2017"         |
|}                        |
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "rust proofing",     |
|    "fabric protection"  |
|  ],                     |
|  "make": "Toyota",      |
|  "model": "Camry",      |
|  "price": "23500",      |
|  "year": "2017"         |
|}                        |
---------------------------

------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']"  |
------------------------------------
|"20275"                           |
|"23500"                           |
------------------------------------
Copy

Als Alternative zur Methode apply können Sie die Funktionen get, get_ignore_case oder get_path verwenden, wenn der Feldname oder die Elemente im Pfad unregelmäßig sind und die Verwendung der Column.apply-Methoden erschweren.

Die folgenden Codezeilen geben zum Beispiel beide den Wert eines bestimmten Feldes in einem Objekt aus:

df.select(get(col("src"), lit("dealership"))).show()
df.select(col("src")("dealership")).show()
Copy

In ähnlicher Weise geben die folgenden Codezeilen beide den Wert eines Feldes für einen bestimmten Pfad in einem Objekt aus:

df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
df.select(col("src")("vehicle")(0)("make")).show()
Copy

Explizites Umwandeln von Werten semistrukturierter Daten

Standardmäßig werden die Werte von Feldern und Elementen als Zeichenfolgenliterale (einschließlich der doppelten Anführungszeichen) zurückgegeben, wie in den obigen Beispielen gezeigt.

Zur Vermeidung unerwarteter Ergebnisse rufen Sie die Methode cast auf, um den Wert in einen ganz bestimmten Typ umzuwandeln. Mit dem folgenden Code werden zum Beispiel die Werte ohne und mit Umwandlung ausgegeben:

// Import the objects for the data types, including StringType.
import com.snowflake.snowpark.types._
...
val df = session.table("car_sales")
df.select(col("src")("salesperson")("id")).show()
df.select(col("src")("salesperson")("id").cast(StringType)).show()
Copy

Der Code gibt Folgendes aus:

----------------------------------
|"""SRC""['SALESPERSON']['ID']"  |
----------------------------------
|"55"                            |
|"274"                           |
----------------------------------

---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)"  |
---------------------------------------------------
|55                                               |
|274                                              |
---------------------------------------------------
Copy

Vereinfachen eines Arrays von Objekten zu Zeilen

Wenn Sie semistrukturierte Daten für einen DataFrame vereinfachen müssen (um z. B. für jedes Objekt in einem Array eine Zeile zu erstellen), rufen Sie die Methode DataFrame.flatten auf. Diese Methode entspricht der SQL-Funktion FLATTEN. Wenn Sie einen Pfad zu einem Objekt oder Array übergeben, gibt die Methode einen DataFrame zurück, der eine Zeile für jedes Feld oder Element in dem Objekt oder Array enthält.

In den Beispieldaten ist beispielsweise src:customer ein Array von Objekten, die Informationen über einen Kunden enthalten. Jedes Objekt enthält die Felder name und address.

Wenn Sie diesen Pfad an die Funktion flatten übergeben:

val df = session.table("car_sales")
df.flatten(col("src")("customer")).show()
Copy

Dann gibt diese Methode einen DataFrame zurück:

----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC"                                      |"SEQ"  |"KEY"  |"PATH"  |"INDEX"  |"VALUE"                            |"THIS"                               |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{                                          |1      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "San Francisco, CA",  |  {                                  |
|    {                                      |       |       |        |         |  "name": "Joyce Ridgely",         |    "address": "San Francisco, CA",  |
|      "address": "San Francisco, CA",      |       |       |        |         |  "phone": "16504378889"           |    "name": "Joyce Ridgely",         |
|      "name": "Joyce Ridgely",             |       |       |        |         |}                                  |    "phone": "16504378889"           |
|      "phone": "16504378889"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Valley View Auto Sales",  |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "55",                            |       |       |        |         |                                   |                                     |
|    "name": "Frank Beasley"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "paint protection"                 |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Honda",                     |       |       |        |         |                                   |                                     |
|      "model": "Civic",                    |       |       |        |         |                                   |                                     |
|      "price": "20275",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
|{                                          |2      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "New York, NY",       |  {                                  |
|    {                                      |       |       |        |         |  "name": "Bradley Greenbloom",    |    "address": "New York, NY",       |
|      "address": "New York, NY",           |       |       |        |         |  "phone": "12127593751"           |    "name": "Bradley Greenbloom",    |
|      "name": "Bradley Greenbloom",        |       |       |        |         |}                                  |    "phone": "12127593751"           |
|      "phone": "12127593751"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Tindel Toyota",           |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "274",                           |       |       |        |         |                                   |                                     |
|    "name": "Greg Northrup"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "rust proofing",                   |       |       |        |         |                                   |                                     |
|        "fabric protection"                |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Toyota",                    |       |       |        |         |                                   |                                     |
|      "model": "Camry",                    |       |       |        |         |                                   |                                     |
|      "price": "23500",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
----------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

Von diesem DataFrame aus können Sie die Felder name und address jedes Objekts im Feld VALUE auswählen:

df.flatten(col("src")("customer")).select(col("value")("name"), col("value")("address")).show()
Copy
-------------------------------------------------
|"""VALUE""['NAME']"   |"""VALUE""['ADDRESS']"  |
-------------------------------------------------
|"Joyce Ridgely"       |"San Francisco, CA"     |
|"Bradley Greenbloom"  |"New York, NY"          |
-------------------------------------------------
Copy

Der folgende Code ergänzt das vorherige Beispiel, indem die Werte in einen bestimmten Typ umgewandelt und die Namen der Spalten geändert werden:

df.flatten(col("src")("customer")).select(col("value")("name").cast(StringType).as("Customer Name"), col("value")("address").cast(StringType).as("Customer Address")).show()
Copy
-------------------------------------------
|"Customer Name"     |"Customer Address"  |
-------------------------------------------
|Joyce Ridgely       |San Francisco, CA   |
|Bradley Greenbloom  |New York, NY        |
-------------------------------------------
Copy

Ausführen von SQL-Anweisungen

Um eine von Ihnen angegebene SQL-Anweisung auszuführen, rufen Sie die Methode sql in der Klasse Session auf und übergeben die auszuführende Anweisung. Diese Methode gibt einen DataFrame zurück.

Beachten Sie, dass die SQL-Anweisung erst ausgeführt wird, wenn Sie eine Aktionsmethode aufrufen.

// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
val dfStageFiles = session.sql("ls @myStage")
val files = dfStageFiles.collect()
files.foreach(println)

// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()

val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()
println("Count: " + numRows);
Copy

Wenn Sie Methoden zur Transformation des DataFrame aufrufen möchten (z. B. Filtern, Auswahl usw.), beachten Sie, dass diese Methoden nur funktionieren, wenn die zugrunde liegende SQL-Anweisung eine SELECT-Anweisung ist. Die Transformationsmethoden werden für andere Arten von SQL-Anweisungen nicht unterstützt.

val df = session.sql("select id, category_id, name from sample_product_data where id > 10")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("category_id") < 10).select(col("id")).collect()
results.foreach(println)

// In this example, the underlying SQL statement is not a SELECT statement.
val dfStageFiles = session.sql("ls @myStage")
// Calling the filter method results in an error.
dfStageFiles.filter(...)
Copy