Verwenden von DataFrames in Snowpark Scala¶
In Snowpark erfolgt die Abfrage und Verarbeitung von Daten hauptsächlich über einen DataFrame. Unter diesem Thema wird erklärt, wie Sie DataFrames verwenden.
Unter diesem Thema:
Um Daten abrufen und bearbeiten zu können, verwenden Sie die Klasse DataFrame. Ein DataFrame repräsentiert ein relationales Dataset, das im Lazy-Modus ausgewertet wird: Es wird nur ausgeführt, wenn eine bestimmte Aktion ausgelöst wird. In gewissem Sinne ist ein DataFrame wie eine Abfrage, die ausgewertet werden muss, um Daten abzurufen.
So rufen Sie Daten in einem DataFrame ab:
Konstruieren Sie ein DataFrame, das die Datenquelle für das Dataset angibt.
Sie können z. B. ein DataFrame erstellen, das Daten aus einer Tabelle, Daten einer externen CSV-Datei oder die Ausführung einer SQL-Anweisung enthält.
Geben Sie an, wie das Dataset in den DataFrame transformiert werden soll.
Sie können z. B. angeben, welche Spalten ausgewählt werden sollen, wie die Zeilen gefiltert werden sollen, wie die Ergebnisse sortiert und gruppiert werden sollen usw.
Führen Sie die Anweisung aus, um die Daten in den DataFrame abzurufen.
Um die Daten in den DataFrame abzurufen, müssen Sie eine Methode aufrufen, die eine Aktion ausführt (z. B. die
collect()
-Methode).
In den nächsten Abschnitten werden diese Schritte näher erläutert.
Einrichten der Beispiele für diesen Abschnitt¶
Einige der Beispiele in diesem Abschnitt verwenden einen DataFrame, um eine Tabelle namens sample_product_data
abzufragen. Wenn Sie diese Beispiele ausführen möchten, können Sie diese Tabelle erstellen und mit einigen Daten füllen, indem Sie die folgenden SQL-Anweisungen ausführen:
CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
INSERT INTO sample_product_data VALUES
(1, 0, 5, 'Product 1', 'prod-1', 1, 10),
(2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
(3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
(4, 0, 10, 'Product 2', 'prod-2', 2, 40),
(5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
(6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
(7, 0, 20, 'Product 3', 'prod-3', 3, 70),
(8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
(9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
(10, 0, 50, 'Product 4', 'prod-4', 4, 100),
(11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
(12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);
Um zu überprüfen, ob die Tabelle erstellt wurde, führen Sie folgende Anweisung aus:
SELECT * FROM sample_product_data;
Erstellen eines DataFrame¶
Um einen DataFrame zu erstellen, können Sie Methoden der Session
-Klasse verwenden. Jede der folgenden Methoden erstellt einen DataFrame aus einem anderen Typ von Datenquelle:
Um einen DataFrame aus Daten einer Tabelle, einer Ansicht oder eines Streams zu erstellen, rufen Sie die
table
-Methode auf:// Create a DataFrame from the data in the "sample_product_data" table. val dfTable = session.table("sample_product_data") // To print out the first 10 rows, call: // dfTable.show()
Bemerkung
Die Methode
session.table
gibt einUpdatable
-Objekt zurück.Updatable
erweitertDataFrame
und bietet zusätzliche Methoden für die Verwendung der Daten in der Tabelle (z. B. Methoden zum Aktualisieren und Löschen von Daten). Weitere Informationen dazu finden Sie unter Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle.Um einen DataFrame aus einer Sequenz von Werten zu erstellen, rufen Sie die
createDataFrame
-Methode auf:// Create a DataFrame containing a sequence of values. // In the DataFrame, name the columns "i" and "s". val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
Bemerkung
Von Snowflake reservierte Wörter können bei der Konstruktion eines DataFrame nicht als Spaltennamen verwendet werden. Eine Liste der reservierten Wörter finden Sie unter Reservierte und begrenzte Schlüsselwörter.
Um einen DataFrame zu erstellen, der einen Wertebereich enthält, rufen Sie die
range
-Methode auf:// Create a DataFrame from a range val dfRange = session.range(1, 10, 2)
Um einen DataFrame zu erstellen, der die Daten aus einer in einem Stagingbereich befindlichen Datei enthält, verwenden Sie
read
zum Abrufen einesDataFrameReader
-Objekts. ImDataFrameReader
-Objekt rufen Sie die Methode auf, die dem Format der Daten in der Datei entspricht:// Create a DataFrame from data in a stage. val dfJson = session.read.json("@mystage2/data1.json")
Um einen DataFrame zu erstellen, der die Ergebnisse einer SQL-Abfrage enthält, rufen Sie die
sql
-Methode auf:// Create a DataFrame from a SQL query val dfSql = session.sql("SELECT name from products")
Hinweis: Obwohl Sie mit dieser Methode SELECT-Anweisungen ausführen können, die Daten aus Tabellen und Stagingdateien abrufen, sollten Sie dafür eher die Methoden
table
undread
verwenden. Methoden wietable
undread
bieten eine bessere Syntax- und Fehlerhervorhebung sowie intelligente Codevervollständigung in Entwicklungstools.
Festlegen der Transformation des Datasets¶
Um festzulegen, welche Spalten ausgewählt und wie die Ergebnisse gefiltert, sortiert, gruppiert usw. werden sollen, rufen Sie die DataFrame-Methoden auf, die das Dataset transformieren. Um die Spalten in diesen Methoden zu identifizieren, verwenden Sie die Funktion col
oder einen Ausdruck, der eine Spalte ergibt. (Siehe Angeben von Spalten und Ausdrücken.)
Beispiel:
Zur Angabe der Zeilen, die zurückgegeben werden sollen, rufen Sie die
filter
-Methode auf:// Import the col function from the functions object. import com.snowflake.snowpark.functions._ // Create a DataFrame for the rows with the ID 1 // in the "sample_product_data" table. // // This example uses the === operator of the Column object to perform an // equality check. val df = session.table("sample_product_data").filter(col("id") === 1) df.show()
Zur Angabe der Spalten, die ausgewählt werden sollen, rufen Sie die
select
-Methode auf:// Import the col function from the functions object. import com.snowflake.snowpark.functions._ // Create a DataFrame that contains the id, name, and serial_number // columns in te "sample_product_data" table. val df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number")) df.show()
Jede Methode gibt ein neues DataFrame-Objekt zurück, das transformiert wurde. (Die Methode hat keine Auswirkungen auf das ursprüngliche DataFrame-Objekt.) Wenn Sie also mehrere Transformationen anwenden möchten, können Sie Methodenaufrufe verketten, sodass jede nachfolgende Transformationsmethode auf dem neuen DataFrame-Objekt ausgeführt wird, das vom vorherigen Methodenaufruf zurückgegeben wurde.
Beachten Sie, dass mit diesen Transformationsmethoden keine Daten aus der Snowflake-Datenbank abgerufen werden können. (Die unter Ausführen einer Aktion zum Auswerten eines DataFrame beschriebenen Aktionsmethoden führen den Datenabruf durch.) Die Transformationsmethoden geben einfach an, wie die SQL-Anweisung aufgebaut sein soll.
Angeben von Spalten und Ausdrücken¶
Wenn Sie diese Transformationsmethoden aufrufen, müssen Sie möglicherweise Spalten angeben oder Ausdrücke, die Spalten verwenden. Wenn Sie z. B. die select
-Methode aufrufen, müssen Sie die Spalten angeben, die ausgewählt werden sollen.
Um auf eine Spalte zu verweisen, erstellen Sie ein Column-Objekt durch Aufruf der col-Funktion im com.snowflake.snowpark.functions
-Objekt.
// Import the col function from the functions object.
import com.snowflake.snowpark.functions._
val dfProductInfo = session.table("sample_product_data").select(col("id"), col("name"))
dfProductInfo.show()
Bemerkung
Weitere Informationen zum Erstellen eines Column
-Objekts für ein Literal finden Sie unter Verwenden von Literalen als Spaltenobjekte.
Wenn Sie einen Filter, eine Projektion, eine Verknüpfungsbedingung usw. angeben müssen, können Sie Column
-Objekte in einem Ausdruck verwenden. Beispiel:
Sie können
Column
-Objekte mit der Methodefilter
verwenden, um eine Filterbedingung anzugeben:// Specify the equivalent of "WHERE id = 20" // in an SQL SELECT statement. df.filter(col("id") === 20)
// Specify the equivalent of "WHERE a + b < 10" // in an SQL SELECT statement. df.filter((col("a") + col("b")) < 10)
Sie können
Column
-Objekte mit der Methodeselect
verwenden, um einen Alias zu definieren:// Specify the equivalent of "SELECT b * 10 AS c" // in an SQL SELECT statement. df.select((col("b") * 10) as "c")
Sie können
Column
-Objekte mit der Methodejoin
verwenden, um eine Verknüpfungsbedingung zu definieren:// Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y" // in an SQL SELECT statement. dfX.join(dfY, col("a_in_X") === col("b_in_Y"))
Verweisen auf Spalten in verschiedenen DataFrames¶
Wenn Sie auf Spalten in zwei verschiedenen DataFrame-Objekten verweisen müssen, die denselben Namen haben (z. B. beim Verknüpfen von DataFrames über diese Spalte), können Sie die DataFrame.col
-Methode in einem der beiden DataFrame-Objekte verwenden, um sich auf eine Spalte in diesem Objekt zu beziehen (z. B. df1.col("name")
und df2.col("name")
).
Im folgenden Beispiel wird gezeigt, wie Sie mit der DataFrame.col
-Methode auf eine Spalte in einem bestimmten DataFrame verweisen. Im Beispiel werden zwei DataFrame-Objekte verknüpft, die beide eine Spalte mit dem Namen key
haben. Dabei wird im Beispiel die Column.as
-Methode verwendet, um die Namen der Spalten im neu erstellten DataFrame zu ändern.
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))
Verwenden der apply
-Methode zum Verweisen auf eine Spalte¶
Alternativ zur Methode DataFrame.col
können Sie die Methode DataFrame.apply
verwenden, um auf eine Spalte in einem bestimmten DataFrame zu verweisen. Wie die Methode DataFrame.col
akzeptiert auch die Methode DataFrame.apply
einen Spaltennamen als Eingabe und gibt ein Column
-Objekt zurück.
Beachten Sie, dass bei einem Objekt, das eine apply
-Methode in Scala hat, die apply
-Methode verwendet werden kann, indem das Objekt wie eine Funktion aufgerufen wird. Um z. B. df.apply("column_name")
aufzurufen, können Sie einfach df("column_name")
schreiben. Die folgenden Aufrufe sind gleichwertig:
df.col("<Spaltenname>")
df.apply("<Spaltenname>")
df("<Spaltenname>")
Das folgende Beispiel entspricht dem vorherigen Beispiel, verwendet aber die Methode DataFrame.apply
, um auf die Spalten in einer Verknüpfungsoperation zu verweisen:
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))
Verwenden von Kurzschrift für Spaltenobjekte¶
Alternativ zur Funktion col
können Sie auf eine Spalte auch wie folgt verweisen:
Verwenden Sie vor dem Spaltennamen in Anführungszeichen ein Dollarzeichen (
$"column_name"
).Verwenden Sie vor dem nicht in Anführungszeichen gesetzten Spaltennamen ein Hochkomma (ein einfaches Anführungszeichen) (
'column_name
).
Führen Sie dazu nach dem Erstellen eines Session
-Objekts einen Import der Namen aus dem implicits
-Objekt durch:
val session = Session.builder.configFile("/path/to/properties").create
// Import this after you create the session.
import session.implicits._
// Use the $ (dollar sign) shorthand.
val df = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)
// Use ' (apostrophe) shorthand.
val df = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)
Verwenden von doppelten Anführungszeichen um Objektbezeichner (Tabellennamen, Spaltennamen usw.)¶
Die Namen von Datenbanken, Schemas, Tabellen und Stagingbereichen, die Sie angeben, müssen den Snowflake-Anforderungen an Bezeichner entsprechen. Wenn Sie einen Namen angeben, geht Snowflake davon aus, dass der Name in Großbuchstaben geschrieben ist. Daher sind z. B. die folgenden Aufrufe gleichwertig:
// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))
Wenn der Name nicht den Anforderungen für Bezeichner entspricht, müssen Sie den Namen in doppelte Anführungszeichen ("
) setzen. Verwenden Sie einen Backslash (\
), um die doppelten Anführungszeichen innerhalb von Scala-Zeichenfolgenliteralen zu maskieren. Der folgende Tabellenname beginnt z. B. nicht mit einem Buchstaben oder einem Unterstrich, sodass Sie den Namen in doppelte Anführungszeichen setzen müssen:
val df = session.table("\"10tablename\"")
Beachten Sie, dass Sie bei der Angabe des Namens einer Spalte keine doppelten Anführungszeichen um den Namen verwenden müssen. Die Snowpark-Bibliothek schließt Spaltennamen automatisch in doppelte Anführungszeichen ein, wenn der Name nicht den Bezeichneranforderungen entspricht:
// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\""))
// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))
Wenn Sie bereits doppelte Anführungszeichen um einen Spaltennamen hinzugefügt haben, fügt die Bibliothek keine weiteren doppelten Anführungszeichen um den Namen ein.
In einigen Fällen kann der Spaltenname doppelte Anführungszeichen enthalten:
describe table quoted;
+------------------------+ ...
| name | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted" | ...
+------------------------+ ...
Wie unter Anforderungen an Bezeichner erläutert, müssen Sie für jedes doppelte Anführungszeichen innerhalb eines in Anführungszeichen geschriebenen Bezeichners zwei doppelte Anführungszeichen verwenden (z. B. "name_with_""air""_quotes"
und """column_name_quoted"""
):
val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()
Wenn ein Bezeichner in doppelte Anführungszeichen eingeschlossen ist (unabhängig davon, ob Sie die Anführungszeichen explizit hinzugefügt haben oder die Bibliothek die Anführungszeichen für Sie hinzugefügt hat), berücksichtigt Snowflake die Groß-/Kleinschreibung des Bezeichners:
// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))
Verwenden von Literalen als Spaltenobjekte¶
Um ein Literal in einer Methode zu verwenden, die ein Column
-Objekt übergibt, erstellen Sie ein Column
-Objekt für das Literal, indem Sie das Literal an die lit
-Funktion im com.snowflake.snowpark.functions
-Objekt übergeben. Beispiel:
// Import for the lit and col functions.
import com.snowflake.snowpark.functions._
// Show the first 10 rows in which num_items is greater than 5.
// Use `lit(5)` to create a Column object for the literal 5.
df.filter(col("num_items").gt(lit(5))).show()
Wenn das Literal in Scala ein Gleitkomma- oder Double-Wert ist (z. B. 0.05
wird standardmäßig als Double verarbeitet), generiert die Snowpark-Bibliothek SQL-Code, der den Wert implizit in den entsprechenden Snowpark-Datentyp umwandelt (z. B. 0.05::DOUBLE
). Dies kann zu einem Näherungswert führen, der von der genau angegebenen Zahl abweicht.
Der Code im folgenden Beispiel zeigt keine übereinstimmenden Zeilen an, obwohl der Filter (der für Werte größer oder gleich 0.05
gilt) mit den Zeilen in DataFrame übereinstimmen sollte:
// Create a DataFrame that contains the value 0.05.
val df = session.sql("select 0.05 :: Numeric(5, 2) as a")
// Applying this filter results in no matching rows in the DataFrame.
df.filter(col("a") <= lit(0.06) - lit(0.01)).show()
Das Problem ist, dass lit(0.06)
und lit(0.01)
Näherungswerte für 0.06
und 0.01
liefern und nicht die genauen Werte.
Um dieses Problem zu vermeiden, können Sie eine der folgenden Methoden anwenden:
Option 1: Wandeln Sie das Literal in den Snowpark-Typ um, den Sie verwenden möchten. Verwenden Sie zum Beispiel einen Datentyp NUMBER mit einer Gesamtstellenzahl (precision) von 5 und einer Dezimalstellenzahl (scale) von 2:
df.filter(col("a") <= lit(0.06).cast(new DecimalType(5, 2)) - lit(0.01).cast(new DecimalType(5, 2))).show()
Option 2: Wandeln Sie den Wert in den gewünschten Typ umwandeln, bevor der Wert an die Funktion
lit
übergeben wird. So können Sie zum Beispiel den Typ BigDecimal verwenden:df.filter(col("a") <= lit(BigDecimal(0.06)) - lit(BigDecimal(0.01))).show()
Umwandeln eines Spaltenobjekts in einen bestimmten Typ¶
Um ein Column
-Objekt in einen bestimmten Typ umzuwandeln, rufen Sie die Column.cast-Methode auf, und übergeben Sie ein Typobjekt aus dem com.snowflake.snowpark.types package. So können Sie zum Beispiel ein Literal als NUMBER mit einer Gesamtstellenzahl (precision) von 5 und einer Dezimalstellenzahl (scale) von 2 darstellen:
// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._
val decimalValue = lit(0.05).cast(new DecimalType(5,2))
Verketten von Methodenaufrufen¶
Da jede Methode, die ein DataFrame-Objekt transformiert, ein neues DataFrame-Objekt zurückgibt, auf das die Transformation angewendet wurde, können Sie Methodenaufrufe verketten, um ein neues DataFrame zu erstellen, das auf eine andere Weise transformiert wird.
Im folgenden Beispiel wird ein DataFrame zurückgegeben, der für folgende Aufgaben konfiguriert ist:
Abfragen der Tabelle
sample_product_data
Rückgabe der Zeile mit
id = 1
Auswahl der Spalten
name
undserial_number
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
In diesem Beispiel:
session.table("sample_product_data")
gibt einen DataFrame für die Tabellesample_product_data
zurück.Der DataFrame enthält zwar noch nicht die Daten aus der Tabelle, aber das Objekt enthält die Definitionen der Spalten in der Tabelle.
filter(col("id") === 1)
gibt ein DataFrame für die Tabellesample_product_data
zurück, die so eingerichtet ist, dass sie die Zeile mitid = 1
zurückgibt.Beachten Sie erneut, dass der DataFrame noch nicht die übereinstimmende Zeile aus der Tabelle enthält. Die übereinstimmende Zeile wird erst abgerufen, wenn Sie eine Aktionsmethode aufrufen.
select(col("name"), col("serial_number"))
gibt ein DataFrame zurück, das die Spaltenname
undserial_number
für die Zeile in der Tabellesample_product_data
enthält, dieid = 1
hat.
Beachten Sie beim Verketten von Methodenaufrufen, dass die Reihenfolge der Aufrufe wichtig ist. Jeder Methodenaufruf gibt ein DataFrame zurück, das transformiert wurde. Stellen Sie sicher, dass nachfolgende Aufrufe das transformierte DataFrame verwenden.
Im folgenden Beispiel gibt die Methode select
ein DataFrame zurück, das nur zwei Spalten enthält: name
und serial_number
. Der Aufruf der filter
-Methode auf diesem DataFrame schlägt fehl, weil dieser die Spalte id
verwendet, die im transformierten DataFrame nicht vorhanden ist.
// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") === 1)
Im Gegensatz dazu wird der folgende Code erfolgreich ausgeführt, weil die filter()
-Methode auf einem DataFrame aufgerufen wird, der alle Spalten der Tabelle sample_product_data
enthält (einschließlich der Spalte id
):
// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
Beachten Sie, dass Sie die Methodenaufrufe select
und filter
möglicherweise in einer anderen Reihenfolge ausführen müssen als bei Verwendung der entsprechenden Schlüsselwörter (SELECT und WHERE) in einer SQL-Anweisung.
Begrenzen der Anzahl von Zeilen in einem DataFrame¶
Um die Anzahl der Zeilen in einem DataFrame zu begrenzen, können Sie die Transformationsmethode DataFrame.limit verwenden.
Die Snowpark-API bietet auch folgende Aktionsmethoden zum Abrufen und Ausdrucken einer begrenzten Anzahl von Zeilen:
Aktionsmethode DataFrame.first (zum Ausführen von Abfragen und zum Zurückgeben der ersten
n
Zeilen)Aktionsmethode DataFrame.show (zum Ausführen von Abfragen und zum Drucken der ersten
n
Zeilen)
Diese Methoden fügen der auszuführenden SQL-Anweisung im Endeffekt eine LIMIT-Klausel hinzu.
Wie in den Nutzungshinweisen zu LIMIT erläutert, sind die Ergebnisse nicht deterministisch, es sei denn, Sie geben in Verbindung mit LIMIT eine Sortierreihenfolge (ORDER BY) an.
Um die ORDER BY-Klausel zusammen mit der LIMIT-Klausel zu verwenden (z. B. damit ORDER BY nicht in einer separaten Unterabfrage steht), müssen Sie die Methode aufrufen, die die Ergebnisse auf den von der sort
-Methode zurückgegebenen DataFrame beschränkt.
Wenn Sie beispielsweise Methodenaufrufe verketten:
// Limit the number of rows to 5, sorted by parent_id.
var dfSubset = df.sort(col("parent_id")).limit(5);
// Return the first 5 rows, sorted by parent_id.
var arrayOfRows = df.sort(col("parent_id")).first(5)
// Print the first 5 rows, sorted by parent_id.
df.sort(col("parent_id")).show(5)
Abrufen von Spaltendefinitionen¶
Um die Definition der Spalten im Dataset für DataFrame abzurufen, rufen Sie die Methode schema
auf. Diese Methode gibt ein StructType
-Objekt zurück, das ein Array
von StructField
-Objekten enthält. Jedes StructField
-Objekt enthält die Definition einer Spalte.
// Get the StructType object that describes the columns in the
// underlying rowset.
val tableSchema = session.table("sample_product_data").schema
println("Schema for sample_product_data: " + tableSchema);
Im zurückgegebenen StructType
-Objekt sind die Spaltennamen immer normalisiert. Bezeichner, die nicht mit Anführungszeichen umschlossen sind, werden in Großbuchstaben zurückgegeben. Bezeichner in Anführungszeichen werden genau in der Schreibweise zurückgegeben, in der sie definiert wurden.
Im folgenden Beispiel wird ein DataFrame erstellt, das die Spalten ID
und 3rd
enthält. Für den Spaltennamen 3rd
schließt die Snowpark-Bibliothek den Namen automatisch in doppelte Anführungszeichen ein ("3rd"
), da der Name nicht den Anforderungen an einen Bezeichner entspricht.
Im Beispiel wird erst die schema
-Methode aufgerufen und dann die names
-Methode auf dem zurückgegebenen Objekt StructType
, um ein ArraySeq
von Spaltennamen zu erhalten. Die Namen werden in dem von der schema
-Methode zurückgegebenen StructType
normalisiert.
// Create a DataFrame containing the "id" and "3rd" columns.
val dfSelectedColumns = session.table("sample_product_data").select(col("id"), col("3rd"))
// Print out the names of the columns in the schema. This prints out:
// ArraySeq(ID, "3rd")
println(dfSelectedColumns.schema.names.toSeq)
Verknüpfen von DataFrames¶
Zum Verknüpfen (Join) von DataFrame-Objekten rufen Sie die DataFrame-Methode auf:
In den folgenden Abschnitten wird erläutert, wie DataFrames zur Durchführung einer Verknüpfung verwendet werden:
Einrichten der Beispieldaten für die Verknüpfungen¶
Die Beispiele in den nächsten Abschnitten verwenden Beispieldaten, die Sie durch Ausführen der folgenden SQL-Anweisungen einrichten können:
create or replace table sample_a (
id_a integer,
name_a varchar,
value integer
);
insert into sample_a (id_a, name_a, value) values
(10, 'A1', 5),
(40, 'A2', 10),
(80, 'A3', 15),
(90, 'A4', 20)
;
create or replace table sample_b (
id_b integer,
name_b varchar,
id_a integer,
value integer
);
insert into sample_b (id_b, name_b, id_a, value) values
(4000, 'B1', 40, 10),
(4001, 'B2', 10, 5),
(9000, 'B3', 80, 15),
(9099, 'B4', null, 200)
;
create or replace table sample_c (
id_c integer,
name_c varchar,
id_a integer,
id_b integer
);
insert into sample_c (id_c, name_c, id_a, id_b) values
(1012, 'C1', 10, null),
(1040, 'C2', 40, 4000),
(1041, 'C3', 40, 4001)
;
Angeben der Spalten für den Join¶
Mit der Methode DataFrame.join
können Sie die zu verwendenden Spalten auf eine der folgenden Arten angeben:
Geben Sie einen Spaltenausdruck an, der die Verknüpfungsbedingung beschreibt.
Geben Sie eine oder mehrere Spalten an, die als gemeinsame Spalten im Join verwendet werden sollen.
Im folgenden Beispiel wird eine innere Verknüpfung (Inner Join) für die Spalte mit dem Namen id_a
ausgeführt:
// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
Beachten Sie, dass im Beispiel die Methode DataFrame.col
verwendet wird, um die im Join zu verwendenden Spalten anzugeben. Weitere Informationen zu dieser Methode finden Sie unter Angeben von Spalten und Ausdrücken.
Dies gibt die folgende Ausgabe aus:
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
----------------------------------------------------------------------
Identische Spaltennamen, die im Join-Ergebnis dupliziert sind¶
In dem DataFrame, der aus einer Join-Verknüpfung resultiert, verwendet die Snowpark-Bibliothek die Spaltennamen, die in den verknüpften Tabellen gefunden wurden, selbst wenn die Spaltennamen in den Tabellen identisch sind. In diesem Fall werden diese Spaltennamen in dem aus der Verknüpfung resultierenden DataFrame dupliziert. Um auf eine duplizierte Spalte über den Namen zuzugreifen, rufen Sie die col
-Methode auf dem DataFrame auf, der die Originaltabelle der Spalte repräsentiert. (Weitere Informationen zum Spezifizieren von Spalten finden Sie unter Verweisen auf Spalten in verschiedenen DataFrames.)
Der Code im folgenden Beispiel verknüpft zwei DataFrames und ruft dann die select
-Methode auf dem verknüpften DataFrame auf. Die auszuwählenden Spalten werden angegeben, indem die Methode col
über die Variable aufgerufen wird, die die entsprechenden DataFrame-Objekte dfRhs
und dfLhs
repräsentiert. Die as
-Methode wird verwendet, um den von der select
-Methode erstellten Spalten im DataFrame neue Namen zu geben.
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
val dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"))
dfSelected.show()
Dies gibt die folgende Ausgabe aus:
------------------------------
|"LEFTVALUE" |"RIGHTVALUE" |
------------------------------
|5 |5 |
|10 |10 |
|15 |15 |
------------------------------
Spalten vor dem Speichern oder Zwischenspeichern deduplizieren¶
Beachten Sie, dass Sie bei einem DataFrame, der aus einer Join-Verknüpfung resultiert und doppelte Spaltennamen enthält, die Spalten deduplizieren oder umbenennen müssen, um die Duplikate aus dem DataFrame zu entfernen, bevor Sie das Ergebnis in einer Tabelle speichern oder den DataFrame zwischenspeichern. Bei doppelten Spaltennamen in einem DataFrame, den Sie in einer Tabelle oder im Cache speichern, ersetzt die Snowpark-Bibliothek die doppelten Spaltennamen durch Aliasnamen, sodass diese nicht mehr doppelt vorkommen.
Das folgende Beispiel veranschaulicht, wie die Ausgabe eines zwischengespeicherten DataFrame aussehen könnte, wenn die Spaltennamen ID_A
und VALUE
in einer Join-Verknüpfung von zwei Tabellen dupliziert und dann vor dem Zwischenspeichern des Ergebnisses nicht dedupliziert oder umbenannt wurden.
--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A" |"NAME_A" |"l_ZSz7_VALUE" |"ID_B" |"NAME_B" |"r_heec_ID_A" |"r_heec_VALUE" |
--------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
--------------------------------------------------------------------------------------------------
Ausführen einer natürlichen Verknüpfung (Natural Join)¶
Um eine natürliche Verknüpfung (Natural Join) auszuführen, bei der DataFrames über Spalten mit demselben Namen verknüpft werden, rufen Sie die Methode DataFrame.naturalJoin auf.
Im folgenden Beispiel werden die DataFrames für die Tabellen sample_a
und sample_b
über ihre gemeinsamen Spalten (die Spalte id_a
) verknüpft:
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
Dies gibt die folgende Ausgabe aus:
---------------------------------------------------
|"ID_A" |"VALUE" |"NAME_A" |"ID_B" |"NAME_B" |
---------------------------------------------------
|10 |5 |A1 |4001 |B2 |
|40 |10 |A2 |4000 |B1 |
|80 |15 |A3 |9000 |B3 |
---------------------------------------------------
Geben Sie den Typ der Verknüpfung an:¶
Standardmäßig erstellt die Methode DataFrame.join
eine innere Verknüpfung (Inner Join). Um einen anderen Verknüpfungstyp anzugeben, verwenden Sie für das Argument joinType
einen der folgenden Werte:
Typ der Verknüpfung (Join) |
|
---|---|
Innere Verknüpfung (Inner Join) |
|
Linke äußere Verknüpfung: |
|
Rechte äußere Verknüpfung: |
|
Vollständige äußere Verknüpfung: |
|
Kreuzverknüpfung (Cross Join) |
|
Beispiel:
// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"), "left")
dfLeftOuterJoin.show()
Dies gibt die folgende Ausgabe aus:
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|40 |A2 |10 |4000 |B1 |40 |10 |
|10 |A1 |5 |4001 |B2 |10 |5 |
|80 |A3 |15 |9000 |B3 |80 |15 |
|90 |A4 |20 |NULL |NULL |NULL |NULL |
----------------------------------------------------------------------
Mehrere Tabellen verknüpfen¶
So verknüpfen Sie mehrere Tabellen:
Erstellen Sie einen DataFrame für jede Tabelle.
Rufen Sie die Methode
DataFrame.join
für den ersten DataFrame auf, und übergeben Sie den zweiten DataFrame.Rufen Sie mit den von der Methode
join
zurückgegebenen DataFrame die Methodejoin
auf und übergeben Sie den dritten DataFrame.
Sie können die join
-Aufrufe wie unten gezeigt verketten:
val dfFirst = session.table("sample_a")
val dfSecond = session.table("sample_b")
val dfThird = session.table("sample_c")
val dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a") === dfSecond.col("id_a")).join(dfThird, dfFirst.col("id_a") === dfThird.col("id_a"))
dfJoinThreeTables.show()
Dies gibt die folgende Ausgabe aus:
------------------------------------------------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |"ID_C" |"NAME_C" |"ID_A" |"ID_B" |
------------------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |1012 |C1 |10 |NULL |
|40 |A2 |10 |4000 |B1 |40 |10 |1040 |C2 |40 |4000 |
|40 |A2 |10 |4000 |B1 |40 |10 |1041 |C3 |40 |4001 |
------------------------------------------------------------------------------------------------------------
Ausführen einer Selbstverknüpfung (Self-Join)¶
Wenn Sie eine Tabelle mit sich selbst über verschiedene Spalten verknüpfen müssen, können Sie die Selbstverknüpfung (Self-Join) nicht mit einem einzelnen DataFrame durchführen. Im folgenden Beispiel wird ein einzelnes DataFrame in einer Selbstverknüpfung verwendet. Diese Ausführung schlägt fehl, weil die Spaltenausdrücke für "id"
sowohl auf der linken als auch der rechten Seite der Verknüpfung vorhanden sind:
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, col("id") === col("parent_id"))
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, df("id") === df("parent_id"))
Beide Beispiele scheitern mit der folgenden Ausnahme:
Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
Instead, join this DataFrame to a clone() of itself.
Verwenden Sie stattdessen die Methode DataFrame.clone, um einen Klon des DataFrame-Objekts zu erstellen, und verwenden Sie dann die beiden DataFrame-Objekte für die Verknüpfung:
// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
val dfLhs = session.table("sample_product_data")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()
// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))
dfJoined.show()
Wenn Sie eine Selbstverknüpfung auf derselben Spalte ausführen möchten, rufen Sie die join
-Methode auf, die eine Seq
von Spaltenausdrücken für die USING
-Klausel übergibt:
// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
val df = session.table("sample_product_data");
val dfJoined = df.join(df, Seq("key"))
Ausführen einer Aktion zum Auswerten eines DataFrame¶
Wie bereits erwähnt, wird der DataFrame im Lazy-Modus ausgewertet, d. h. die SQL-Anweisung wird erst dann zur Ausführung an den Server gesendet, wenn Sie eine Aktion ausführen. Eine Aktion löst die Auswertung des DataFrame aus und sendet die entsprechende SQL-Anweisung zur Ausführung an den Server.
In den folgenden Abschnitten wird erläutert, wie eine Aktion auf einem DataFrame synchron und asynchron ausgeführt werden kann:
Synchrones Ausführen einer Aktion¶
Um eine Aktion synchron auszuführen, rufen Sie eine der folgenden Aktionsmethoden auf:
Methode zur synchronen Ausführung einer Aktion |
Beschreibung |
---|---|
|
Wertet den DataFrame aus und gibt das resultierende Dataset als ein |
|
Wertet den DataFrame aus und gibt einen Iterator für Row-Objekte zurück. Verwenden Sie diese Methode bei einem großen Resultset, um zu vermeiden, dass alle Ergebnisse auf einmal in den Arbeitsspeicher geladen werden. Siehe Zurückgeben eines Iterators für die Zeilen. |
|
Wertet den DataFrame aus und gibt die Anzahl der Zeilen zurück. |
|
Wertet den DataFrame aus und gibt die Zeilen auf der Konsole aus. Beachten Sie, dass bei dieser Methode die Anzahl der Zeilen (standardmäßig) auf 10 beschränkt ist. Siehe Ausgeben der Zeilen eines DataFrame. |
|
Führt die Abfrage aus, erstellt eine temporäre Tabelle und füllt die Tabelle mit den Ergebnissen. Die Methode gibt ein |
|
Speichert die im DataFrame enthaltenen Daten in die angegebene Tabelle. Siehe Speichern von Daten in eine Tabelle. |
|
Speichert einen DataFrame in einer angegebenen Datei in einem Stagingbereich. Siehe Speichern eines DataFrame in Dateien in einem Stagingbereich. |
|
Kopiert die im DataFrame enthaltenen Daten in die angegebene Tabelle. Siehe Kopieren von Daten aus Dateien in eine Tabelle. |
|
Löscht Zeilen aus der angegebenen Tabelle. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle. |
|
Aktualisiert Zeilen in der angegebenen Tabelle. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle. |
|
Führt Zeilen in der angegebenen Tabelle zusammen. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle. |
Um die Abfrage auszuführen und die Anzahl der Ergebnisse zurückzugeben, rufen Sie die count
-Methode auf:
// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")
// Send the query to the server for execution and
// print the count of rows in the table.
println("Rows returned: " + dfProducts.count())
Sie können auch Aktionsmethoden aufrufen, um Folgendes zu tun:
Ausführen einer Abfrage auf einer Tabelle und Zurückgeben der Ergebnisse
Ausführen einer Abfrage und Ausgeben der Ergebnisse auf einer Konsole
Hinweis: Wenn Sie die schema
-Methode aufrufen, um die Definitionen der Spalten im DataFrame zu erhalten, müssen Sie keine Aktionsmethode aufrufen.
Asynchrones Ausführen einer Aktion¶
Bemerkung
Dieses Feature wurde in Snowpark 0.11.0 eingeführt.
Zur asynchronen Ausführung einer Aktion müssen Sie die Methode async
aufrufen, um ein Objekt für den „asynchronen Akteur“ (z. B. DataFrameAsyncActor
) zurückzugeben, und dann eine asynchrone Aktionsmethode in diesem Objekt aufrufen.
Diese Aktionsmethoden eines asynchronen Akteursobjekts geben ein TypedAsyncJob
-Objekt zurück, das Sie verwenden können, um den Status der asynchronen Aktion zu überprüfen und die Ergebnisse der Aktion abzurufen.
In den nächsten Abschnitten wird erläutert, wie Sie Aktionen asynchron ausführen und die Ergebnisse überprüfen können.
Erläuterungen zum grundlegenden Ablauf asynchroner Aktionen¶
Sie können die folgenden Methoden verwenden, um eine Aktion asynchron auszuführen:
Methode zur asynchronen Ausführung einer Aktion |
Beschreibung |
---|---|
|
Wertet den DataFrame asynchron aus, um das resultierende Dataset als |
|
Wertet den DataFrame asynchron aus, um einen Iterator für Row-Objekte abzurufen. Verwenden Sie diese Methode bei einem großen Resultset, um zu vermeiden, dass alle Ergebnisse auf einmal in den Arbeitsspeicher geladen werden. Siehe Zurückgeben eines Iterators für die Zeilen. |
|
Wertet den DataFrame asynchron aus, um die Anzahl der Zeilen abzurufen. |
|
Speichert die im DataFrame enthaltenen Daten asynchron in die angegebene Tabelle. Siehe Speichern von Daten in eine Tabelle. |
|
Speichert einen DataFrame in einer angegebenen Datei in einem Stagingbereich. Siehe Speichern eines DataFrame in Dateien in einem Stagingbereich. |
|
Kopiert die im DataFrame enthaltenen Daten asynchron in die angegebene Tabelle. Siehe Kopieren von Daten aus Dateien in eine Tabelle. |
|
Löscht Zeilen asynchron aus der angegebenen Tabelle. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle. |
|
Aktualisiert Zeilen asynchron in der angegebenen Tabelle. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle. |
|
Führt Zeilen in der angegebenen Tabelle asynchron zusammen. Unterstützt in Version 1.3.0 oder höher. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle. |
Mit dem zurückgegebenen Objekt TypedAsyncJob können Sie Folgendes tun:
Um festzustellen, ob die Aktion abgeschlossen ist, rufen Sie die Methode
isDone
auf.Um die Abfrage-ID der zugehörigen Aktion zu erhalten, rufen Sie die Methode
getQueryId
auf.Um die Ergebnisse der Aktion zurückzugeben (z. B. das
Array
vonRow
-Objekten für die Methodecollect
oder die Anzahl der Zeilen für die Methodecount
), rufen Sie die MethodegetResult
auf.Beachten Sie, dass
getResult
ein blockierender Aufruf ist.Um die Aktion abzubrechen, rufen Sie die Methode
cancel
auf.
Um zum Beispiel eine Abfrage asynchron auszuführen und die Ergebnisse als Array
von Row
-Objekten abzurufen, rufen Sie DataFrame.async.collect
auf:
// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
val df = session.table("sample_product_data").select(col("id"), col("name"))
// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.collect()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
val results = asyncJob.getResult()
results.foreach(println)
Um die Abfrage asynchron auszuführen und die Anzahl der Ergebnisse abzurufen, rufen Sie DataFrame.async.count
auf:
// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")
// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.count()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Print the count of rows in the table.
// Note that getResult is a blocking call.
println("Rows returned: " + asyncJob.getResult())
Festlegen der maximalen Anzahl der zu wartenden Sekunden¶
Beim Aufruf der Methode getResult
können Sie mit dem Argument maxWaitTimeInSeconds
die maximale Anzahl von Sekunden angeben, die auf den Abschluss der Abfrage gewartet werden soll, bevor versucht wird, die Ergebnisse abzurufen. Beispiel:
// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
val results = asyncJob.getResult(10)
Wenn Sie dieses Argument weglassen, wartet die Methode auf die maximale Anzahl von Sekunden, die in der Konfigurationseigenschaft snowpark_request_timeout_in_seconds angegeben ist. (Dies ist eine Eigenschaft, die Sie beim Erstellen des Sitzungsobjekts festlegen können.)
Zugriff auf eine asynchrone Abfrage über die ID¶
Mithilfe der Abfrage-ID einer zuvor übermittelten asynchronen Abfrage können Sie die Methode Session.createAsyncJob
aufrufen, um ein AsyncJob-Objekt zu erstellen. Mithilfe dieses Objekts können Sie dann den Status der Abfrage überprüfen, die Abfrageergebnisse abrufen oder die Abfrage abbrechen.
Beachten Sie, dass AsyncJob
im Gegensatz zu TypedAsyncJob
keine getResult
-Methode zum Abrufen der Ergebnisse bereitstellt. Wenn Sie die Ergebnisse abrufen müssen, verwenden Sie stattdessen die Methode getRows
oder getIterator
.
Beispiel:
val asyncJob = session.createAsyncJob(myQueryId)
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
val rows = asyncJob.getRows()
rows.foreach(println)
Abrufen von Zeilen in einem DataFrame¶
Nachdem Sie angegeben haben, wie der DataFrame transformiert werden soll, können Sie eine Aktionsmethode aufrufen, um eine Abfrage auszuführen und die Ergebnisse zurückzugeben. Sie können alle Zeilen in einem Array
zurückgeben, oder Sie können einen Iterator zurückgeben, mit dem Sie die Ergebnisse Zeile für Zeile durchlaufen können. In letzterem Fall werden bei großen Datenmengen die Zeilen blockweise in den Arbeitsspeicher geladen, um das Laden einer großen Datenmenge in den Arbeitsspeicher zu vermeiden.
Alle Zeilen zurückgeben¶
Um alle Zeilen auf einmal zurückzugeben, rufen Sie die Methode DataFrame.collect auf. Diese Methode gibt ein Array von Row-Objekten zurück. Um die Werte aus der Zeile abzurufen, rufen Sie die Methode getType
auf, wobei „Type“ der Datentyp ist (z. B. getString
, getInt
usw.).
Beispiel:
import com.snowflake.snowpark.functions_
val rows = session.table("sample_product_data").select(col("name"), col("category_id")).sort(col("name")).collect()
for (row <- rows) {
println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
Zurückgeben eines Iterators für die Zeilen¶
Wenn Sie einen Iterator verwenden möchten, um die Row-Objekte in den Ergebnissen zeilenweise zu durchlaufen, rufen Sie DataFrame.toLocalIterator auf. Wenn die Datenmenge in den Ergebnissen groß ist, lädt die Methode die Zeilen blockweise, um zu vermeiden, dass alle Zeilen auf einmal in den Arbeitsspeicher geladen werden.
Beispiel:
import com.snowflake.snowpark.functions_
while (rowIterator.hasNext) {
val row = rowIterator.next()
println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
Zurückgeben der ersten n
Zeilen¶
Um die ersten n
Zeilen zurückzugeben, rufen Sie die Methode DataFrame.first auf und übergeben die Anzahl der zurückzugebenden Zeilen.
Wie unter Begrenzen der Anzahl von Zeilen in einem DataFrame erläutert, sind die Ergebnisse nicht deterministisch. Wenn die Ergebnisse deterministisch sein sollen, müssen Sie diese Methode auf einem sortierten DataFrame (df.sort().first()
) aufrufen.
Beispiel:
import com.snowflake.snowpark.functions_
val df = session.table("sample_product_data")
val rows = df.sort(col("name")).first(5)
rows.foreach(println)
Ausgeben der Zeilen eines DataFrame¶
Um die ersten 10 Zeilen des DataFrame auf der Konsole auszugeben, rufen Sie die Methode DataFrame.show auf. Um eine andere Anzahl von Zeilen auszugeben, übergeben Sie die Anzahl der auszugebenden Zeilen.
Wie unter Begrenzen der Anzahl von Zeilen in einem DataFrame erläutert, sind die Ergebnisse nicht deterministisch. Wenn die Ergebnisse deterministisch sein sollen, müssen Sie diese Methode auf einem sortierten DataFrame (df.sort().show()
) aufrufen.
Beispiel:
import com.snowflake.snowpark.functions_
val df = session.table("sample_product_data")
df.sort(col("name")).show()
Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle¶
Bemerkung
Dieses Feature wurde in Snowpark 0.7.0 eingeführt.
Wenn Sie Session.table
aufrufen, um ein DataFrame
-Objekt für eine Tabelle zu erstellen, gibt die Methode ein Updatable
-Objekt zurück, das DataFrame
um zusätzliche Methoden zum Aktualisieren und Löschen von Daten in der Tabelle erweitert. (Siehe Updatable.)
Wenn Sie Zeilen in einer Tabelle aktualisieren oder löschen müssen, können Sie die folgenden Methoden der Klasse Updatable
verwenden:
Rufen Sie
update
auf, um vorhandene Zeilen in der Tabelle zu aktualisieren. Siehe Aktualisieren von Zeilen in einer Tabelle.Rufen Sie
delete
auf, um Zeilen aus einer Tabelle zu löschen. Siehe Löschen von Zeilen aus einer Tabelle.Rufen Sie
merge
auf, um Zeilen einer Tabelle auf der Basis von Daten einer zweiten Tabelle oder Unterabfrage einzufügen, zu aktualisieren und zu löschen. (Dies ist das Äquivalent zum Befehl MERGE in SQL.) Siehe Zusammenführen (Merge) von Zeilen in einer Tabelle.
Aktualisieren von Zeilen in einer Tabelle¶
Übergeben Sie der Methode update
ein Map
-Objekt, das die zu aktualisierenden Spalten und die entsprechenden Werte verknüpft, die diesen Spalten zugewiesen werden sollen. update
gibt ein UpdateResult
-Objekt zurück, das die Anzahl der aktualisierten Zeilen enthält. (Siehe UpdateResult.)
Bemerkung
update
ist eine Aktionsmethode, was bedeutet, dass bei Aufruf der Methode die SQL-Anweisungen zur Ausführung an den Server gesendet werden.
Im folgenden Beispiel werden die Werte in der Spalte mit dem Namen count
durch den Wert 1
ersetzt:
val updatableDf = session.table("sample_product_data")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")
Im obigen Beispiel wird der Name der Spalte verwendet, um die Spalte zu identifizieren. Sie können auch einen Spaltenausdruck verwenden:
val updateResult = updatableDf.update(Map(col("count") -> lit(1)))
Wenn die Aktualisierung nur erfolgen werden soll, wenn eine Bedingung erfüllt ist, können Sie diese Bedingung als Argument angeben. Im folgenden Beispiel werden die Werte in der Spalte mit dem Namen count
für Zeilen ersetzt, in denen die Spalte category_id
den Wert 20
hat:
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), col("category_id") === 20)
Wenn die Bedingung auf einer Verknüpfung (Join) mit einem anderen DataFrame
-Objekt basieren soll, können Sie diesen DataFrame
als Argument übergeben und diesen DataFrame
in der Bedingung verwenden. Im folgenden Beispiel werden die Werte in der Spalte mit dem Namen count
bei Zeilen ersetzt, in denen die Spalte category_id
mit der category_id
in der DataFrame
-Spalte dfParts
übereinstimmt:
val updatableDf = session.table("sample_product_data")
val dfParts = session.table("parts")
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), updatableDf("category_id") === dfParts("category_id"), dfParts)
Löschen von Zeilen aus einer Tabelle¶
Sie können für die Methode delete
eine Bedingung angeben, die die zu löschenden Zeilen identifiziert, und diese Bedingung kann auf einer Verknüpfung (Join) mit einem anderen DataFrame basieren. delete
gibt ein DeleteResult
-Objekt zurück, das die Anzahl der gelöschten Zeilen enthält. (Siehe DeleteResult.)
Bemerkung
delete
ist eine Aktionsmethode, was bedeutet, dass bei Aufruf der Methode die SQL-Anweisungen zur Ausführung an den Server gesendet werden.
Im folgenden Beispiel werden Zeilen gelöscht, die den Wert 1
in der Spalte category_id
haben:
val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === 1)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
Wenn sich die Bedingung auf Spalten in einem anderen DataFrame bezieht, übergeben Sie dieses DataFrame als zweites Argument. Im folgenden Beispiel werden die Zeilen gelöscht, in denen die Spalte category_id
mit der Spalte category_id
des DataFrame
mit dem Namen dfParts
übereinstimmt, wobei dfParts
als zweites Argument übergeben wird:
val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === dfParts("category_id"), dfParts)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
Zusammenführen (Merge) von Zeilen in einer Tabelle¶
Wenn Sie Zeilen einer Tabelle einfügen, aktualisieren oder löschen möchten, die auf Werten einer zweiten Tabelle oder einer Unterabfrage basieren (das Äquivalent des MERGE-Befehls in SQL), gehen Sie wie folgt vor:
Rufen Sie im
Updatable
-Objekt der Tabelle, in der die Daten zusammengeführt werden sollen, die Methodemerge
auf, und übergeben Sie dabei dasDataFrame
-Objekt für die andere Tabelle und den Spaltenausdruck für die Verknüpfungsbedingung.Dies gibt ein
MergeBuilder
-Objekt zurück, das Sie verwenden können, um die Aktionen (z. B. Einfügen, Aktualisieren, Löschen) für die übereinstimmenden und die nicht übereinstimmenden Zeilen anzugeben. (Siehe MergeBuilder.)Verwendung des Objekts
MergeBuilder
:Um die Aktualisierung oder Löschung von übereinstimmenden Zeilen anzugeben, rufen Sie die Methode
whenMatched
auf.Wenn Sie eine zusätzliche Bedingung angeben müssen, ob Zeilen aktualisiert oder gelöscht werden sollen, können Sie einen Spaltenausdruck für diese Bedingung übergeben.
Diese Methode gibt ein
MatchedClauseBuilder
-Objekt zurück, das Sie zur Angabe der auszuführenden Aktion verwenden können. (Siehe MatchedClauseBuilder.)Rufen Sie die Methode
update
oderdelete
im ObjektMatchedClauseBuilder
auf, um die Aktualisierungs- oder Löschaktion anzugeben, die für die übereinstimmenden Zeilen durchgeführt werden soll. Diese Methoden geben einMergeBuilder
-Objekt zurück, das Sie zur Angabe zusätzlicher Klauseln verwenden können.Um die Einfügung anzugeben, die ausgeführt werden soll, wenn Zeilen nicht übereinstimmen, rufen Sie die Methode
whenNotMatched
auf.Wenn Sie für das Einfügen einer Zeile eine zusätzliche Bedingung angeben müssen, können Sie einen Spaltenausdruck für diese Bedingung übergeben.
Diese Methode gibt ein
NotMatchedClauseBuilder
-Objekt zurück, das Sie zur Angabe der auszuführenden Aktion verwenden können. (Siehe NotMatchedClauseBuilder.)Rufen Sie die Methode
insert
im ObjektNotMatchedClauseBuilder
auf, um die Einfügeaktion festzulegen, die durchgeführt werden soll, wenn Zeilen nicht übereinstimmen. Diese Methode gibt einMergeBuilder
-Objekt zurück, das Sie zur Angabe zusätzlicher Klauseln verwenden können.
Wenn Sie die auszuführenden Einfügungen, Aktualisierungen und Löschungen angegeben haben, rufen Sie die Methode
collect
des ObjektsMergeBuilder
auf, um die angegebenen Einfügungen, Aktualisierungen und Löschungen in der Tabelle auszuführen.collect
gibt einMergeResult
-Objekt zurück, das die Anzahl der Zeilen enthält, die eingefügt, aktualisiert und gelöscht wurden. (Siehe MergeResult.)
Das folgende Beispiel fügt eine Zeile mit den Spalten id
und value
aus der Tabelle source
in die Tabelle target
ein, wenn die Tabelle target
keine Zeile mit einer übereinstimmenden ID enthält:
val mergeResult = target.merge(source, target("id") === source("id"))
.whenNotMatched.insert(Seq(source("id"), source("value")))
.collect()
Im folgenden Beispiel wird eine Zeile in der Tabelle target
mit dem Wert der Spalte value
aus der Zeile in der Tabelle source
aktualisiert, die dieselbe ID hat:
val mergeResult = target.merge(source, target("id") === source("id"))
.whenMatched.update(Map("value" -> source("value")))
.collect()
Speichern von Daten in eine Tabelle¶
Sie können den Inhalt eines DataFrame in eine neue oder bestehende Tabelle speichern. Dazu müssen Sie über die folgenden Berechtigungen verfügen:
CREATE TABLE-Berechtigungen für das Schema, wenn die Tabelle nicht vorhanden ist.
INSERT-Berechtigungen für die Tabelle.
So speichern Sie den Inhalt eines DataFrame in eine Tabelle:
Rufen Sie die Methode DataFrame.write auf, um ein DataFrameWriter-Objekt zu erhalten.
Rufen Sie die Methode DataFrameWriter.mode auf, und übergeben Sie dabei ein SaveMode-Objekt, das Ihre Präferenzen für das Schreiben in die Tabelle angibt:
Um Zeilen einzufügen, übergeben Sie
SaveMode.Append
.Um die vorhandene Tabelle zu überschreiben, übergeben Sie
SaveMode.Overwrite
.
Diese Methode gibt das gleiche
DataFrameWriter
-Objekt zurück, das mit dem angegebenen Modus konfiguriert wurde.Wenn Sie Zeilen in eine bestehende Tabelle einfügen (
SaveMode.Append
) und die Spaltennamen im DataFrame mit den Spaltennamen in der Tabelle übereinstimmen, rufen Sie die DataFrameWriter.option-Methode auf und geben Sie"columnOrder"
und"name"
als Argumente an.Bemerkung
Diese Methode wurde in Snowpark 1.4.0 eingeführt.
Standardmäßig ist die Option
columnOrder
auf"index"
gesetzt, was bedeutet, dassDataFrameWriter
die Werte in der Reihenfolge einfügt, in der die Spalten erscheinen. So fügtDataFrameWriter
beispielsweise den Wert aus der ersten Spalte des DataFrame in die erste Spalte der Tabelle ein, die zweite Spalte des DataFrame in die zweite Spalte der Tabelle usw.Diese Methode gibt das gleiche
DataFrameWriter
-Objekt zurück, das mit der angegebenen Option konfiguriert wurde.Rufen Sie die Methode DataFrameWriter.saveAsTable auf, um den Inhalt des DataFrame in die angegebene Tabelle zu speichern.
Sie müssen keine separate Methode (z. B.
collect
) aufrufen, um die SQL-Anweisung auszuführen, mit der Daten in die Tabelle gespeichert werden.saveAsTable
ist eine Aktionsmethode, die SQL-Anweisungen ausführt.
Im folgenden Beispiel wird eine bestehende Tabelle (identifiziert durch die Variable tableName
) mit dem Inhalt von DataFrame df
überschrieben:
df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
Im folgenden Beispiel wird eine Zeile aus dem DataFrame mit dem Namen df
in die bestehende Tabelle (identifiziert durch die Variable tableName
) eingefügt. In diesem Beispiel enthalten die Tabelle und das DataFrame beide die Spalten c1
und c2
.
Das Beispiel zeigt den Unterschied zwischen der Einstellung der Option columnOrder
auf "name"
(fügt Werte in die Tabellenspalten mit denselben Namen wie die DataFrame-Spalten ein) und der Verwendung der Standardoption columnOrder
(fügt Werte in die Tabellenspalten basierend auf der Reihenfolge der Spalten in DataFrame ein).
val df = session.sql("SELECT 1 AS c2, 2 as c1")
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write.mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName)
// With the default value of the columnOrder option ("index"), the DataFrameWriter the uses column positions
// and inserts a row with the values (1, 2).
df.write.mode(SaveMode.Append).saveAsTable(tableName)
Erstellen einer Ansicht aus einem DataFrame¶
Um eine Ansicht aus einem DataFrame zu erstellen, rufen Sie die Methode DataFrame.createOrReplaceView auf:
df.createOrReplaceView("db.schema.viewName")
Beachten Sie, dass bei Aufruf von createOrReplaceView
die neue Ansicht sofort erstellt wird. Noch wichtiger ist, dass der Aufruf nicht dazu führt, dass der DataFrame ausgewertet wird. (Der DataFrame selbst wird erst ausgewertet, wenn Sie eine Aktion ausführen.)
Ansichten, die Sie durch Aufruf von createOrReplaceView
erstellen, sind persistent. Wenn Sie diese Ansicht nicht mehr benötigen, können Sie die Ansicht manuell löschen.
Wenn Sie eine temporäre Ansicht nur für die Sitzung erstellen müssen, rufen Sie stattdessen die Methode DataFrame.createOrReplaceTempView auf:
df.createOrReplaceTempView("db.schema.viewName")
Zwischenspeichern eines DataFrame¶
In einigen Fällen müssen Sie eine komplexe Abfrage ausführen und die Ergebnisse für spätere Operationen aufbewahren (um nicht dieselbe Abfrage erneut ausführen zu müssen). In diesen Fällen können Sie den Inhalt eines DataFrame zwischenspeichern, indem Sie die Methode DataFrame.cacheResult aufrufen.
Diese Methode ermöglicht Folgendes:
Ausführen der Abfrage.
Es ist nicht erforderlich, vor dem Aufruf von
cacheResult
eine separate Aktionsmethode aufzurufen, um die Ergebnisse abzurufen.cacheResult
ist eine Aktionsmethode, die die Abfrage ausführt.Speichern der Ergebnisse in einer temporären Tabelle.
Da
cacheResult
eine temporäre Tabelle erstellt, müssen Sie über die CREATE TABLE-Berechtigung für das verwendete Schema verfügen.Zurückgeben eines HasCachedResult-Objekts, das den Zugriff auf die Ergebnisse in der temporären Tabelle ermöglicht.
Da
HasCachedResult
denDataFrame
erweitert, können Sie auf diesen zwischengespeicherten Daten einige der Operationen ausführen, die auch auf dem DataFrame ausgeführt werden können.
Bemerkung
Da cacheResult
die Abfrage ausführt und die Ergebnisse in einer Tabelle speichert, kann diese Methode zu erhöhten Compute- und Speicherkosten führen.
Beispiel:
import com.snowflake.snowpark.functions_
// Set up a DataFrame to query a table.
val df = session.table("sample_product_data").filter(col("category_id") > 10)
// Retrieve the results and cache the data.
val cachedDf = df.cacheResult()
// Create a DataFrame containing a subset of the cached data.
val dfSubset = cachedDf.filter(col("category_id") === lit(20)).select(col("name"), col("category_id"))
dfSubset.show()
Beachten Sie, dass der ursprüngliche DataFrame durch den Aufruf dieser Methode nicht beeinflusst wird. Angenommen, dfTable
ist ein DataFrame der Tabelle sample_product_data
:
val dfTempTable = dfTable.cacheResult()
Nach Aufruf von cacheResult
verweist dfTable
immer noch auf die Tabelle sample_product_data
, und Sie können dfTable
weiterhin zur Abfrage und Aktualisierung dieser Tabelle verwenden.
Für die Weiterverarbeitung der in der temporären Tabelle zwischengespeicherten Daten verwenden Sie dfTempTable
(das von cacheResult
zurückgegebene HasCachedResult
-Objekt).
Verwenden von Dateien in Stagingbereichen¶
Die Snowpark-Bibliothek bietet Klassen und Methoden, mit denen Sie Daten in Snowflake laden und Daten aus Snowflake entladen können, indem Sie Dateien in Stagingbereichen verwenden.
Bemerkung
Um diese Klassen und Methoden auf einem Stagingbereich anwenden zu können, müssen Sie über die erforderlichen Berechtigungen für die Verwendung des Stagingbereichs verfügen.
In den nächsten Abschnitten wird erläutert, wie diese Klassen und Methoden verwendet werden:
Verwenden von Eingabestreams zum Hoch- und Herunterladen von Daten eines Stagingbereichs
Einrichten eines DataFrame für Dateien in einem Stagingbereich
Speichern eines DataFrame in Dateien in einem Stagingbereich
Hochladen und Herunterladen von Dateien in Stagingbereichen¶
Zum Hoch- und Herunterladen von Dateien in Stagingbereichen verwenden Sie das Objekt FileOperation:
Hochladen von Dateien in einen Stagingbereich¶
So laden Sie Dateien in einen Stagingbereich hoch:
Überprüfen Sie, ob Sie über die erforderlichen Berechtigungen zum Hochladen von Dateien in den Stagingbereich verfügen.
Verwenden Sie Session.file, um auf das FileOperation-Objekt für die Sitzung zuzugreifen.
Rufen Sie die Methode FileOperation.put auf, um die Dateien in den Stagingbereich hochzuladen.
Diese Methode führt den SQL-Befehl PUT aus.
Um alle optionalen Parameter für den PUT-Befehl anzugeben, erstellen Sie eine
Map
der Parameter und Werte und übergeben dieMap
alsoptions
-Argument. Beispiel:// Upload a file to a stage without compressing the file. val putOptions = Map("AUTO_COMPRESS" -> "FALSE") val putResults = session.file.put("file:///tmp/myfile.csv", "@myStage", putOptions)
Im Argument
localFilePath
können Sie Platzhalter (*
und?
) verwenden, um einen Satz von Dateien zu identifizieren, die hochgeladen werden sollen. Beispiel:// Upload the CSV files in /tmp with names that start with "file". // You can use the wildcard characters "*" and "?" to match multiple files. val putResults = session.file.put("file:///tmp/file*.csv", "@myStage/prefix2")
Überprüfen Sie das
Array
von PutResult-Objekten, die von derput
-Methode zurückgegeben werden, um festzustellen, ob die Dateien erfolgreich hochgeladen wurden. Geben Sie zum Beispiel den Dateinamen und den Status der PUT-Operation für diese Datei aus:// Print the filename and the status of the PUT operation. putResults.foreach(r => println(s" ${r.sourceFileName}: ${r.status}"))
Herunterladen von Dateien aus Stagingbereichen¶
So laden Sie Dateien aus einem Stagingbereich herunter:
Überprüfen Sie, ob Sie über die erforderlichen Berechtigungen zum Herunterladen von Dateien aus dem Stagingbereich verfügen.
Verwenden Sie Session.file, um auf das FileOperation-Objekt für die Sitzung zuzugreifen.
Rufen Sie die Methode FileOperation.get auf, um die Dateien aus einem Stagingbereich herunterzuladen.
Diese Methode führt den SQL-Befehl GET aus.
Um alle optionalen Parameter für den GET-Befehl anzugeben, erstellen Sie eine
Map
der Parameter und Werte und übergeben dieMap
alsoptions
-Argument. Beispiel:// Download files with names that match a regular expression pattern. val getOptions = Map("PATTERN" -> s"'.*file_.*.csv.gz'") val getResults = session.file.get("@myStage", "file:///tmp", getOptions)
Überprüfen Sie das
Array
von GetResult-Objekten, die von derget
-Methode zurückgegeben werden, um festzustellen, ob die Dateien erfolgreich heruntergeladen wurden. Geben Sie zum Beispiel den Dateinamen und den Status der GET-Operation für diese Datei aus:// Print the filename and the status of the GET operation. getResults.foreach(r => println(s" ${r.fileName}: ${r.status}"))
Verwenden von Eingabestreams zum Hoch- und Herunterladen von Daten eines Stagingbereichs¶
Bemerkung
Dieses Feature wurde in Snowpark 1.4.0 eingeführt.
Um Eingabestreams zum Hochladen von Daten in eine Stagingdatei und zum Herunterladen von Daten aus einer Stagingdatei zu verwenden, verwenden Sie die Methoden uploadStream
und downloadStream
des Objekts FileOperation:
Verwenden eines Eingabestreams zum Hochladen von Daten in eine Datei in einem Stagingbereich
Verwenden eines Eingabestreams zum Herunterladen von Daten aus einer Datei in einem Stagingbereich
Verwenden eines Eingabestreams zum Hochladen von Daten in eine Datei in einem Stagingbereich¶
So laden Sie die Daten aus einem java.io.InputStream-Objekt in eine Datei in einem Stagingbereich hoch:
Überprüfen Sie, ob Sie über die erforderlichen Berechtigungen zum Hochladen von Dateien in den Stagingbereich verfügen.
Verwenden Sie Session.file, um auf das FileOperation-Objekt für die Sitzung zuzugreifen.
Rufen Sie die Methode FileOperation.uploadStream auf.
Übergeben Sie den vollständigen Pfad zu der Datei im Stagingbereich, in die die Daten geschrieben werden sollen, sowie das Objekt
InputStream
. Verwenden Sie außerdem das Argumentcompress
, um anzugeben, ob die Daten vor dem Hochladen komprimiert werden sollen oder nicht.
Beispiel:
import java.io.InputStream
...
val compressData = true
val pathToFileOnStage = "@myStage/path/file"
session.file.uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData)
Verwenden eines Eingabestreams zum Herunterladen von Daten aus einer Datei in einem Stagingbereich¶
So laden Sie die Daten aus einer Datei in einem Stagingbereich in ein java.io.InputStream-Objekt herunter:
Überprüfen Sie, ob Sie über die erforderlichen Berechtigungen zum Herunterladen von Dateien aus dem Stagingbereich verfügen.
Verwenden Sie Session.file, um auf das FileOperation-Objekt für die Sitzung zuzugreifen.
Rufen Sie die Methode FileOperationdownloadStream auf.
Übergeben Sie den vollständigen Pfad zu der Datei im Stagingbereich, die die herunterzuladenden Daten enthält. Verwenden Sie das Argument
decompress
, um anzugeben, ob die Daten in der Datei komprimiert sind oder nicht.
Beispiel:
import java.io.InputStream
...
val isDataCompressed = true
val pathToFileOnStage = "@myStage/path/file"
val is = session.file.downloadStream(pathToFileOnStage, isDataCompressed)
Einrichten eines DataFrame für Dateien in einem Stagingbereich¶
In diesem Abschnitt wird erläutert, wie Sie ein DataFrame-Objekt für Dateien einrichten, die sich in einem Snowflake-Stagingbereich befinden. Sobald Sie den DataFrame erstellt haben, können Sie den DataFrame für Folgendes verwenden:
Um einen DataFrame für eine Datei einzurichten, die sich ein einem Snowflake-Stagingbereich befindet, verwenden Sie die Klasse DataFrameReader
:
Überprüfen Sie, ob Sie über die folgenden Berechtigungen verfügen:
Berechtigungen für den Zugriff auf Dateien in dem Stagingbereich.
Eine der folgenden Optionen:
CREATE TABLE-Berechtigungen für das Schema, wenn Sie Kopieroptionen angeben möchten, die bestimmen, wie die Daten aus den Stagingdateien kopiert werden.
Andernfalls CREATE FILE FORMAT-Berechtigungen für das Schema.
Rufen Sie die Methode
read
derSession
-Klasse auf, um auf einDataFrameReader
-Objekt zuzugreifen.Wenn die Dateien im CSV-Format sind, beschreiben Sie die Felder in der Datei. Gehen Sie dabei wie folgt vor:
Erstellen Sie ein StructType-Objekt, das aus einer Sequenz von StructField-Objekten besteht, die die Felder in der Datei beschreiben.
Geben Sie für jedes
StructField
-Objekt Folgendes an:Name des Feldes
Datentyp des Feldes (angegeben als Objekt im
com.snowflake.snowpark.types
-Paket)Ob das Feld nullwertfähig ist oder nicht
Beispiel:
import com.snowflake.snowpark.types._ val schemaForDataFile = StructType( Seq( StructField("id", StringType, true), StructField("name", StringType, true)))
Rufen Sie die Methode
schema
im ObjektDataFrameReader
-Objekt auf, und übergeben Sie dabei dasStructType
-Objekt.Beispiel:
var dfReader = session.read.schema(schemaForDataFile)
Die Methode
schema
gibt einDataFrameReader
-Objekt zurück, das so konfiguriert ist, dass es Dateien liest, die die angegebenen Felder enthalten.Beachten Sie, dass Sie dies für Dateien in anderen Formaten (z. B. JSON) nicht tun müssen. Bei solchen Dateien behandelt der
DataFrameReader
die Daten als ein einzelnes Feld vom Typ VARIANT mit dem Feldnamen$1
.
Wenn Sie zusätzliche Informationen darüber angeben müssen, wie die Daten gelesen werden sollen (z. B. dass die Daten komprimiert sind oder dass eine CSV-Datei ein Semikolon statt eines Kommas zum Begrenzen von Feldern verwendet), rufen Sie die Methode DataFrameReader.option oder DataFrameReader.options auf.
Übergeben Sie den Namen und den Wert der Option, die Sie einstellen möchten. Sie können die folgenden Typen von Optionen einstellen:
Dateiformatoptionen, die in der CREATE FILE FORMAT-Dokumentation beschrieben sind
Kopieroptionen, die in der COPY INTO TABLE-Dokumentation beschrieben sind
Beachten Sie, dass die Einstellung von Kopieroptionen zu einer teureren Ausführungsstrategie führen kann, wenn Sie die Daten in den DataFrame abrufen.
Im folgenden Beispiel wird das
DataFrameReader
-Objekt für die Abfrage von Daten einer CSV-Datei eingerichtet, die nicht komprimiert ist und die ein Semikolon als Feldbegrenzer verwendet.dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
Die Methode
option
gibt einDataFrameReader
-Objekt zurück, das mit den angegebenen Optionen konfiguriert ist.Um mehrere Optionen festzulegen, können Sie entweder verkettete Aufrufe mit der
option
-Methode verwenden (wie im obigen Beispiel gezeigt) oder die DataFrameReader.options-Methode aufrufen, wobei Sie einMap
-Objekt der Namen und Werte der Optionen übergeben.Rufen Sie die Methode auf, die dem Format der Dateien entspricht. Sie können eine der folgenden Methoden aufrufen:
Beim Aufrufen dieser Methoden übergeben Sie den Speicherort des Stagingbereichs, in dem sich die zu lesenden Dateien befinden.
val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
Wenn Sie mehrere Dateien angeben möchten, die mit demselben Präfix beginnen, geben Sie das Präfix nach dem Namen des Stagingbereichs an. So laden Sie beispielsweise Dateien mit dem Präfix
csv_
aus dem Stagingbereich@mystage
:val df = dfReader.csv("@mystage/csv_")
Die Methoden, die dem Format einer Datei entsprechen, geben ein CopyableDataFrame-Objekt für diese Datei zurück.
CopyableDataFrame
erweitertDataFrame
und bietet zusätzliche Methoden für die Verarbeitung der Daten in den Stagingdateien.Rufen Sie eine Aktionsmethode auf, um Folgendes auszuführen:
Wie bei DataFrames für Tabellen werden die Daten erst dann in den DataFrame abgerufen, wenn Sie eine Aktionsmethode aufrufen.
Laden von Daten aus Dateien in einen DataFrame¶
Nachdem Sie einen DataFrame für Dateien in einem Stagingbereich eingerichtet haben, können Sie Daten aus den Dateien in den DataFrame laden:
Verwenden Sie die DataFrame-Objektmethoden, um alle Transformationen auszuführen, die für das Dataset benötigt werden (Auswahl bestimmter Felder, Filtern von Zeilen usw.).
Im folgenden Beispiel wird das Element
color
aus einer JSON-Datei namensdata.json
extrahiert, die sich im Stagingbereichmystage
befindet:val df = session.read.json("@mystage/data.json").select(col("$1")("color"))
Wie bereits erläutert, werden Daten in Dateien, die nicht im CSV-Format sind (z. B. JSON), von
DataFrameReader
als eine einzelne VARIANT-Spalte mit dem Namen$1
behandelt.Rufen Sie die Methode
DataFrame.collect
auf, um die Daten zu laden. Beispiel:val results = df.collect()
Kopieren von Daten aus Dateien in eine Tabelle¶
Nachdem das Einrichten eines DataFrame für Dateien in einem Stagingbereich abgeschlossen ist, können Sie die Methode CopyableDataFrame.copyInto aufrufen, um die Daten in eine Tabelle zu kopieren. Diese Methode führt den Befehl COPY INTO <Tabelle> aus.
Bemerkung
Sie dürfen die collect
-Methode erst aufrufen, nachdem Sie copyInto
aufgerufen haben. Die Daten aus der Datei dürfen sich erst im DataFrame befinden, nachdem Sie copyInto
aufgerufen haben.
Mit dem folgenden Code werden beispielsweise Daten aus der durch myFileStage
angegebenen CSV-Datei in die Tabelle mytable
geladen. Da sich die Daten in einer CSV-Datei befinden, muss der Code auch die Felder in der Datei beschreiben. Im Beispiel wird dazu die Methode DataFrameReader.schema aufgerufen, und es wird ein StructType-Objekt (csvFileSchema
) mit einer Sequenz von StructField-Objekten übergeben, die die Felder beschreiben.
val df = session.read.schema(csvFileSchema).csv(myFileStage)
df.copyInto("mytable")
Speichern eines DataFrame in Dateien in einem Stagingbereich¶
Bemerkung
Dieses Feature wurde in Snowpark 1.5.0 eingeführt.
Wenn Sie einen DataFrame in Dateien in einem Stagingbereich speichern müssen, können Sie die DataFrameWriter-Methode aufrufen, die dem Format der Datei entspricht (z. B. die csv
-Methode, um in eine CSV-Datei zu schreiben), und dabei den Speichertort im Stagingbereich angeben, an dem die Dateien gespeichert werden sollen. Diese DataFrameWriter
-Methoden führen den Befehl COPY INTO <Speicherort> aus.
Bemerkung
Sie dürfen die collect
-Methode erst aufrufen, nachdem Sie DataFrameWriter
aufgerufen haben. Die Daten aus der Datei dürfen sich erst im DataFrame befinden, nachdem Sie diese Methoden aufgerufen haben.
So speichern Sie den Inhalt eines DataFrame in Dateien in einem Stagingbereich:
Rufen Sie die Methode DataFrame.write auf, um ein DataFrameWriter-Objekt zu erhalten. So erhalten Sie beispielsweise das
DataFrameWriter
-Objekt für einen DataFrame, der die Tabelle mit dem Namensample_product_data
repräsentiert:dfWriter = session.table("sample_product_data").write
Wenn Sie den Inhalt der Datei überschreiben möchten (falls die Datei existiert), rufen Sie die Methode DataFrameWriter.mode auf und übergeben
SaveMode.Overwrite
.Andernfalls meldet der
DataFrameWriter
-Objekt standardmäßig einen Fehler, wenn die angegebene Datei im Stagingbereich bereits vorhanden ist.Diese
mode
-Methode gibt das gleicheDataFrameWriter
-Objekt zurück, das mit dem angegebenen Modus konfiguriert wurde.So geben Sie beispielsweise an, dass
DataFrameWriter
die im Stagingbereich befindliche Datei überschreiben soll:dfWriter = dfWriter.mode(SaveMode.Overwrite)
Wenn Sie zusätzliche Informationen darüber angeben müssen, wie die Daten gespeichert werden sollen (z. B. dass die Daten komprimiert werden sollen oder dass Sie ein Semikolon zur Abgrenzung von Feldern in einer CSV-Datei verwenden möchten), dann rufen Sie die Methode DataFrameWriter.option oder DataFrameWriter.options auf.
Übergeben Sie den Namen und den Wert der Option, die Sie einstellen möchten. Sie können die folgenden Typen von Optionen einstellen:
Dateiformatoptionen, die in der Dokumentation zu COPY INTO <Speicherort> beschrieben sind.
Kopieroptionen, die in der Dokumentation zu COPY INTO <Speicherort> beschrieben sind.
Beachten Sie, dass Sie die Methode
option
nicht zum Einstellen der folgenden Optionen verwenden können:Die Option für den Formattyp TYPE.
Die Kopieroption OVERWRITE. Um diese Option festzulegen, rufen Sie stattdessen die Methode
mode
auf (wie im vorherigen Schritt erwähnt).
Im folgenden Beispiel wird das
DataFrameWriter
-Objekt so eingerichtet, dass es Daten in einer CSV-Datei in unkomprimierter Form speichert, wobei ein Semikolon (statt eines Kommas) als Trennzeichen verwendet wird.dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE")
Die Methode
option
gibt einDataFrameWriter
-Objekt zurück, das mit den angegebenen Optionen konfiguriert ist.Um mehrere Optionen zu setzen, können Sie verkettete Aufrufe mit der Methode
option
verwenden (wie im obigen Beispiel gezeigt) oder die Methode DataFrameWriter.options aufrufen und dabei einMap
-Objekt der Namen und Werte der Optionen übergeben.Um Details über jede gespeicherte Datei zu erhalten, setzen Sie die Kopieroption
DETAILED_OUTPUT
aufTRUE
.Standardmäßig hat
DETAILED_OUTPUT
den WertFALSE
, was bedeutet, dass die Methode eine einzelne Ausgabezeile mit den Feldern"rows_unloaded"
,"input_bytes"
und"output_bytes"
zurückgibt.Wenn Sie
DETAILED_OUTPUT
aufTRUE
setzen, gibt die Methode für jede gespeicherte Datei eine Zeile der Ausgabe zurück. Jede Zeile enthält die FelderFILE_NAME
,FILE_SIZE
undROW_COUNT
.Rufen Sie die Methode auf, die dem Format der Datei entspricht, um die Daten in der Datei zu speichern. Sie können eine der folgenden Methoden aufrufen:
Beim Aufrufen dieser Methoden übergeben Sie den Speicherort des Stagingbereichs der Datei, in die die Daten geschrieben werden sollen (z. B.
@mystage
).Standardmäßig speichert die Methode die Daten in Dateinamen mit dem Präfix
data_
(z. B.@mystage/data_0_0_0.csv
). Wenn Sie möchten, dass die Dateien mit einem anderen Präfix benannt werden, geben Sie das Präfix nach dem Stagingbereichsnamen an. Beispiel:val writeFileResult = dfWriter.csv("@mystage/saved_data")
In diesem Beispiel wird der Inhalt des DataFrame-Objekts in Dateien gespeichert, die mit dem Präfix
saved_data
beginnen (z. B.@mystage/saved_data_0_0_0.csv
).Überprüfen Sie das zurückgegebene WriteFileResult-Objekt auf Informationen zu der in die Datei geschriebene Datenmenge.
Über das
WriteFileResult
-Objekt können Sie auf die vom Befehl COPY INTO <Speicherort> erzeugte Ausgabe zugreifen:Um auf die Zeilen der Ausgabe als Array von Row-Objekten zuzugreifen, rufen Sie die Methode
rows
auf.Um festzustellen, welche Felder in den Zeilen vorhanden sind, verwenden Sie das
schema
-Wertelement, das ein StructType-Objekt ist, das die Felder in der Zeile beschreibt.
So drucken Sie beispielsweise die Namen der Felder und Werte in den Ausgabezeilen aus:
val writeFileResult = dfWriter.csv("@mystage/saved_data") for ((row, index) <- writeFileResult.rows.zipWithIndex) { (writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach { (structField, element) => println(s"${structField.name}: $element") } }
Im folgende Beispiel wird ein DataFrame-Objekt verwendet, um den Inhalt der Tabelle mit dem Namen car_sales
in JSON-Dateien mit dem Präfix saved_data
im Stagingbereich @mystage
(z. B. @mystage/saved_data_0_0_0.json
) zu speichern. Beispielcode:
Überschreibt die Datei, wenn die Datei bereits im Stagingbereich vorhanden ist.
Gibt eine detaillierte Ausgabe zur Speicheroperation zurück.
Speichert die Daten unkomprimiert.
Abschließend druckt der Beispielcode jedes Feld und jeden Wert in den zurückgegebenen Ausgabezeilen aus:
val df = session.table("car_sales")
val writeFileResult = df.write.mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data")
for ((row, index) <- writeFileResult.rows.zipWithIndex) {
println(s"Row: $index")
(writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach {
(structField, element) => println(s"${structField.name}: $element")
}
}
Verwenden von semistrukturierten Daten¶
Mit einem DataFrame können Sie semistrukturierte Daten (z. B. JSON-Daten) abfragen und darauf zugreifen. In den nächsten Abschnitten wird erläutert, wie semistrukturierte Daten in einem DataFrame verwendet werden:
Bemerkung
Die Beispiele in diesen Abschnitten verwenden die Beispieldaten aus In Beispielen verwendete Beispieldaten.
Durchsuchen semistrukturierter Daten¶
Um auf ein bestimmtes Feld oder Element in semistrukturierten Daten zu verweisen, verwenden Sie die folgenden Methoden des Column-Objekts:
Verwenden Sie Column.apply(„<Feldname>“), um ein
Column
-Objekt für ein Feld in einem OBJECT (oder ein VARIANT, das ein OBJECT enthält) zurückzugeben.Verwenden Sie Column.apply(<Index>), um ein
Column
-Objekt für ein Element in einem ARRAY (oder ein VARIANT, das ein ARRAY enthält) zurückzugeben.
Bemerkung
Wenn der Feldname oder die Elemente im Pfad unregelmäßig sind und die Verwendung der Column.apply
-Methoden erschweren, können Sie als Alternative die Funktionen get, get_ignore_case oder get_path verwenden.
Wie unter Verwenden der apply-Methode zum Verweisen auf eine Spalte erwähnt, können Sie den Methodennamen apply
weglassen:
col("column_name")("field_name")
col("column_name")(index)
Im folgenden Code wird beispielsweise das Feld dealership
in Objekten der Spalte src
der Beispieldaten ausgewählt:
val df = session.table("car_sales")
df.select(col("src")("dealership")).show()
Der Code gibt Folgendes aus:
----------------------------
|"""SRC""['DEALERSHIP']" |
----------------------------
|"Valley View Auto Sales" |
|"Tindel Toyota" |
----------------------------
Bemerkung
Die Werte im DataFrame sind in doppelte Anführungszeichen eingeschlossen, da diese Werte als Zeichenfolgenliterale zurückgegeben werden. Weitere Informationen zum Umwandeln dieser Werte in einen bestimmten Typ finden Sie unter Explizites Umwandeln von Werten semistrukturierter Daten.
Sie können auch Methodenaufrufe verketten, um einen Pfad zu einem bestimmten Feld oder Element zu durchlaufen.
Im folgende Code wird zum Beispiel das Feld name
im Objekt salesperson
ausgewählt:
val df = session.table("car_sales")
df.select(col("src")("salesperson")("name")).show()
Der Code gibt Folgendes aus:
------------------------------------
|"""SRC""['SALESPERSON']['NAME']" |
------------------------------------
|"Frank Beasley" |
|"Greg Northrup" |
------------------------------------
In einem weiteren Codebeispiel wird das erste Element des Feldes vehicle
ausgewählt, das eine Reihe von Fahrzeugen enthält. Im Beispiel wird auch das Feld price
aus dem ersten Element ausgewählt.
val df = session.table("car_sales")
df.select(col("src")("vehicle")(0)).show()
df.select(col("src")("vehicle")(0)("price")).show()
Der Code gibt Folgendes aus:
---------------------------
|"""SRC""['VEHICLE'][0]" |
---------------------------
|{ |
| "extras": [ |
| "ext warranty", |
| "paint protection" |
| ], |
| "make": "Honda", |
| "model": "Civic", |
| "price": "20275", |
| "year": "2017" |
|} |
|{ |
| "extras": [ |
| "ext warranty", |
| "rust proofing", |
| "fabric protection" |
| ], |
| "make": "Toyota", |
| "model": "Camry", |
| "price": "23500", |
| "year": "2017" |
|} |
---------------------------
------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']" |
------------------------------------
|"20275" |
|"23500" |
------------------------------------
Als Alternative zur Methode apply
können Sie die Funktionen get, get_ignore_case oder get_path verwenden, wenn der Feldname oder die Elemente im Pfad unregelmäßig sind und die Verwendung der Column.apply
-Methoden erschweren.
Die folgenden Codezeilen geben zum Beispiel beide den Wert eines bestimmten Feldes in einem Objekt aus:
df.select(get(col("src"), lit("dealership"))).show()
df.select(col("src")("dealership")).show()
In ähnlicher Weise geben die folgenden Codezeilen beide den Wert eines Feldes für einen bestimmten Pfad in einem Objekt aus:
df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
df.select(col("src")("vehicle")(0)("make")).show()
Explizites Umwandeln von Werten semistrukturierter Daten¶
Standardmäßig werden die Werte von Feldern und Elementen als Zeichenfolgenliterale (einschließlich der doppelten Anführungszeichen) zurückgegeben, wie in den obigen Beispielen gezeigt.
Zur Vermeidung unerwarteter Ergebnisse rufen Sie die Methode cast auf, um den Wert in einen ganz bestimmten Typ umzuwandeln. Mit dem folgenden Code werden zum Beispiel die Werte ohne und mit Umwandlung ausgegeben:
// Import the objects for the data types, including StringType.
import com.snowflake.snowpark.types._
...
val df = session.table("car_sales")
df.select(col("src")("salesperson")("id")).show()
df.select(col("src")("salesperson")("id").cast(StringType)).show()
Der Code gibt Folgendes aus:
----------------------------------
|"""SRC""['SALESPERSON']['ID']" |
----------------------------------
|"55" |
|"274" |
----------------------------------
---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)" |
---------------------------------------------------
|55 |
|274 |
---------------------------------------------------
Vereinfachen eines Arrays von Objekten zu Zeilen¶
Wenn Sie semistrukturierte Daten für einen DataFrame vereinfachen müssen (um z. B. für jedes Objekt in einem Array eine Zeile zu erstellen), rufen Sie die Methode DataFrame.flatten auf. Diese Methode entspricht der SQL-Funktion FLATTEN. Wenn Sie einen Pfad zu einem Objekt oder Array übergeben, gibt die Methode einen DataFrame zurück, der eine Zeile für jedes Feld oder Element in dem Objekt oder Array enthält.
In den Beispieldaten ist beispielsweise src:customer
ein Array von Objekten, die Informationen über einen Kunden enthalten. Jedes Objekt enthält die Felder name
und address
.
Wenn Sie diesen Pfad an die Funktion flatten
übergeben:
val df = session.table("car_sales")
df.flatten(col("src")("customer")).show()
Dann gibt diese Methode einen DataFrame zurück:
----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC" |"SEQ" |"KEY" |"PATH" |"INDEX" |"VALUE" |"THIS" |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{ |1 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "San Francisco, CA", | { |
| { | | | | | "name": "Joyce Ridgely", | "address": "San Francisco, CA", |
| "address": "San Francisco, CA", | | | | | "phone": "16504378889" | "name": "Joyce Ridgely", |
| "name": "Joyce Ridgely", | | | | |} | "phone": "16504378889" |
| "phone": "16504378889" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Valley View Auto Sales", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "55", | | | | | | |
| "name": "Frank Beasley" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "paint protection" | | | | | | |
| ], | | | | | | |
| "make": "Honda", | | | | | | |
| "model": "Civic", | | | | | | |
| "price": "20275", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
|{ |2 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "New York, NY", | { |
| { | | | | | "name": "Bradley Greenbloom", | "address": "New York, NY", |
| "address": "New York, NY", | | | | | "phone": "12127593751" | "name": "Bradley Greenbloom", |
| "name": "Bradley Greenbloom", | | | | |} | "phone": "12127593751" |
| "phone": "12127593751" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Tindel Toyota", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "274", | | | | | | |
| "name": "Greg Northrup" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "rust proofing", | | | | | | |
| "fabric protection" | | | | | | |
| ], | | | | | | |
| "make": "Toyota", | | | | | | |
| "model": "Camry", | | | | | | |
| "price": "23500", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
----------------------------------------------------------------------------------------------------------------------------------------------------------
Von diesem DataFrame aus können Sie die Felder name
und address
jedes Objekts im Feld VALUE
auswählen:
df.flatten(col("src")("customer")).select(col("value")("name"), col("value")("address")).show()
-------------------------------------------------
|"""VALUE""['NAME']" |"""VALUE""['ADDRESS']" |
-------------------------------------------------
|"Joyce Ridgely" |"San Francisco, CA" |
|"Bradley Greenbloom" |"New York, NY" |
-------------------------------------------------
Der folgende Code ergänzt das vorherige Beispiel, indem die Werte in einen bestimmten Typ umgewandelt und die Namen der Spalten geändert werden:
df.flatten(col("src")("customer")).select(col("value")("name").cast(StringType).as("Customer Name"), col("value")("address").cast(StringType).as("Customer Address")).show()
-------------------------------------------
|"Customer Name" |"Customer Address" |
-------------------------------------------
|Joyce Ridgely |San Francisco, CA |
|Bradley Greenbloom |New York, NY |
-------------------------------------------
Ausführen von SQL-Anweisungen¶
Um eine von Ihnen angegebene SQL-Anweisung auszuführen, rufen Sie die Methode sql
in der Klasse Session
auf und übergeben die auszuführende Anweisung. Diese Methode gibt einen DataFrame zurück.
Beachten Sie, dass die SQL-Anweisung erst ausgeführt wird, wenn Sie eine Aktionsmethode aufrufen.
// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
val dfStageFiles = session.sql("ls @myStage")
val files = dfStageFiles.collect()
files.foreach(println)
// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()
val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()
println("Count: " + numRows);
Wenn Sie Methoden zur Transformation des DataFrame aufrufen möchten (z. B. Filtern, Auswahl usw.), beachten Sie, dass diese Methoden nur funktionieren, wenn die zugrunde liegende SQL-Anweisung eine SELECT-Anweisung ist. Die Transformationsmethoden werden für andere Arten von SQL-Anweisungen nicht unterstützt.
val df = session.sql("select id, category_id, name from sample_product_data where id > 10")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("category_id") < 10).select(col("id")).collect()
results.foreach(println)
// In this example, the underlying SQL statement is not a SELECT statement.
val dfStageFiles = session.sql("ls @myStage")
// Calling the filter method results in an error.
dfStageFiles.filter(...)