Verwenden von DataFrames in Snowpark Java¶
In Snowpark erfolgt die Abfrage und Verarbeitung von Daten hauptsächlich über einen DataFrame. Unter diesem Thema wird erklärt, wie Sie DataFrames verwenden.
Unter diesem Thema:
Verwenden Sie zum Abrufen und Ändern von Daten die Klasse DataFrame. Ein DataFrame repräsentiert ein relationales Dataset, das im Lazy-Modus ausgewertet wird: Es wird nur ausgeführt, wenn eine bestimmte Aktion ausgelöst wird. In gewissem Sinne ist ein DataFrame wie eine Abfrage, die ausgewertet werden muss, um Daten abzurufen.
So rufen Sie Daten in einem DataFrame ab:
Konstruieren Sie ein DataFrame, das die Datenquelle für das Dataset angibt.
Sie können z. B. ein DataFrame erstellen, das Daten aus einer Tabelle, Daten einer externen CSV-Datei oder die Ausführung einer SQL-Anweisung enthält.
Geben Sie an, wie das Dataset in den DataFrame transformiert werden soll.
Sie können z. B. angeben, welche Spalten ausgewählt werden sollen, wie die Zeilen gefiltert werden sollen, wie die Ergebnisse sortiert und gruppiert werden sollen usw.
Führen Sie die Anweisung aus, um die Daten in den DataFrame abzurufen.
Um die Daten in den DataFrame abzurufen, müssen Sie eine Methode aufrufen, die eine Aktion ausführt (z. B. die
collect()
-Methode).
In den nächsten Abschnitten werden diese Schritte näher erläutert.
Einrichten der Beispiele für diesen Abschnitt¶
Einige der Beispiele in diesem Abschnitt verwenden einen DataFrame, um eine Tabelle namens sample_product_data
abzufragen. Wenn Sie diese Beispiele ausführen möchten, können Sie diese Tabelle erstellen und mit einigen Daten füllen, indem Sie die folgenden SQL-Anweisungen ausführen:
CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT, amount NUMBER(12, 2), quantity INT, product_date DATE);
INSERT INTO sample_product_data VALUES
(1, 0, 5, 'Product 1', 'prod-1', 1, 10, 1.00, 15, TO_DATE('2021.01.01', 'YYYY.MM.DD')),
(2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20, 2.00, 30, TO_DATE('2021.02.01', 'YYYY.MM.DD')),
(3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30, 3.00, 45, TO_DATE('2021.03.01', 'YYYY.MM.DD')),
(4, 0, 10, 'Product 2', 'prod-2', 2, 40, 4.00, 60, TO_DATE('2021.04.01', 'YYYY.MM.DD')),
(5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50, 5.00, 75, TO_DATE('2021.05.01', 'YYYY.MM.DD')),
(6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60, 6.00, 90, TO_DATE('2021.06.01', 'YYYY.MM.DD')),
(7, 0, 20, 'Product 3', 'prod-3', 3, 70, 7.00, 105, TO_DATE('2021.07.01', 'YYYY.MM.DD')),
(8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80, 7.25, 120, TO_DATE('2021.08.01', 'YYYY.MM.DD')),
(9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90, 7.50, 135, TO_DATE('2021.09.01', 'YYYY.MM.DD')),
(10, 0, 50, 'Product 4', 'prod-4', 4, 100, 7.75, 150, TO_DATE('2021.10.01', 'YYYY.MM.DD')),
(11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100, 8.00, 165, TO_DATE('2021.11.01', 'YYYY.MM.DD')),
(12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100, 8.50, 180, TO_DATE('2021.12.01', 'YYYY.MM.DD'));
Um zu überprüfen, ob die Tabelle erstellt wurde, führen Sie folgende Anweisung aus:
SELECT * FROM sample_product_data;
Erstellen eines DataFrame¶
Um ein DataFrame zu erstellen, können Sie Methoden der Session
-Klasse verwenden. Jede der folgenden Methoden erstellt einen DataFrame aus einem anderen Typ von Datenquelle:
Um einen DataFrame aus Daten einer Tabelle, einer Ansicht oder eines Streams zu erstellen, rufen Sie die
table
-Methode auf:// Create a DataFrame from the data in the "sample_product_data" table. DataFrame dfTable = session.table("sample_product_data"); // Print out the first 10 rows. dfTable.show();
Bemerkung
Die Methode
table
gibt einUpdatable
-Objekt zurück.Updatable
erweitertDataFrame
und bietet zusätzliche Methoden für die Verwendung der Daten in der Tabelle (z. B. Methoden zum Aktualisieren und Löschen von Daten). Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle.So erstellen Sie einen DataFrame aus angegebenen Werten:
Erstellen Sie ein Array aus
Row
-Objekten, die die Werte enthalten.Erstellen Sie ein
StructType
-Objekt, das die Datentypen dieser Werte beschreibt.Rufen Sie die Methode
createDataFrame
auf, und übergeben Sie das Array und dasStructType
-Objekt.
// Import name from the types package, which contains StructType and StructField. import com.snowflake.snowpark_java.types.*; ... // Create a DataFrame containing specified values. Row[] data = {Row.create(1, "a"), Row.create(2, "b")}; StructType schema = StructType.create( new StructField("num", DataTypes.IntegerType), new StructField("str", DataTypes.StringType)); DataFrame df = session.createDataFrame(data, schema); // Print the contents of the DataFrame. df.show();
Bemerkung
Von Snowflake reservierte Wörter können bei der Konstruktion eines DataFrame nicht als Spaltennamen verwendet werden. Eine Liste der reservierten Wörter finden Sie unter Reservierte und begrenzte Schlüsselwörter.
Um einen DataFrame zu erstellen, der einen Wertebereich enthält, rufen Sie die
range
-Methode auf:// Create a DataFrame from a range DataFrame dfRange = session.range(1, 10, 2); // Print the contents of the DataFrame. dfRange.show();
Um einen DataFrame zu erstellen, der die Daten aus einer in einem Stagingbereich befindlichen Datei enthält, verwenden Sie
read
zum Abrufen einesDataFrameReader
-Objekts. ImDataFrameReader
-Objekt rufen Sie die Methode auf, die dem Format der Daten in der Datei entspricht:// Create a DataFrame from data in a stage. DataFrame dfJson = session.read().json("@mystage2/data1.json"); // Print the contents of the DataFrame. dfJson.show();
Um einen DataFrame zu erstellen, der die Ergebnisse einer SQL-Abfrage enthält, rufen Sie die
sql
-Methode auf:// Create a DataFrame from a SQL query DataFrame dfSql = session.sql("SELECT name from sample_product_data"); // Print the contents of the DataFrame. dfSql.show();
Hinweis: Obwohl Sie mit dieser Methode SELECT-Anweisungen ausführen können, die Daten aus Tabellen und Stagingdateien abrufen, sollten Sie dafür eher die Methoden
table
undread
verwenden. Methoden wietable
undread
sorgen in Entwicklungstools für eine bessere Hervorhebung der Syntax und von Fehlern sowie eine intelligente Codevervollständigung.
Festlegen der Transformation des Datasets¶
Um festzulegen, welche Spalten ausgewählt und wie die Ergebnisse gefiltert, sortiert, gruppiert usw. werden sollen, rufen Sie die DataFrame-Methoden auf, die das Dataset transformieren. Um die Spalten in diesen Methoden zu identifizieren, verwenden Sie die statische Methode Functions.col
oder einen Ausdruck, der eine Spalte ergibt (siehe Angeben von Spalten und Ausdrücken).
Beispiel:
Zur Angabe der Zeilen, die zurückgegeben werden sollen, rufen Sie die
filter
-Methode auf:// Create a DataFrame for the rows with the ID 1 // in the "sample_product_data" table. DataFrame df = session.table("sample_product_data").filter( Functions.col("id").equal_to(Functions.lit(1))); df.show();
Zur Angabe der Spalten, die ausgewählt werden sollen, rufen Sie die
select
-Methode auf:// Create a DataFrame that contains the id, name, and serial_number // columns in te "sample_product_data" table. DataFrame df = session.table("sample_product_data").select( Functions.col("id"), Functions.col("name"), Functions.col("serial_number")); df.show();
Jede Methode gibt ein neues DataFrame-Objekt zurück, das transformiert wurde. (Die Methode hat keine Auswirkungen auf das ursprüngliche DataFrame-Objekt.) Wenn Sie also mehrere Transformationen anwenden möchten, können Sie Methodenaufrufe verketten, sodass jede nachfolgende Transformationsmethode auf dem neuen DataFrame-Objekt ausgeführt wird, das vom vorherigen Methodenaufruf zurückgegeben wurde.
Beachten Sie, dass mit diesen Transformationsmethoden keine Daten aus der Snowflake-Datenbank abgerufen werden können. (Die unter Ausführen einer Aktion zum Auswerten eines DataFrame beschriebenen Aktionsmethoden führen den Datenabruf durch.) Die Transformationsmethoden geben einfach an, wie die SQL-Anweisung aufgebaut sein soll.
Angeben von Spalten und Ausdrücken¶
Wenn Sie diese Transformationsmethoden aufrufen, müssen Sie möglicherweise Spalten angeben oder Ausdrücke, die Spalten verwenden. Wenn Sie z. B. die select
-Methode aufrufen, müssen Sie die Spalten angeben, die ausgewählt werden sollen.
Um auf eine Spalte zu verweisen, erstellen Sie ein Column-Objekt durch Aufruf der statischen Methode Functions.col.
DataFrame dfProductInfo = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
dfProductInfo.show();
Bemerkung
Weitere Informationen zum Erstellen eines Column
-Objekts für ein Literal finden Sie unter Verwenden von Literalen als Spaltenobjekte.
Wenn Sie einen Filter, eine Projektion, eine Verknüpfungsbedingung usw. angeben müssen, können Sie Column
-Objekte in einem Ausdruck verwenden. Beispiel:
Sie können
Column
-Objekte mit der Methodefilter
verwenden, um eine Filterbedingung anzugeben:// Specify the equivalent of "WHERE id = 12" // in an SQL SELECT statement. DataFrame df = session.table("sample_product_data"); df.filter(Functions.col("id").equal_to(Functions.lit(12))).show();
// Specify the equivalent of "WHERE key + category_id < 10" // in an SQL SELECT statement. DataFrame df2 = session.table("sample_product_data"); df2.filter(Functions.col("key").plus(Functions.col("category_id")).lt(Functions.lit(10))).show();
Sie können
Column
-Objekte mit der Methodeselect
verwenden, um einen Alias zu definieren:// Specify the equivalent of "SELECT key * 10 AS c" // in an SQL SELECT statement. DataFrame df3 = session.table("sample_product_data"); df3.select(Functions.col("key").multiply(Functions.lit(10)).as("c")).show();
Sie können
Column
-Objekte mit der Methodejoin
verwenden, um eine Verknüpfungsbedingung zu definieren:// Specify the equivalent of "sample_a JOIN sample_b on sample_a.id_a = sample_b.id_a" // in an SQL SELECT statement. DataFrame dfLhs = session.table("sample_a"); DataFrame dfRhs = session.table("sample_b"); DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a"))); dfJoined.show();
Verweisen auf Spalten in verschiedenen DataFrames¶
Wenn Sie auf Spalten in zwei verschiedenen DataFrame-Objekten verweisen müssen, die denselben Namen haben (z. B. beim Verknüpfen von DataFrames über diese Spalte), können Sie die col
-Methode für jedes DataFrame-Objekt verwenden, um sich auf eine Spalte in diesem Objekt zu beziehen (z. B. df1.col("name")
und df2.col("name")
).
Im folgenden Beispiel wird gezeigt, wie Sie mit der col
-Methode auf eine Spalte in einem bestimmten DataFrame verweisen. Im Beispiel werden zwei DataFrame-Objekte verknüpft, die beide eine Spalte mit dem Namen value
haben. Im Beispiel wird die as
-Methode des Column
-Objekts verwendet, um die Namen der Spalten im neu erstellten DataFrame zu ändern.
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a"))).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"));
dfJoined.show();
Verwenden von doppelten Anführungszeichen um Objektbezeichner (Tabellennamen, Spaltennamen usw.)¶
Die Namen von Datenbanken, Schemas, Tabellen und Stagingbereichen, die Sie angeben, müssen den Snowflake-Anforderungen an Bezeichner entsprechen. Wenn Sie einen Namen angeben, geht Snowflake davon aus, dass der Name in Großbuchstaben geschrieben ist. Daher sind z. B. die folgenden Aufrufe gleichwertig:
// The following calls are equivalent:
df.select(Functions.col("id123"));
df.select(Functions.col("ID123"));
Wenn der Name nicht den Anforderungen für Bezeichner entspricht, müssen Sie den Namen in doppelte Anführungszeichen ("
) setzen. Verwenden Sie einen Backslash (\
) als Escape-Zeichen zum Umschließen von doppelten Anführungszeichen innerhalb von Scala-Zeichenfolgenliteralen. Der folgende Tabellenname beginnt z. B. nicht mit einem Buchstaben oder einem Unterstrich, sodass Sie den Namen in doppelte Anführungszeichen setzen müssen:
DataFrame df = session.table("\"10tablename\"");
Beachten Sie, dass Sie bei der Angabe des Namens einer Spalte keine doppelten Anführungszeichen um den Namen verwenden müssen. Die Snowpark-Bibliothek schließt Spaltennamen automatisch in doppelte Anführungszeichen ein, wenn der Name nicht den Bezeichneranforderungen entspricht:
// The following calls are equivalent:
df.select(Functions.col("3rdID"));
df.select(Functions.col("\"3rdID\""));
// The following calls are equivalent:
df.select(Functions.col("id with space"));
df.select(Functions.col("\"id with space\""));
Wenn Sie bereits doppelte Anführungszeichen um einen Spaltennamen hinzugefügt haben, fügt die Bibliothek keine weiteren doppelten Anführungszeichen um den Namen ein.
In einigen Fällen kann der Spaltenname doppelte Anführungszeichen enthalten:
describe table quoted;
+------------------------+ ...
| name | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted" | ...
+------------------------+ ...
Wie unter Anforderungen an Bezeichner erläutert, müssen Sie für jedes doppelte Anführungszeichen innerhalb eines in Anführungszeichen geschriebenen Bezeichners zwei doppelte Anführungszeichen verwenden (z. B. "name_with_""air""_quotes"
und """column_name_quoted"""
):
DataFrame dfTable = session.table("quoted");
dfTable.select("\"name_with_\"\"air\"\"_quotes\"");
dfTable.select("\"\"\"column_name_quoted\"\"\"");
Wenn ein Bezeichner in doppelte Anführungszeichen eingeschlossen ist (unabhängig davon, ob Sie die Anführungszeichen explizit hinzugefügt haben oder die Bibliothek die Anführungszeichen für Sie hinzugefügt hat), berücksichtigt Snowflake die Groß-/Kleinschreibung des Bezeichners:
// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(Functions.col("id with space"));
df.select(Functions.col("ID WITH SPACE"));
Verwenden von Literalen als Spaltenobjekte¶
Um ein Literal in einer Methode zu verwenden, die ein Column
-Objekt übergibt, erstellen Sie ein Column
-Objekt für das Literal, indem Sie das Literal an die statische Methode lit
in der Functions
-Klasse übergeben. Beispiel:
// Show the first 10 rows in which category_id is greater than 5.
// Use `Functions.lit(5)` to create a Column object for the literal 5.
DataFrame df = session.table("sample_product_data");
df.filter(Functions.col("category_id").gt(Functions.lit(5))).show();
Wenn das Literal in Java ein Gleitkomma- oder Double-Wert ist (z. B. 0.05
wird standardmäßig als Double verarbeitet), generiert die Snowpark-Bibliothek SQL-Code, der den Wert implizit in den entsprechenden Snowpark-Datentyp umwandelt (z. B. 0.05::DOUBLE
). Dies kann zu einem Näherungswert führen, der von der genau angegebenen Zahl abweicht.
Der Code im folgenden Beispiel zeigt keine übereinstimmenden Zeilen an, obwohl der Filter (der für Werte größer oder gleich 0.05
gilt) mit den Zeilen in DataFrame übereinstimmen sollte:
// Create a DataFrame that contains the value 0.05.
DataFrame df = session.sql("select 0.05 :: Numeric(5, 2) as a");
// Applying this filter results in no matching rows in the DataFrame.
df.filter(Functions.col("a").leq(Functions.lit(0.06).minus(Functions.lit(0.01)))).show();
Das Problem ist, dass Functions.lit(0.06)
und Functions.lit(0.01)
Näherungswerte für 0.06
und 0.01
liefern und nicht die genauen Werte.
Zur Umgehung dieses Problems wandeln Sie das Literal in den Snowpark-Typ um, den Sie verwenden möchten. Verwenden Sie zum Beispiel einen NUMBER mit einer Gesamtstellenzahl (precision) von 5 und einer Dezimalstellenzahl (scale) von 2:
import com.snowflake.snowpark_java.types.*;
...
df.filter(Functions.col("a").leq(Functions.lit(0.06).cast(DataTypes.createDecimalType(5, 2)).minus(Functions.lit(0.01).cast(DataTypes.createDecimalType(5, 2))))).show();
Umwandeln eines Spaltenobjekts in einen bestimmten Typ¶
Um ein Column
-Objekt in einen bestimmten Typ umzuwandeln, rufen Sie die cast-Methode auf und übergeben ein Typobjekt aus dem com.snowflake.snowpark_java.types-Paket. So können Sie zum Beispiel ein Literal als NUMBER mit einer Gesamtstellenzahl (precision) von 5 und einer Dezimalstellenzahl (scale) von 2 darstellen:
// Import for the DecimalType class..
import com.snowflake.snowpark_java.types.*;
Column decimalValue = Functions.lit(0.05).cast(DataTypes.createDecimalType(5,2));
Verketten von Methodenaufrufen¶
Da jede Methode, die ein DataFrame-Objekt transformiert, ein neues DataFrame-Objekt zurückgibt, auf das die Transformation angewendet wurde, können Sie Methodenaufrufe verketten, um ein neues DataFrame zu erstellen, das auf eine andere Weise transformiert wird.
Im folgenden Beispiel wird ein DataFrame zurückgegeben, der für folgende Aufgaben konfiguriert ist:
Abfragen der Tabelle
sample_product_data
Rückgabe der Zeile mit
id = 1
Auswahl der Spalten
name
undserial_number
DataFrame dfProductInfo = session.table("sample_product_data").filter(Functions.col("id").equal_to(Functions.lit(1))).select(Functions.col("name"), Functions.col("serial_number"));
dfProductInfo.show();
In diesem Beispiel:
session.table("sample_product_data")
gibt einen DataFrame für die Tabellesample_product_data
zurück.Der DataFrame enthält zwar noch nicht die Daten aus der Tabelle, aber das Objekt enthält die Definitionen der Spalten in der Tabelle.
filter(Functions.col("id").equal_to(Functions.lit(1)))
gibt ein DataFrame für die Tabellesample_product_data
zurück, die so eingerichtet ist, dass sie die Zeile mitid = 1
zurückgibt.Beachten Sie erneut, dass das DataFrame noch nicht die übereinstimmende Zeile aus der Tabelle enthält. Die übereinstimmende Zeile wird erst abgerufen, wenn Sie eine Aktionsmethode aufrufen.
select(Functions.col("name"), Functions.col("serial_number"))
gibt ein DataFrame zurück, das die Spaltenname
undserial_number
für die Zeile in der Tabellesample_product_data
enthält, dieid = 1
hat.
Beachten Sie beim Verketten von Methodenaufrufen, dass die Reihenfolge der Aufrufe wichtig ist. Jeder Methodenaufruf gibt ein DataFrame zurück, das transformiert wurde. Stellen Sie sicher, dass nachfolgende Aufrufe das transformierte DataFrame verwenden.
Im folgenden Beispiel gibt die Methode select
ein DataFrame zurück, das nur zwei Spalten enthält: name
und serial_number
. Der Aufruf der filter
-Methode auf diesem DataFrame schlägt fehl, weil dieser die Spalte id
verwendet, die im transformierten DataFrame nicht vorhanden ist.
// This fails with the error "invalid identifier 'ID'."
DataFrame dfProductInfo = session.table("sample_product_data").select(Functions.col("name"), Functions.col("serial_number")).filter(Functions.col("id").equal_to(Functions.lit(1)));
dfProductInfo.show();
Im Gegensatz dazu wird der folgende Code erfolgreich ausgeführt, weil die filter()
-Methode auf einem DataFrame aufgerufen wird, der alle Spalten der Tabelle sample_product_data
enthält (einschließlich der Spalte id
):
// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
DataFrame dfProductInfo = session.table("sample_product_data").filter(Functions.col("id").equal_to(Functions.lit(1))).select(Functions.col("name"), Functions.col("serial_number"));
dfProductInfo.show();
Beachten Sie, dass Sie die Methodenaufrufe select
und filter
möglicherweise in einer anderen Reihenfolge ausführen müssen als bei Verwendung der entsprechenden Schlüsselwörter (SELECT und WHERE) in einer SQL-Anweisung.
Begrenzen der Anzahl von Zeilen in einem DataFrame¶
Um die Anzahl der Zeilen in einem DataFrame zu begrenzen, können Sie die Transformationsmethode limit verwenden.
Die Snowpark-API bietet auch folgende Aktionsmethoden zum Abrufen und Ausdrucken einer begrenzten Anzahl von Zeilen:
Aktionsmethode first (zum Ausführen von Abfragen und zum Zurückgeben der ersten
n
Zeilen)Aktionsmethode show (zum Ausführen von Abfragen und zum Drucken der ersten
n
Zeilen)
Diese Methoden fügen der auszuführenden SQL-Anweisung im Endeffekt eine LIMIT-Klausel hinzu.
Wie in den Nutzungshinweisen zu LIMIT erläutert, sind die Ergebnisse nicht deterministisch, es sei denn, Sie geben in Verbindung mit LIMIT eine Sortierreihenfolge (ORDER BY) an.
Um die ORDER BY-Klausel zusammen mit der LIMIT-Klausel zu verwenden (z. B. damit ORDER BY nicht in einer separaten Unterabfrage steht), müssen Sie die Methode aufrufen, die die Ergebnisse auf den von der sort
-Methode zurückgegebenen DataFrame beschränkt.
Wenn Sie beispielsweise Methodenaufrufe verketten:
DataFrame df = session.table("sample_product_data");
// Limit the number of rows to 5, sorted by parent_id.
DataFrame dfSubset = df.sort(Functions.col("parent_id")).limit(5);
// Return the first 5 rows, sorted by parent_id.
Row[] arrayOfRows = df.sort(Functions.col("parent_id")).first(5);
// Print the first 5 rows, sorted by parent_id.
df.sort(Functions.col("parent_id")).show(5);
Abrufen von Spaltendefinitionen¶
Um die Definition der Spalten im Dataset für DataFrame abzurufen, verwenden Sie die schema
-Methode. Diese Methode gibt ein StructType
-Objekt zurück, das ein Array
von StructField
-Objekten enthält. Jedes StructField
-Objekt enthält die Definition einer Spalte.
import com.snowflake.snowpark_java.types.*;
...
// Get the StructType object that describes the columns in the
// underlying rowset.
StructType tableSchema = session.table("sample_product_data").schema();
System.out.println("Schema for sample_product_data: " + tableSchema);
Im zurückgegebenen StructType
-Objekt sind die Spaltennamen immer normalisiert. Bezeichner, die nicht mit Anführungszeichen umschlossen sind, werden in Großbuchstaben zurückgegeben. Bezeichner in Anführungszeichen werden genau in der Schreibweise zurückgegeben, in der sie definiert wurden.
Im folgenden Beispiel wird ein DataFrame erstellt zurück, der die Spalten mit den Namen ID
und 3rd
enthält. Für den Spaltennamen 3rd
schließt die Snowpark-Bibliothek den Namen automatisch in doppelte Anführungszeichen ein ("3rd"
), da der Name nicht den Anforderungen an einen Bezeichner entspricht.
Im Beispiel wird erst die schema
-Methode aufgerufen und dann die names
-Methode auf dem zurückgegebenen Objekt StructType
, um ein Array von Spaltennamen zu erhalten. Die Namen werden in dem von der schema
-Methode zurückgegebenen StructType
normalisiert.
import java.util.Arrays;
...
// Create a DataFrame containing the "id" and "3rd" columns.
DataFrame dfSelectedColumns = session.table("sample_product_data").select(Functions.col("id"), Functions.col("3rd"));
// Print out the names of the columns in the schema.
System.out.println(Arrays.toString(dfSelectedColumns.schema().names()));
Verknüpfen von DataFrames¶
Zum Verknüpfen (Join) von DataFrame-Objekten rufen Sie die join-Methode auf:
In den folgenden Abschnitten wird erläutert, wie DataFrames zur Durchführung einer Verknüpfung verwendet werden:
Einrichten der Beispieldaten für die Verknüpfungen¶
Die Beispiele in den nächsten Abschnitten verwenden Beispieldaten, die Sie durch Ausführen der folgenden SQL-Anweisungen einrichten können:
CREATE OR REPLACE TABLE sample_a (
id_a INTEGER,
name_a VARCHAR,
value INTEGER
);
INSERT INTO sample_a (id_a, name_a, value) VALUES
(10, 'A1', 5),
(40, 'A2', 10),
(80, 'A3', 15),
(90, 'A4', 20)
;
CREATE OR REPLACE TABLE sample_b (
id_b INTEGER,
name_b VARCHAR,
id_a INTEGER,
value INTEGER
);
INSERT INTO sample_b (id_b, name_b, id_a, value) VALUES
(4000, 'B1', 40, 10),
(4001, 'B2', 10, 5),
(9000, 'B3', 80, 15),
(9099, 'B4', NULL, 200)
;
CREATE OR REPLACE TABLE sample_c (
id_c INTEGER,
name_c VARCHAR,
id_a INTEGER,
id_b INTEGER
);
INSERT INTO sample_c (id_c, name_c, id_a, id_b) VALUES
(1012, 'C1', 10, NULL),
(1040, 'C2', 40, 4000),
(1041, 'C3', 40, 4001)
;
Angeben der Spalten für den Join¶
Mit der Methode DataFrame.join
können Sie die zu verwendenden Spalten auf eine der folgenden Arten angeben:
Geben Sie einen Spaltenausdruck an, der die Verknüpfungsbedingung beschreibt.
Geben Sie eine oder mehrere Spalten an, die als gemeinsame Spalten im Join verwendet werden sollen.
Im folgenden Beispiel wird eine innere Verknüpfung (Inner Join) für die Spalte mit dem Namen id_a
ausgeführt:
// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")));
dfJoined.show();
Beachten Sie, dass im Beispiel die Methode DataFrame.col
verwendet wird, um die im Join zu verwendenden Spalten anzugeben. Weitere Informationen zu dieser Methode finden Sie unter Angeben von Spalten und Ausdrücken.
Dies gibt die folgende Ausgabe aus:
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
----------------------------------------------------------------------
Identische Spaltennamen, die im Join-Ergebnis dupliziert sind¶
In dem DataFrame, der aus einer Join-Verknüpfung resultiert, verwendet die Snowpark-Bibliothek die Spaltennamen, die in den verknüpften Tabellen gefunden wurden, selbst wenn die Spaltennamen in den Tabellen identisch sind. In diesem Fall werden diese Spaltennamen in dem aus der Verknüpfung resultierenden DataFrame dupliziert. Um auf eine duplizierte Spalte über den Namen zuzugreifen, rufen Sie die col
-Methode auf dem DataFrame auf, der die Originaltabelle der Spalte repräsentiert. (Weitere Informationen zur Angabe von Spalten finden Sie unter Verweisen auf Spalten in verschiedenen DataFrames.)
Der Code im folgenden Beispiel verbindet zwei DataFrames und ruft dann die select
-Methode auf dem verknüpften DataFrame auf. Die auszuwählenden Spalten werden angegeben, indem die Methode col
über die Variable aufgerufen wird, die die entsprechenden DataFrame-Objekte dfRhs
und dfLhs
repräsentiert. Die as
-Methode wird verwendet, um den von der select
-Methode erstellten Spalten im DataFrame neue Namen zu geben.
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")));
DataFrame dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"));
dfSelected.show();
Dies gibt die folgende Ausgabe aus:
------------------------------
|"LEFTVALUE" |"RIGHTVALUE" |
------------------------------
|5 |5 |
|10 |10 |
|15 |15 |
------------------------------
Spalten vor dem Speichern oder Zwischenspeichern deduplizieren¶
Beachten Sie, dass Sie bei einem DataFrame, der aus einer Join-Verknüpfung resultiert und doppelte Spaltennamen enthält, die Spalten deduplizieren oder umbenennen müssen, um die Duplikate aus dem DataFrame zu entfernen, bevor Sie das Ergebnis in einer Tabelle speichern oder den DataFrame zwischenspeichern. Bei doppelten Spaltennamen in einem DataFrame, den Sie in einer Tabelle oder im Cache speichern, ersetzt die Snowpark-Bibliothek die doppelten Spaltennamen durch Aliasnamen, sodass diese nicht mehr doppelt vorkommen.
Das folgende Beispiel veranschaulicht, wie die Ausgabe eines zwischengespeicherten DataFrame aussehen könnte, wenn die Spaltennamen ID_A
und VALUE
in einer Join-Verknüpfung von zwei Tabellen dupliziert und dann vor dem Zwischenspeichern des Ergebnisses nicht dedupliziert oder umbenannt wurden.
--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A" |"NAME_A" |"l_ZSz7_VALUE" |"ID_B" |"NAME_B" |"r_heec_ID_A" |"r_heec_VALUE" |
--------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
--------------------------------------------------------------------------------------------------
Ausführen einer natürlichen Verknüpfung (Natural Join)¶
Um eine natürliche Verknüpfung (Natural Join) auszuführen, bei der DataFrames über Spalten mit demselben Namen verknüpft werden, rufen Sie die Methode naturalJoin auf.
Im folgenden Beispiel werden die DataFrames für die Tabellen sample_a
und sample_b
über ihre gemeinsamen Spalten (die Spalte id_a
) verknüpft:
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.naturalJoin(dfRhs);
dfJoined.show();
Dies gibt die folgende Ausgabe aus:
---------------------------------------------------
|"ID_A" |"VALUE" |"NAME_A" |"ID_B" |"NAME_B" |
---------------------------------------------------
|10 |5 |A1 |4001 |B2 |
|40 |10 |A2 |4000 |B1 |
|80 |15 |A3 |9000 |B3 |
---------------------------------------------------
Geben Sie den Typ der Verknüpfung an:¶
Standardmäßig erstellt die Methode DataFrame.join
eine innere Verknüpfung (Inner Join). Um einen anderen Verknüpfungstyp anzugeben, verwenden Sie für das Argument joinType
einen der folgenden Werte:
Typ der Verknüpfung (Join) |
|
---|---|
Innere Verknüpfung (Inner Join) |
|
Linke äußere Verknüpfung: |
|
Rechte äußere Verknüpfung: |
|
Vollständige äußere Verknüpfung: |
|
Kreuzverknüpfung (Cross Join) |
|
Beispiel:
// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")), "left");
dfLeftOuterJoin.show();
Dies gibt die folgende Ausgabe aus:
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|40 |A2 |10 |4000 |B1 |40 |10 |
|10 |A1 |5 |4001 |B2 |10 |5 |
|80 |A3 |15 |9000 |B3 |80 |15 |
|90 |A4 |20 |NULL |NULL |NULL |NULL |
----------------------------------------------------------------------
Mehrere Tabellen verknüpfen¶
So verknüpfen Sie mehrere Tabellen:
Erstellen Sie einen DataFrame für jede Tabelle.
Rufen Sie die Methode
DataFrame.join
für den ersten DataFrame auf, und übergeben Sie den zweiten DataFrame.Rufen Sie mit den von der Methode
join
zurückgegebenen DataFrame die Methodejoin
auf und übergeben Sie den dritten DataFrame.
Sie können die join
-Aufrufe wie unten gezeigt verketten:
DataFrame dfFirst = session.table("sample_a");
DataFrame dfSecond = session.table("sample_b");
DataFrame dfThird = session.table("sample_c");
DataFrame dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a").equal_to(dfSecond.col("id_a"))).join(dfThird, dfFirst.col("id_a").equal_to(dfThird.col("id_a")));
dfJoinThreeTables.show();
Dies gibt die folgende Ausgabe aus:
------------------------------------------------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |"ID_C" |"NAME_C" |"ID_A" |"ID_B" |
------------------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |1012 |C1 |10 |NULL |
|40 |A2 |10 |4000 |B1 |40 |10 |1040 |C2 |40 |4000 |
|40 |A2 |10 |4000 |B1 |40 |10 |1041 |C3 |40 |4001 |
------------------------------------------------------------------------------------------------------------
Ausführen einer Selbstverknüpfung (Self-Join)¶
Wenn Sie eine Tabelle mit sich selbst über verschiedene Spalten verknüpfen müssen, können Sie die Selbstverknüpfung (Self-Join) nicht mit einem einzelnen DataFrame durchführen. Im folgenden Beispiel wird ein einzelnes DataFrame in einer Selbstverknüpfung verwendet. Diese Ausführung schlägt fehl, weil die Spaltenausdrücke für "id"
sowohl auf der linken als auch der rechten Seite der Verknüpfung vorhanden sind:
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, Functions.col("id").equal_to(Functions.col("parent_id")));
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, df.col("id").equal_to(df.col("parent_id")));
Beide Beispiele scheitern mit der folgenden Ausnahme:
Exception in thread "main" com.snowflake.snowpark_java.SnowparkClientException:
Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
Instead, join this DataFrame to a clone() of itself.
Verwenden Sie stattdessen die clone-Methode, um einen Klon des DataFrame-Objekts zu erstellen, und verwenden Sie dann die beiden DataFrame-Objekte für die Verknüpfung:
// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
DataFrame dfLhs = session.table("sample_product_data");
// Clone the DataFrame object to use as the right-hand side of the join.
DataFrame dfRhs = dfLhs.clone();
// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id").equal_to(dfRhs.col("parent_id")));
dfJoined.show();
Wenn Sie eine Selbstverknüpfung auf derselben Spalte ausführen möchten, rufen Sie die join
-Methode auf, die den Namen der Spalte (oder eines Arrays mit Spalten dieses Namens) für die USING
-Klausel übergibt:
// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, "key");
Ausführen einer Aktion zum Auswerten eines DataFrame¶
Wie bereits erwähnt, wird der DataFrame im Lazy-Modus ausgewertet, d. h. die SQL-Anweisung wird erst dann zur Ausführung an den Server gesendet, wenn Sie eine Aktion ausführen. Eine Aktion löst die Auswertung des DataFrame aus und sendet die entsprechende SQL-Anweisung zur Ausführung an den Server.
In den folgenden Abschnitten wird erläutert, wie eine Aktion auf einem DataFrame synchron und asynchron ausgeführt werden kann:
Synchrones Ausführen einer Aktion¶
Um eine Aktion synchron auszuführen, rufen Sie eine der folgenden Aktionsmethoden auf:
Methode zur synchronen Ausführung einer Aktion |
Beschreibung |
---|---|
|
Wertet den DataFrame aus und gibt das resultierende Dataset als ein |
|
Wertet den DataFrame aus und gibt einen |
|
Wertet den DataFrame aus und gibt die Anzahl der Zeilen zurück. |
|
Wertet den DataFrame aus und gibt die Zeilen auf der Konsole aus. Beachten Sie, dass bei dieser Methode die Anzahl der Zeilen (standardmäßig) auf 10 beschränkt ist. Siehe Ausgeben der Zeilen eines DataFrame. |
|
Führt die Abfrage aus, erstellt eine temporäre Tabelle und füllt die Tabelle mit den Ergebnissen. Die Methode gibt ein |
|
Speichert die im DataFrame enthaltenen Daten in die angegebene Tabelle. Siehe Speichern von Daten in eine Tabelle. |
|
Kopiert die im DataFrame enthaltenen Daten in die angegebene Tabelle. Siehe Kopieren von Daten aus Dateien in eine Tabelle. |
|
Löscht Zeilen aus der angegebenen Tabelle. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle. |
|
Aktualisiert Zeilen in der angegebenen Tabelle. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle. |
|
Führt Zeilen in der angegebenen Tabelle zusammen. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle. |
Um beispielsweise die Abfrage auszuführen und die Anzahl der Ergebnisse zurückzugeben, rufen Sie die count
-Methode auf:
// Create a DataFrame for the "sample_product_data" table.
DataFrame dfProducts = session.table("sample_product_data");
// Send the query to the server for execution and
// print the count of rows in the table.
System.out.println("Rows returned: " + dfProducts.count());
Sie können auch Aktionsmethoden aufrufen, um Folgendes zu tun:
Ausführen einer Abfrage auf einer Tabelle und Zurückgeben der Ergebnisse
Ausführen einer Abfrage und Ausgeben der Ergebnisse auf einer Konsole
Hinweis: Wenn Sie die schema
-Methode aufrufen, um die Definitionen der Spalten im DataFrame zu erhalten, müssen Sie keine Aktionsmethode aufrufen.
Asynchrones Ausführen einer Aktion¶
Bemerkung
Dieses Feature wurde in Snowpark 0.11.0 eingeführt.
Zur asynchronen Ausführung einer Aktion müssen Sie die Methode async
aufrufen, um ein Objekt für den „asynchronen Akteur“ (z. B. DataFrameAsyncActor
) zurückzugeben, und dann eine asynchrone Aktionsmethode in diesem Objekt aufrufen.
Diese Aktionsmethoden eines asynchronen Akteursobjekts geben ein TypedAsyncJob
-Objekt zurück, das Sie verwenden können, um den Status der asynchronen Aktion zu überprüfen und die Ergebnisse der Aktion abzurufen.
In den nächsten Abschnitten wird erläutert, wie Sie Aktionen asynchron ausführen und die Ergebnisse überprüfen können.
Erläuterungen zum grundlegenden Ablauf asynchroner Aktionen¶
Sie können die folgenden Methoden verwenden, um eine Aktion asynchron auszuführen:
Methode zur asynchronen Ausführung einer Aktion |
Beschreibung |
---|---|
|
Wertet den DataFrame asynchron aus, um das resultierende Dataset als |
|
Wertet den DataFrame asynchron aus, um einen |
|
Wertet den DataFrame asynchron aus, um die Anzahl der Zeilen abzurufen. |
|
Speichert die im DataFrame enthaltenen Daten asynchron in die angegebene Tabelle. Siehe Speichern von Daten in eine Tabelle. |
|
Kopiert die im DataFrame enthaltenen Daten asynchron in die angegebene Tabelle. Siehe Kopieren von Daten aus Dateien in eine Tabelle. |
|
Löscht Zeilen asynchron aus der angegebenen Tabelle. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle. |
|
Aktualisiert Zeilen asynchron in der angegebenen Tabelle. Siehe Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle. |
Mit dem zurückgegebenen Objekt TypedAsyncJob können Sie Folgendes tun:
Um festzustellen, ob die Aktion abgeschlossen ist, rufen Sie die Methode
isDone
auf.Um die Abfrage-ID der zugehörigen Aktion zu erhalten, rufen Sie die Methode
getQueryId
auf.Um die Ergebnisse der Aktion zurückzugeben (z. B. das
Array
vonRow
-Objekten für die Methodecollect
oder die Anzahl der Zeilen für die Methodecount
), rufen Sie die MethodegetResult
auf.Beachten Sie, dass
getResult
ein blockierender Aufruf ist.Um die Aktion abzubrechen, rufen Sie die Methode
cancel
auf.
Um zum Beispiel eine Abfrage asynchron auszuführen und die Ergebnisse als Array
von Row
-Objekten abzurufen, rufen Sie async().collect()
auf:
import java.util.Arrays;
// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
DataFrame df = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
// Execute the query asynchronously.
// This call does not block.
TypedAsyncJob<Row[]> asyncJob = df.async().collect();
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
Row[] results = asyncJob.getResult();
System.out.println(Arrays.toString(results));
Um die Abfrage asynchron auszuführen und die Anzahl der Ergebnisse abzurufen, rufen Sie async().count()
auf:
// Create a DataFrame for the "sample_product_data" table.
DataFrame dfProducts = session.table("sample_product_data");
// Execute the query asynchronously.
// This call does not block.
TypedAsyncJob<Long> asyncJob = dfProducts.async().count();
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// Print the count of rows in the table.
// Note that getResult is a blocking call.
System.out.println("Rows returned: " + asyncJob.getResult());
Festlegen der maximalen Anzahl der zu wartenden Sekunden¶
Beim Aufruf der Methode getResult
können Sie mit dem Argument maxWaitTimeInSeconds
die maximale Anzahl von Sekunden angeben, die auf den Abschluss der Abfrage gewartet werden soll, bevor versucht wird, die Ergebnisse abzurufen. Beispiel:
// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
Row[] results = asyncJob.getResult(10);
Wenn Sie dieses Argument weglassen, wartet die Methode auf die maximale Anzahl von Sekunden, die in der Konfigurationseigenschaft snowpark_request_timeout_in_seconds angegeben ist. (Dies ist eine Eigenschaft, die Sie beim Erstellen des Sitzungsobjekts festlegen können.)
Zugriff auf eine asynchrone Abfrage über die ID¶
Mithilfe der Abfrage-ID einer zuvor übermittelten asynchronen Abfrage können Sie die Methode Session.createAsyncJob
aufrufen, um ein AsyncJob-Objekt zu erstellen. Mithilfe dieses Objekts können Sie dann den Status der Abfrage überprüfen, die Abfrageergebnisse abrufen oder die Abfrage abbrechen.
Beachten Sie, dass AsyncJob
im Gegensatz zu TypedAsyncJob
keine getResult
-Methode zum Abrufen der Ergebnisse bereitstellt. Wenn Sie die Ergebnisse abrufen müssen, verwenden Sie stattdessen die Methode getRows
oder getIterator
.
Beispiel:
import java.util.Arrays;
...
AsyncJob asyncJob = session.createAsyncJob(myQueryId);
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
Row[] rows = asyncJob.getRows();
System.out.println(Arrays.toString(rows));
Abrufen von Zeilen in einem DataFrame¶
Nachdem Sie angegeben haben, wie der DataFrame transformiert werden soll, können Sie eine Aktionsmethode aufrufen, um eine Abfrage auszuführen und die Ergebnisse zurückzugeben. Sie können alle Zeilen in einem Array
zurückgeben, oder Sie können einen Iterator
zurückgeben, mit dem Sie die Ergebnisse Zeile für Zeile durchlaufen können. In letzterem Fall werden bei großen Datenmengen die Zeilen blockweise in den Arbeitsspeicher geladen, um das Laden einer großen Datenmenge in den Arbeitsspeicher zu vermeiden.
Alle Zeilen zurückgeben¶
Um alle Zeilen auf einmal zurückzugeben, rufen Sie die Methode collect auf. Diese Methode gibt ein Array von Row-Objekten zurück. Um die Werte aus der Zeile abzurufen, rufen Sie die Methode getType
auf, wobei „Type“ der Datentyp ist (z. B. getString
, getInt
usw.).
Beispiel:
Row[] rows = session.table("sample_product_data").select(Functions.col("name"), Functions.col("category_id")).sort(Functions.col("name")).collect();
for (Row row : rows) {
System.out.println("Name: " + row.getString(0) + "; Category ID: " + row.getInt(1));
}
Zurückgeben eines Iterators für die Zeilen¶
Wenn Sie einen Iterator
verwenden möchten, um die Row-Objekte in den Ergebnissen zeilenweise zu durchlaufen, rufen Sie toLocalIterator auf. Wenn die Datenmenge in den Ergebnissen groß ist, lädt die Methode die Zeilen blockweise, um zu vermeiden, dass alle Zeilen auf einmal in den Arbeitsspeicher geladen werden.
Beispiel:
import java.util.Iterator;
Iterator<Row> rowIterator = session.table("sample_product_data").select(Functions.col("name"), Functions.col("category_id")).sort(Functions.col("name")).toLocalIterator();
while (rowIterator.hasNext()) {
Row row = rowIterator.next();
System.out.println("Name: " + row.getString(0) + "; Category ID: " + row.getInt(1));
}
Zurückgeben der ersten n
Zeilen¶
Um die ersten n
Zeilen zurückzugeben, rufen Sie die Methode first auf und übergeben die Anzahl der zurückzugebenden Zeilen.
Wie unter Begrenzen der Anzahl von Zeilen in einem DataFrame erläutert, sind die Ergebnisse nicht deterministisch. Wenn die Ergebnisse deterministisch sein sollen, müssen Sie diese Methode auf einem sortierten DataFrame (df.sort().first()
) aufrufen.
Beispiel:
import java.util.Arrays;
...
DataFrame df = session.table("sample_product_data");
Row[] rows = df.sort(Functions.col("name")).first(5);
System.out.println(Arrays.toString(rows));
Ausgeben der Zeilen eines DataFrame¶
Um die ersten 10 Zeilen des DataFrame auf der Konsole auszugeben, rufen Sie die Methode show auf. Um eine andere Anzahl von Zeilen auszugeben, übergeben Sie die Anzahl der auszugebenden Zeilen.
Wie unter Begrenzen der Anzahl von Zeilen in einem DataFrame erläutert, sind die Ergebnisse nicht deterministisch. Wenn die Ergebnisse deterministisch sein sollen, müssen Sie diese Methode auf einem sortierten DataFrame (df.sort().show()
) aufrufen.
Beispiel:
DataFrame df = session.table("sample_product_data");
df.sort(Functions.col("name")).show();
Aktualisieren, Löschen und Zusammenführen von Zeilen in einer Tabelle¶
Bemerkung
Dieses Feature wurde in Snowpark 0.7.0 eingeführt.
Wenn Sie Session.table
aufrufen, um ein DataFrame
-Objekt für eine Tabelle zu erstellen, gibt die Methode ein Updatable
-Objekt zurück, das DataFrame
um zusätzliche Methoden zum Aktualisieren und Löschen von Daten in der Tabelle erweitert. (Siehe Updatable.)
Wenn Sie Zeilen in einer Tabelle aktualisieren oder löschen müssen, können Sie die folgenden Methoden der Klasse Updatable
verwenden:
Rufen Sie
update
oderupdateColumn
auf, um in der Tabelle vorhandene Zeilen zu aktualisieren. Siehe Aktualisieren von Zeilen in einer Tabelle.Rufen Sie
delete
aus, um Zeilen aus einer Tabelle zu löschen. Siehe Löschen von Zeilen aus einer Tabelle.Rufen Sie
merge
auf, um Zeilen einer Tabelle auf der Basis von Daten einer zweiten Tabelle oder Unterabfrage einzufügen, zu aktualisieren und zu löschen. (Dies ist das Äquivalent zum Befehl MERGE in SQL.) Siehe Zusammenführen (Merge) von Zeilen in einer Tabelle.
Aktualisieren von Zeilen in einer Tabelle¶
Um die Zeilen einer Tabelle zu aktualisieren, rufen Sie die Methode update
oder updateColumn
auf und übergeben eine Map
, die die Zuordnung der zu aktualisierenden Spalten und der entsprechenden Werte, die diesen Spalten zugewiesen werden sollen, enthält:
Um die Spaltennamen als Zeichenfolgen in der
Map
anzugeben, rufen SieupdateColumn
auf.Um
Column
-Objekte inMap
anzugeben, rufen Sieupdate
auf.
Beide Methoden geben ein UpdateResult
-Objekt zurück, das die Anzahl der aktualisierten Zeilen enthält. (siehe UpdateResult).
Bemerkung
Beide Methoden sind Aktionsmethoden, was bedeutet, dass bei Aufruf der Methode SQL-Anweisungen zur Ausführung an den Server gesendet werden.
Wenn Sie beispielsweise die Werte in der Spalte mit dem Namen count
durch den Wert 1
ersetzen und dabei eine Map
verwenden möchten, die den Spaltennamen (ein String
-Wert) mit dem entsprechenden Wert verknüpft, rufen Sie updateColumn
auf:
import java.util.HashMap;
import java.util.Map;
...
Map<String, Column> assignments = new HashMap<>();
assignments.put("3rd", Functions.lit(1));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.updateColumn(assignments);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Wenn Sie in der ein Map
ein Column
-Objekt verwenden möchten, um die zu aktualisierende Spalte zu identifizieren, rufen Sie update
auf:
import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(1));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.update(assignments);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Wenn die Aktualisierung nur erfolgen soll, wenn eine Bedingung erfüllt ist, können Sie diese Bedingung als Argument angeben. Im folgenden Beispiel wird der Wert in der Spalte mit dem Namen count
durch 2
für die Zeilen ersetzt, in denen die Spalte category_id
den Wert 20
hat:
import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(2));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.update(assignments, Functions.col("category_id").equal_to(Functions.lit(20)));
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Wenn die Bedingung auf einer Verknüpfung mit einem anderen DataFrame
-Objekt basieren soll, können Sie dieses DataFrame
als Argument übergeben und dieses DataFrame
in der Bedingung verwenden. Im folgenden Beispiel wird der Wert in der Spalte mit dem Namen count
durch 3
für die Zeilen ersetzt, in denen die Spalte category_id
mit der category_id
in der DataFrame
-Spalte dfParts
übereinstimmt:
import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(3));
Updatable updatableDf = session.table("sample_product_data");
DataFrame dfParts = session.table("parts");
UpdateResult updateResult = updatableDf.update(assignments, updatableDf.col("category_id").equal_to(dfParts.col("category_id")), dfParts);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Löschen von Zeilen aus einer Tabelle¶
Sie können für die Methode delete
eine Bedingung angeben, die die zu löschenden Zeilen identifiziert, und diese Bedingung kann auf einer Verknüpfung (Join) mit einem anderen DataFrame basieren. delete
gibt ein DeleteResult
-Objekt zurück, das die Anzahl der gelöschten Zeilen enthält (siehe DeleteResult).
Bemerkung
delete
ist eine Aktionsmethode, was bedeutet, dass bei Aufruf der Methode SQL-Anweisungen zur Ausführung an den Server gesendet werden.
Im folgenden Beispiel werden Zeilen gelöscht, die den Wert 1
in der Spalte category_id
haben:
Updatable updatableDf = session.table("sample_product_data");
DeleteResult deleteResult = updatableDf.delete(updatableDf.col("category_id").equal_to(Functions.lit(1)));
System.out.println("Number of rows deleted: " + deleteResult.getRowsDeleted());
Wenn sich die Bedingung auf Spalten in einem anderen DataFrame bezieht, übergeben Sie dieses DataFrame als zweites Argument. Im folgenden Beispiel werden die Zeilen gelöscht, in denen die Spalte category_id
mit der category_id
in der DataFrame
-Spalte dfParts
übereinstimmt, wobei dfParts
als zweites Argument übergeben wird:
Updatable updatableDf = session.table("sample_product_data");
DeleteResult deleteResult = updatableDf.delete(updatableDf.col("category_id").equal_to(dfParts.col("category_id")), dfParts);
System.out.println("Number of rows deleted: " + deleteResult.getRowsDeleted());
Zusammenführen (Merge) von Zeilen in einer Tabelle¶
Wenn Sie Zeilen einer Tabelle einfügen, aktualisieren oder löschen möchten, die auf Werten einer zweiten Tabelle oder einer Unterabfrage basieren (das Äquivalent des MERGE-Befehls in SQL), gehen Sie wie folgt vor:
Rufen Sie im
Updatable
-Objekt der Tabelle, in der die Daten zusammengeführt werden sollen, die Methodemerge
auf, und übergeben Sie dabei dasDataFrame
-Objekt für die andere Tabelle und den Spaltenausdruck für die Verknüpfungsbedingung.Dies gibt ein
MergeBuilder
-Objekt zurück, das Sie verwenden können, um die Aktionen (z. B. Einfügen, Aktualisieren, Löschen) für die übereinstimmenden und die nicht übereinstimmenden Zeilen anzugeben (siehe MergeBuilder).Verwendung des Objekts
MergeBuilder
:Um die Aktualisierung oder Löschung von übereinstimmenden Zeilen anzugeben, rufen Sie die Methode
whenMatched
auf.Wenn Sie eine zusätzliche Bedingung angeben müssen, ob Zeilen aktualisiert oder gelöscht werden sollen, können Sie einen Spaltenausdruck für diese Bedingung übergeben.
Diese Methode gibt ein
MatchedClauseBuilder
-Objekt zurück, das Sie zur Angabe der auszuführenden Aktion verwenden können (siehe MatchedClauseBuilder).Rufen Sie die Methode
update
oderdelete
im ObjektMatchedClauseBuilder
auf, um die Aktualisierungs- oder Löschaktion anzugeben, die für die übereinstimmenden Zeilen durchgeführt werden soll. Diese Methoden geben einMergeBuilder
-Objekt zurück, das Sie zur Angabe zusätzlicher Klauseln verwenden können.Um die Einfügung anzugeben, die ausgeführt werden soll, wenn Zeilen nicht übereinstimmen, rufen Sie die Methode
whenNotMatched
auf.Wenn Sie für das Einfügen einer Zeile eine zusätzliche Bedingung angeben müssen, können Sie einen Spaltenausdruck für diese Bedingung übergeben.
Diese Methode gibt ein
NotMatchedClauseBuilder
-Objekt zurück, das Sie zur Angabe der auszuführenden Aktion verwenden können (siehe NotMatchedClauseBuilder).Rufen Sie die Methode
insert
im ObjektNotMatchedClauseBuilder
auf, um die Einfügeaktion festzulegen, die durchgeführt werden soll, wenn Zeilen nicht übereinstimmen. Diese Methoden geben einMergeBuilder
-Objekt zurück, das Sie zur Angabe zusätzlicher Klauseln verwenden können.
Wenn Sie die auszuführenden Einfügungen, Aktualisierungen und Löschungen angegeben haben, rufen Sie die Methode
collect
des ObjektsMergeBuilder
auf, um die angegebenen Einfügungen, Aktualisierungen und Löschungen in der Tabelle auszuführen.collect
gibt einMergeResult
-Objekt zurück, das die Anzahl der Zeilen enthält, die eingefügt, aktualisiert und gelöscht wurden (siehe MergeResult).
Das folgende Beispiel fügt eine Zeile mit den Spalten id
und value
aus der Tabelle source
in die Tabelle target
ein, wenn die Tabelle target
keine Zeile mit einer übereinstimmenden ID enthält:
MergeResult mergeResult = target.merge(source, target.col("id").equal_to(source.col("id")))
.whenNotMatched().insert([source.col("id"), source.col("value")])
.collect();
Im folgenden Beispiel wird eine Zeile in der Tabelle target
mit dem Wert der Spalte value
aus der Zeile in der Tabelle source
aktualisiert, die dieselbe ID hat:
import java.util.HashMap;
import java.util.Map;
...
Map<String, Column> assignments = new HashMap<>();
assignments.put("value", source.col("value"));
MergeResult mergeResult = target.merge(source, target.col("id").equal_to(source.col("id")))
.whenMatched().update(assignments)
.collect();
Speichern von Daten in eine Tabelle¶
Sie können den Inhalt eines DataFrame in eine neue oder bestehende Tabelle speichern. Dazu müssen Sie über die folgenden Berechtigungen verfügen:
CREATE TABLE-Berechtigungen für das Schema, wenn die Tabelle nicht vorhanden ist.
INSERT-Berechtigungen für die Tabelle.
So speichern Sie den Inhalt eines DataFrame in eine Tabelle:
Rufen Sie die write-Methode des DataFrame auf, um ein DataFrameWriter-Objekt zu erhalten.
Rufen Sie die mode-Methode des
DataFrameWriter
-Objekts auf, und übergeben Sie dabei ein SaveMode-Objekt, das Ihre Präferenzen für das Schreiben in die Tabelle angibt:Um Zeilen einzufügen, übergeben Sie
SaveMode.Append
.Um die vorhandene Tabelle zu überschreiben, übergeben Sie
SaveMode.Overwrite
.
Diese Methode gibt das gleiche
DataFrameWriter
-Objekt zurück, das mit dem angegebenen Modus konfiguriert wurde.Wenn Sie Zeilen in eine bestehende Tabelle einfügen (
SaveMode.Append
) und die Spaltennamen im DataFrame mit den Spaltennamen in der Tabelle übereinstimmen, rufen Sie die DataFrameWriter.option-Methode auf, und geben Sie"columnOrder"
und"name"
als Argumente an.Bemerkung
Diese Methode wurde in Snowpark 1.4.0 eingeführt.
Standardmäßig ist die Option
columnOrder
auf"index"
gesetzt, was bedeutet, dassDataFrameWriter
die Werte in der Reihenfolge einfügt, in der die Spalten erscheinen. So fügtDataFrameWriter
beispielsweise den Wert aus der ersten Spalte von DataFrame in die erste Spalte der Tabelle ein, die zweite Spalte von DataFrame in die zweite Spalte der Tabelle usw.Diese Methode gibt das gleiche
DataFrameWriter
-Objekt zurück, das mit der angegebenen Option konfiguriert wurde.Rufen Sie die Methode saveAsTable des
DataFrameWriter
-Objekts auf, um den Inhalt des DataFrame in der angegebenen Tabelle zu speichern.Sie müssen keine separate Methode (z. B.
collect
) aufrufen, um die SQL-Anweisung auszuführen, mit der Daten in die Tabelle gespeichert werden.saveAsTable
ist eine Aktionsmethode, die SQL-Anweisungen ausführt.
Im folgenden Beispiel wird eine bestehende Tabelle (identifiziert durch die Variable tableName
) mit dem Inhalt von DataFrame df
überschrieben:
df.write().mode(SaveMode.Overwrite).saveAsTable(tableName);
Im folgenden Beispiel wird eine Zeile aus dem DataFrame df
in die bestehende Tabelle (identifiziert durch die Variable tableName
) eingefügt: In diesem Beispiel enthalten die Tabelle und das DataFrame beide die Spalten c1
und c2
.
Das Beispiel zeigt den Unterschied zwischen der Einstellung der Option columnOrder
auf "name"
(fügt Werte in die Tabellenspalten mit denselben Namen wie die DataFrame-Spalten ein) und der Verwendung der Standardoption columnOrder
(fügt Werte in die Tabellenspalten basierend auf der Reihenfolge der Spalten in DataFrame ein).
DataFrame df = session.sql("SELECT 1 AS c2, 2 as c1");
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write().mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName);
// With the default value of the columnOrder option ("index"), the DataFrameWriter uses the column positions
// and inserts a row with the values (1, 2).
df.write().mode(SaveMode.Append).saveAsTable(tableName);
Erstellen einer Ansicht aus einem DataFrame¶
Um eine Ansicht aus einem DataFrame zu erstellen, rufen Sie die Methode createOrReplaceView auf:
df.createOrReplaceView("db.schema.viewName");
Beachten Sie, dass bei Aufruf von createOrReplaceView
die neue Ansicht sofort erstellt wird. Noch wichtiger ist, dass der Aufruf nicht dazu führt, dass der DataFrame ausgewertet wird. (Der DataFrame selbst wird erst ausgewertet, wenn Sie eine Aktion ausführen.)
Ansichten, die Sie durch Aufruf von createOrReplaceView
erstellen, sind persistent. Wenn Sie diese Ansicht nicht mehr benötigen, können Sie die Ansicht manuell löschen.
Wenn Sie eine temporäre Ansicht nur für die Sitzung erstellen müssen, rufen Sie stattdessen die Methode createOrReplaceTempView auf:
df.createOrReplaceTempView("db.schema.viewName");
Zwischenspeichern eines DataFrame¶
In einigen Fällen müssen Sie eine komplexe Abfrage ausführen und die Ergebnisse für spätere Operationen aufbewahren (um nicht dieselbe Abfrage erneut ausführen zu müssen). In diesen Fällen können Sie den Inhalt eines DataFrame zwischenspeichern, indem Sie die Methode cacheResult aufrufen.
Diese Methode ermöglicht Folgendes:
Ausführen der Abfrage.
Es ist nicht erforderlich, vor dem Aufruf von
cacheResult
eine separate Aktionsmethode aufzurufen, um die Ergebnisse abzurufen.cacheResult
ist eine Aktionsmethode, die die Abfrage ausführt.Speichern der Ergebnisse in einer temporären Tabelle.
Da
cacheResult
eine temporäre Tabelle erstellt, müssen Sie über die CREATE TABLE-Berechtigung für das verwendete Schema verfügen.Zurückgeben eines HasCachedResult-Objekts, das den Zugriff auf die Ergebnisse in der temporären Tabelle ermöglicht.
Da
HasCachedResult
denDataFrame
erweitert, können Sie auf diesen zwischengespeicherten Daten einige der Operationen ausführen, die auch auf dem DataFrame ausgeführt werden können.
Bemerkung
Da cacheResult
die Abfrage ausführt und die Ergebnisse in einer Tabelle speichert, kann diese Methode zu erhöhten Compute- und Speicherkosten führen.
Beispiel:
// Set up a DataFrame to query a table.
DataFrame df = session.table("sample_product_data").filter(Functions.col("category_id").gt(Functions.lit(10)));
// Retrieve the results and cache the data.
HasCachedResult cachedDf = df.cacheResult();
// Create a DataFrame containing a subset of the cached data.
DataFrame dfSubset = cachedDf.filter(Functions.col("category_id").equal_to(Functions.lit(20))).select(Functions.col("name"), Functions.col("category_id"));
dfSubset.show();
Beachten Sie, dass der ursprüngliche DataFrame durch den Aufruf dieser Methode nicht beeinflusst wird. Angenommen, dfTable
ist ein DataFrame der Tabelle sample_product_data
:
HasCachedResult dfTempTable = dfTable.cacheResult();
Nach Aufruf von cacheResult
verweist dfTable
immer noch auf die Tabelle sample_product_data
, und Sie können dfTable
weiterhin zur Abfrage und Aktualisierung dieser Tabelle verwenden.
Für die Weiterverarbeitung der in der temporären Tabelle zwischengespeicherten Daten verwenden Sie dfTempTable
(das von cacheResult
zurückgegebene HasCachedResult
-Objekt).
Verwenden von Dateien in Stagingbereichen¶
Die Snowpark-Bibliothek bietet Klassen und Methoden, mit denen Sie Daten in Snowflake laden und Daten aus Snowflake entladen können, indem Sie Dateien in Stagingbereichen verwenden.
Bemerkung
Um diese Klassen und Methoden auf einem Stagingbereich anwenden zu können, müssen Sie über die erforderlichen Berechtigungen für die Verwendung des Stagingbereichs verfügen.
In den nächsten Abschnitten wird erläutert, wie diese Klassen und Methoden verwendet werden:
Verwenden von Eingabestreams zum Hoch- und Herunterladen von Daten eines Stagingbereichs
Einrichten eines DataFrame für Dateien in einem Stagingbereich
Speichern eines DataFrame in Dateien in einem Stagingbereich
Hochladen und Herunterladen von Dateien in Stagingbereichen¶
Zum Hoch- und Herunterladen von Dateien eines Stagingbereichs verwenden Sie die Methoden put
und get
des Objekts FileOperation:
Hochladen von Dateien in einen Stagingbereich¶
So laden Sie Dateien in einen Stagingbereich hoch:
Überprüfen Sie, ob Sie über die erforderlichen Berechtigungen zum Hochladen von Dateien in den Stagingbereich verfügen.
Verwenden Sie die Methode file des Objekts
Session
, um auf das Objekt FileOperation für die Sitzung zuzugreifen.Rufen Sie die Methode put des Objekts
FileOperation
auf, um die Dateien in einen Stagingbereich hochzuladen.Diese Methode führt den SQL-Befehl PUT aus.
Um alle optionalen Parameter für den PUT-Befehl anzugeben, erstellen Sie eine
Map
der Parameter und Werte und übergeben dieMap
alsoptions
-Argument. Beispiel:import java.util.HashMap; import java.util.Map; ... // Upload a file to a stage without compressing the file. Map<String, String> putOptions = new HashMap<>(); putOptions.put("AUTO_COMPRESS", "FALSE"); PutResult[] putResults = session.file().put("file:///tmp/myfile.csv", "@myStage", putOptions);
Im Argument
localFileName
können Sie Platzhalter (*
und?
) verwenden, um einen Satz von Dateien zu identifizieren, die hochgeladen werden sollen. Beispiel:// Upload the CSV files in /tmp with names that start with "file". // You can use the wildcard characters "*" and "?" to match multiple files. PutResult[] putResults = session.file().put("file:///tmp/file*.csv", "@myStage/prefix2")
Überprüfen Sie das
Array
von PutResult-Objekten, die von derput
-Methode zurückgegeben werden, um festzustellen, ob die Dateien erfolgreich hochgeladen wurden. Geben Sie zum Beispiel den Dateinamen und den Status der PUT-Operation für diese Datei aus:// Print the filename and the status of the PUT operation. for (PutResult result : putResults) { System.out.println(result.getSourceFileName() + ": " + result.getStatus()); }
Herunterladen von Dateien aus Stagingbereichen¶
So laden Sie Dateien aus einem Stagingbereich herunter:
Überprüfen Sie, ob Sie über die erforderlichen Berechtigungen zum Herunterladen von Dateien aus dem Stagingbereich verfügen.
Verwenden Sie die Methode file des Objekts
Session
, um auf das Objekt FileOperation für die Sitzung zuzugreifen.Rufen Sie die Methode get des Objekts
FileOperation
auf, um die Dateien aus einem Stagingbereich herunterzuladen.Diese Methode führt den SQL-Befehl GET aus.
Um alle optionalen Parameter für den GET-Befehl anzugeben, erstellen Sie eine
Map
der Parameter und Werte und übergeben dieMap
alsoptions
-Argument. Beispiel:import java.util.HashMap; import java.util.Map; ... // Upload a file to a stage without compressing the file. // Download files with names that match a regular expression pattern. Map<String, String> getOptions = new HashMap<>(); getOptions.put("PATTERN", "'.*file_.*.csv.gz'"); GetResult[] getResults = session.file().get("@myStage", "file:///tmp", getOptions);
Überprüfen Sie das
Array
von GetResult-Objekten, die von derget
-Methode zurückgegeben werden, um festzustellen, ob die Dateien erfolgreich heruntergeladen wurden. Geben Sie zum Beispiel den Dateinamen und den Status der GET-Operation für diese Datei aus:// Print the filename and the status of the GET operation. for (GetResult result : getResults) { System.out.println(result.getFileName() + ": " + result.getStatus()); }
Verwenden von Eingabestreams zum Hoch- und Herunterladen von Daten eines Stagingbereichs¶
Bemerkung
Dieses Feature wurde in Snowpark 1.4.0 eingeführt.
Um Eingabestreams zum Hochladen von Daten in eine Stagingdatei und zum Herunterladen von Daten aus einer Stagingdatei zu verwenden, verwenden Sie die Methoden uploadStream
und downloadStream
des Objekts FileOperation:
Verwenden eines Eingabestreams zum Hochladen von Daten in eine Datei in einem Stagingbereich
Verwenden eines Eingabestreams zum Herunterladen von Daten aus einer Datei in einem Stagingbereich
Verwenden eines Eingabestreams zum Hochladen von Daten in eine Datei in einem Stagingbereich¶
So laden Sie die Daten aus einem java.io.InputStream-Objekt in eine Datei in einem Stagingbereich hoch:
Überprüfen Sie, ob Sie über die erforderlichen Berechtigungen zum Hochladen von Dateien in den Stagingbereich verfügen.
Verwenden Sie die Methode file des Objekts
Session
, um auf das Objekt FileOperation für die Sitzung zuzugreifen.Rufen Sie die Methode uploadStream des Objekts
FileOperation
auf.Übergeben Sie den vollständigen Pfad zu der Datei im Stagingbereich, in die die Daten geschrieben werden sollen, sowie das Objekt
InputStream
. Verwenden Sie außerdem das Argumentcompress
, um anzugeben, ob die Daten vor dem Hochladen komprimiert werden sollen oder nicht.
Beispiel:
import java.io.InputStream;
...
boolean compressData = true;
String pathToFileOnStage = "@myStage/path/file";
session.file().uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData);
Verwenden eines Eingabestreams zum Herunterladen von Daten aus einer Datei in einem Stagingbereich¶
So laden Sie die Daten aus einer Datei in einem Stagingbereich in ein java.io.InputStream-Objekt herunter:
Überprüfen Sie, ob Sie über die erforderlichen Berechtigungen zum Herunterladen von Dateien aus dem Stagingbereich verfügen.
Verwenden Sie die Methode file des Objekts
Session
, um auf das Objekt FileOperation für die Sitzung zuzugreifen.Rufen Sie die Methode downloadStream des Objekts
FileOperation
auf.Übergeben Sie den vollständigen Pfad zu der Datei im Stagingbereich, die die herunterzuladenden Daten enthält. Verwenden Sie das Argument
decompress
, um anzugeben, ob die Daten in der Datei komprimiert sind oder nicht.
Beispiel:
import java.io.InputStream;
...
boolean isDataCompressed = true;
String pathToFileOnStage = "@myStage/path/file";
InputStream is = session.file().downloadStream(pathToFileOnStage, isDataCompressed);
Einrichten eines DataFrame für Dateien in einem Stagingbereich¶
In diesem Abschnitt wird erläutert, wie Sie ein DataFrame-Objekt für Dateien einrichten, die sich in einem Snowflake-Stagingbereich befindet. Sobald Sie den DataFrame erstellt haben, können Sie den DataFrame für Folgendes verwenden:
Um einen DataFrame für eine Datei einzurichten, die sich ein einem Snowflake-Stagingbereich befindet, verwenden Sie die Klasse DataFrameReader
:
Überprüfen Sie, ob Sie über die folgenden Berechtigungen verfügen:
Berechtigungen für den Zugriff auf Dateien in dem Stagingbereich.
Eine der folgenden Optionen:
CREATE TABLE-Berechtigungen für das Schema, wenn Sie Kopieroptionen angeben möchten, die bestimmen, wie die Daten aus den Stagingdateien kopiert werden.
Andernfalls CREATE FILE FORMAT-Berechtigungen für das Schema.
Rufen Sie die Methode
read
derSession
-Klasse auf, um auf einDataFrameReader
-Objekt zuzugreifen.Wenn die Dateien im CSV-Format sind, beschreiben Sie die Felder in der Datei. Gehen Sie dabei wie folgt vor:
Erstellen Sie ein StructType-Objekt, das aus einem Array von StructField-Objekten besteht, die die Felder in der Datei beschreiben.
Geben Sie für jedes
StructField
-Objekt Folgendes an:Name des Feldes
Datentyp des Feldes (angegeben als Objekt im
com.snowflake.snowpark_java.types
-Paket)Ob das Feld nullwertfähig ist oder nicht
Beispiel:
import com.snowflake.snowpark_java.types.*; ... StructType schemaForDataFile = StructType.create( new StructField("id", DataTypes.StringType, true), new StructField("name", DataTypes.StringType, true));
Rufen Sie die Methode
schema
im ObjektDataFrameReader
-Objekt auf, und übergeben Sie dabei dasStructType
-Objekt.Beispiel:
DataFrameReader dfReader = session.read().schema(schemaForDataFile);
Die Methode
schema
gibt einDataFrameReader
-Objekt zurück, das so konfiguriert ist, dass es Dateien liest, die die angegebenen Felder enthalten.Beachten Sie, dass Sie dies für Dateien in anderen Formaten (z. B. JSON) nicht tun müssen. Für solche Dateien behandelt der
DataFrameReader
die Daten als ein einzelnes Feld vom Typ VARIANT mit dem Feldnamen$1
.
Wenn Sie zusätzliche Informationen darüber angeben müssen, wie die Daten gelesen werden sollen (z. B. dass die Daten komprimiert sind oder dass eine CSV-Datei ein Semikolon statt eines Kommas zum Begrenzen von Feldern verwendet), rufen Sie die Methode DataFrameReader.option oder DataFrameReader.options auf.
Geben Sie den Namen und den Wert der Option ein, die Sie einstellen möchten. Sie können die folgenden Typen von Optionen einstellen:
Dateiformatoptionen, die in der CREATE FILE FORMAT-Dokumentation beschrieben sind
Kopieroptionen, die in der COPY INTO TABLE-Dokumentation beschrieben sind
Beachten Sie, dass die Einstellung von Kopieroptionen zu einer teureren Ausführungsstrategie führen kann, wenn Sie die Daten in den DataFrame abrufen.
Im folgenden Beispiel wird das
DataFrameReader
-Objekt für die Abfrage von Daten einer CSV-Datei eingerichtet, die nicht komprimiert ist und die ein Semikolon als Feldbegrenzer verwendet.dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE");
Die Methode
option
gibt einDataFrameReader
-Objekt zurück, das mit den angegebenen Optionen konfiguriert ist.Um mehrere Optionen festzulegen, können Sie entweder verkettete Aufrufe mit der
option
-Methode verwenden (wie im obigen Beispiel gezeigt) oder die DataFrameReader.options-Methode aufrufen, wobei Sie einMap
-Objekt der Namen und Werte der Optionen übergeben.Rufen Sie die Methode auf, die dem Format der Dateien entspricht: Sie können eine der folgenden Methoden aufrufen:
Beim Aufrufen dieser Methoden übergeben Sie den Speicherort des Stagingbereichs der zu lesenden Dateien. Beispiel:
DataFrame df = dfReader.csv("@mystage/myfile.csv");
Wenn Sie mehrere Dateien angeben möchten, die mit demselben Präfix beginnen, geben Sie das Präfix nach dem Namen des Stagingbereichs an. So laden Sie beispielsweise Dateien mit dem Präfix
csv_
aus dem Stagingbereich@mystage
:DataFrame df = dfReader.csv("@mystage/csv_");
Die Methoden, die dem Format einer Datei entsprechen, geben ein CopyableDataFrame-Objekt für diese Datei zurück.
CopyableDataFrame
erweitertDataFrame
und bietet zusätzliche Methoden für die Verarbeitung der Daten in den Stagingdateien.Rufen Sie eine Aktionsmethode auf, um Folgendes auszuführen:
Wie bei DataFrames für Tabellen werden die Daten erst dann in den DataFrame abgerufen, wenn Sie eine Aktionsmethode aufrufen.
Laden von Daten aus Dateien in einen DataFrame¶
Nachdem Sie einen DataFrame für Dateien in einem Stagingbereich eingerichtet haben, können Sie Daten aus den Dateien in den DataFrame laden:
Verwenden Sie die DataFrame-Objektmethoden, um alle Transformationen auszuführen, die für das Dataset benötigt werden (Auswahl bestimmter Felder, Filtern von Zeilen usw.).
Im folgenden Beispiel wird das Element
color
aus einer JSON-Datei namensdata.json
extrahiert, die sich im Stagingbereichmystage
befindet:DataFrame df = session.read().json("@mystage/data.json").select(Functions.col("$1").subField("color"));
Wie bereits erläutert, werden Daten in Dateien, die nicht im CSV-Format sind (z. B. JSON), von
DataFrameReader
als eine einzelne VARIANT-Spalte mit dem Namen$1
behandelt.Rufen Sie die Methode
DataFrame.collect
auf, um die Daten zu laden. Beispiel:Row[] results = df.collect();
Kopieren von Daten aus Dateien in eine Tabelle¶
Nachdem das Einrichten eines DataFrame für Dateien in einem Stagingbereich abgeschlossen ist, können Sie die Methode copyInto aufrufen, um die Daten in eine Tabelle zu kopieren. Diese Methode führt den Befehl COPY INTO <Tabelle> aus.
Bemerkung
Sie dürfen die collect
-Methode erst aufrufen, nachdem Sie copyInto
aufgerufen haben. Die Daten aus der Datei dürfen sich erst im DataFrame befinden, nachdem Sie copyInto
aufgerufen haben.
Mit dem folgenden Code werden beispielsweise Daten aus der durch myFileStage
angegebenen CSV-Datei in die Tabelle mytable
geladen. Da sich die Daten in einer CSV-Datei befinden, muss der Code auch die Felder in der Datei beschreiben. Im Beispiel wird dazu die Methode schema des DataFrameReader
-Objekts aufgerufen, und es wird ein StructType-Objekt (schemaForDataFile
) mit einem Array von StructField-Objekten übergeben, die die Felder beschreiben.
CopyableDataFrame copyableDf = session.read().schema(schemaForDataFile).csv("@mystage/myfile.csv");
copyableDf.copyInto("mytable");
Speichern eines DataFrame in Dateien in einem Stagingbereich¶
Bemerkung
Dieses Feature wurde in Snowpark 1.5.0 eingeführt.
Wenn Sie einen DataFrame in Dateien in einem Stagingbereich speichern müssen, können Sie die DataFrameWriter-Methode aufrufen, die dem Format der Datei entspricht (z. B. die csv
-Methode, um in eine CSV-Datei zu schreiben), und dabei den Speichertort im Stagingbereich angeben, an dem die Dateien gespeichert werden sollen. Diese DataFrameWriter
-Methoden führen den Befehl COPY INTO <Speicherort> aus.
Bemerkung
Sie dürfen die collect
-Methode erst aufrufen, nachdem Sie diese DataFrameWriter
-Methoden aufgerufen haben. Die Daten aus der Datei dürfen sich erst im DataFrame befinden, nachdem Sie diese Methoden aufgerufen haben.
So speichern Sie den Inhalt eines DataFrame in Dateien in einem Stagingbereich:
Rufen Sie die write-Methode des DataFrame auf, um ein DataFrameWriter-Objekt zu erhalten. So erhalten Sie beispielsweise das
DataFrameWriter
-Objekt für einen DataFrame, der die Tabelle mit dem Namensample_product_data
repräsentiert:DataFrameWriter dfWriter = session.table("sample_product_data").write();
Wenn Sie den Inhalt der Datei überschreiben möchten (falls die Datei existiert), rufen Sie die Methode mode des Objekts
DataFrameWriter
auf und übergebenSaveMode.Overwrite
.Andernfalls meldet der
DataFrameWriter
-Objekt standardmäßig einen Fehler, wenn die angegebene Datei im Stagingbereich bereits vorhanden ist.Diese
mode
-Methode gibt das gleicheDataFrameWriter
-Objekt zurück, das mit dem angegebenen Modus konfiguriert wurde.So geben Sie beispielsweise an, dass
DataFrameWriter
die im Stagingbereich befindliche Datei überschreiben soll:dfWriter = dfWriter.mode(SaveMode.Overwrite);
Wenn Sie zusätzliche Informationen darüber angeben müssen, wie die Daten gespeichert werden sollen (z. B. dass die Daten komprimiert werden sollen oder dass Sie ein Semikolon zur Abgrenzung von Feldern in einer CSV-Datei verwenden möchten), dann rufen Sie die Methode DataFrameWriter.option oder DataFrameWriter.options auf.
Geben Sie den Namen und den Wert der Option ein, die Sie einstellen möchten. Sie können die folgenden Typen von Optionen einstellen:
Dateiformatoptionen, die in der Dokumentation zu COPY INTO <Speicherort> beschrieben sind.
Kopieroptionen, die in der Dokumentation zu COPY INTO <Speicherort> beschrieben sind.
Beachten Sie, dass Sie die Methode
option
nicht zum Einstellen der folgenden Optionen verwenden können:Die Option für den Formattyp TYPE.
Die Kopieroption OVERWRITE. Um diese Option zu setzen, rufen Sie stattdessen die Methode
mode
auf (wie im vorherigen Schritt erwähnt).
Im folgenden Beispiel wird das
DataFrameWriter
-Objekt so eingerichtet, dass es Daten in einer CSV-Datei in unkomprimierter Form speichert, wobei ein Semikolon (statt eines Kommas) als Trennzeichen verwendet wird.dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE");
Die Methode
option
gibt einDataFrameWriter
-Objekt zurück, das mit den angegebenen Optionen konfiguriert ist.Um mehrere Optionen zu setzen, können Sie verkettete Aufrufe mit der Methode
option
verwenden (wie im obigen Beispiel gezeigt) oder die Methode DataFrameWriter.options aufrufen und dabei einMap
-Objekt der Namen und Werte der Optionen übergeben.Um Details über jede gespeicherte Datei zu erhalten, setzen Sie die Kopieroption
DETAILED_OUTPUT
aufTRUE
.Standardmäßig hat
DETAILED_OUTPUT
den WertFALSE
, was bedeutet, dass die Methode eine einzelne Ausgabezeile mit den Feldern"rows_unloaded"
,"input_bytes"
und"output_bytes"
zurückgibt.Wenn Sie
DETAILED_OUTPUT
aufTRUE
setzen, gibt die Methode für jede gespeicherte Datei eine Zeile der Ausgabe zurück. Jede Zeile enthält die FelderFILE_NAME
,FILE_SIZE
undROW_COUNT
.Rufen Sie die Methode auf, die dem Format der Datei entspricht, um die Daten in der Datei zu speichern. Sie können eine der folgenden Methoden aufrufen:
Beim Aufrufen dieser Methoden übergeben Sie den Speicherort des Stagingbereichs der Datei, in die die Daten geschrieben werden sollen (z. B.
@mystage
).Standardmäßig speichert die Methode die Daten in Dateinamen mit dem Präfix
data_
(z. B.@mystage/data_0_0_0.csv
). Wenn Sie möchten, dass die Dateien mit einem anderen Präfix benannt werden, geben Sie das Präfix nach dem Stagingbereichsnamen an. Beispiel:WriteFileResult writeFileResult = dfWriter.csv("@mystage/saved_data");
In diesem Beispiel wird der Inhalt des DataFrame-Objekts in Dateien gespeichert, die mit dem Präfix
saved_data
beginnen (z. B.@mystage/saved_data_0_0_0.csv
).Überprüfen Sie das zurückgegebene WriteFileResult-Objekt auf Informationen zu der in die Datei geschriebene Datenmenge.
Über das
WriteFileResult
-Objekt können Sie auf die vom Befehl COPY INTO <Speicherort> erzeugte Ausgabe zugreifen:Um auf die Zeilen der Ausgabe als Array von Row-Objekten zuzugreifen, rufen Sie die Methode
getRows
auf.Um festzustellen, welche Felder in den Zeilen vorhanden sind, rufen Sie die Methode
getSchema
auf, die ein StructType-Objekt zurückgibt, das die Felder in der Zeile beschreibt.
So drucken Sie beispielsweise die Namen der Felder und Werte in den Ausgabezeilen aus:
WriteFileResult writeFileResult = dfWriter.csv("@mystage/saved_data"); Row[] rows = writeFileResult.getRows(); StructType schema = writeFileResult.getSchema(); for (int i = 0 ; i < rows.length ; i++) { System.out.println("Row:" + i); Row row = rows[i]; for (int j = 0; j < schema.size(); j++) { System.out.println(schema.get(j).name() + ": " + row.get(j)); } }
Im folgende Beispiel wird ein DataFrame-Objekt verwendet, um den Inhalt der Tabelle mit dem Namen car_sales
in JSON-Dateien mit dem Präfix saved_data
im Stagingbereich @mystage
(z. B. @mystage/saved_data_0_0_0.json
) zu speichern. Hier der Beispielcode:
Überschreibt die Datei, wenn die Datei bereits im Stagingbereich vorhanden ist.
Gibt eine detaillierte Ausgabe zur Speicheroperation zurück.
Speichert die Daten unkomprimiert.
Abschließend druckt der Beispielcode jedes Feld und jeden Wert in den zurückgegebenen Ausgabezeilen aus:
DataFrame df = session.table("car_sales");
WriteFileResult writeFileResult = df.write().mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data");
Row[] rows = writeFileResult.getRows();
StructType schema = writeFileResult.getSchema();
for (int i = 0 ; i < rows.length ; i++) {
System.out.println("Row:" + i);
Row row = rows[i];
for (int j = 0; j < schema.size(); j++) {
System.out.println(schema.get(j).name() + ": " + row.get(j));
}
}
Verwenden von semistrukturierten Daten¶
Mit einem DataFrame können Sie semistrukturierte Daten abfragen und darauf zugreifen (z. B. JSON-Daten). In den nächsten Abschnitten wird erläutert, wie semistrukturierte Daten in einem DataFrame verwendet werden:
Bemerkung
Die Beispiele in diesen Abschnitten verwenden die Beispieldaten aus In Beispielen verwendete Beispieldaten.
Durchsuchen semistrukturierter Daten¶
Um auf ein bestimmtes Feld oder Element in semistrukturierten Daten zu verweisen, verwenden Sie die folgenden Methoden des Column-Objekts:
Verwenden Sie subField(„<Feldname>“), um ein
Column
-Objekt für ein Feld in einem OBJECT (oder ein VARIANT, das ein OBJECT enthält) zurückzugeben.Verwenden Sie subField(<Index>), um ein
Column
-Objekt für ein Element in einem ARRAY (oder ein VARIANT, das ein ARRAY enthält) zurückzugeben.
Bemerkung
Wenn der Feldname oder die Elemente im Pfad unregelmäßig sind und die Verwendung der Column.apply
-Methoden erschweren, können Sie als Alternative Functions.get, Functions.get_ignore_case oder Functions.get_path verwenden.
Im folgenden Code wird beispielsweise das Feld dealership
in Objekten der Spalte src
der Beispieldaten ausgewählt:
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("dealership")).show();
Der Code gibt Folgendes aus:
----------------------------
|"""SRC""['DEALERSHIP']" |
----------------------------
|"Valley View Auto Sales" |
|"Tindel Toyota" |
----------------------------
Bemerkung
Die Werte in DataFrame sind von doppelten Anführungszeichen umgeben, da diese Werte als Zeichenfolgenliterale zurückgegeben werden. Weitere Informationen zum Umwandeln dieser Werte in einen bestimmten Typ finden Sie unter Explizites Umwandeln von Werten semistrukturierter Daten.
Sie können auch Methodenaufrufe verketten, um einen Pfad zu einem bestimmten Feld oder Element zu durchlaufen.
Im folgende Code wird zum Beispiel das Feld name
im Objekt salesperson
ausgewählt:
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("salesperson").subField("name")).show();
Der Code gibt Folgendes aus:
------------------------------------
|"""SRC""['SALESPERSON']['NAME']" |
------------------------------------
|"Frank Beasley" |
|"Greg Northrup" |
------------------------------------
In einem weiteren Codebeispiel wird das erste Element des Feldes vehicle
ausgewählt, das eine Reihe von Fahrzeugen enthält. Im Beispiel wird auch das Feld price
aus dem ersten Element ausgewählt.
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("vehicle").subField(0)).show();
df.select(Functions.col("src").subField("vehicle").subField(0).subField("price")).show();
Der Code gibt Folgendes aus:
---------------------------
|"""SRC""['VEHICLE'][0]" |
---------------------------
|{ |
| "extras": [ |
| "ext warranty", |
| "paint protection" |
| ], |
| "make": "Honda", |
| "model": "Civic", |
| "price": "20275", |
| "year": "2017" |
|} |
|{ |
| "extras": [ |
| "ext warranty", |
| "rust proofing", |
| "fabric protection" |
| ], |
| "make": "Toyota", |
| "model": "Camry", |
| "price": "23500", |
| "year": "2017" |
|} |
---------------------------
------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']" |
------------------------------------
|"20275" |
|"23500" |
------------------------------------
Als Alternative zur Methode apply
können Sie die Funktionen get, get_ignore_case oder get_path verwenden, wenn der Feldname oder die Elemente im Pfad unregelmäßig sind und die Verwendung der Column.subField
-Methoden erschweren.
Die folgenden Codezeilen geben zum Beispiel beide den Wert eines bestimmten Feldes in einem Objekt aus:
df.select(Functions.get(Functions.col("src"), Functions.lit("dealership"))).show();
df.select(Functions.col("src").subField("dealership")).show();
In ähnlicher Weise geben die folgenden Codezeilen beide den Wert eines Feldes für einen bestimmten Pfad in einem Objekt aus:
df.select(Functions.get_path(Functions.col("src"), Functions.lit("vehicle[0].make"))).show();
df.select(Functions.col("src").subField("vehicle").subField(0).subField("make")).show();
Explizites Umwandeln von Werten semistrukturierter Daten¶
Standardmäßig werden die Werte von Feldern und Elementen als Zeichenfolgenliterale (einschließlich der doppelten Anführungszeichen) zurückgegeben, wie in den obigen Beispielen gezeigt.
Um unerwartete Ergebnisse zu vermeiden, rufen Sie die Methode cast auf, um den Wert in einen ganz bestimmten Typ umzuwandeln. Mit dem folgenden Code werden zum Beispiel die Werte ohne und mit Umwandlung ausgegeben:
// Import the objects for the data types, including StringType.
import com.snowflake.snowpark_java.types.*;
...
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("salesperson").subField("id")).show();
df.select(Functions.col("src").subField("salesperson").subField("id").cast(DataTypes.StringType)).show();
Der Code gibt Folgendes aus:
----------------------------------
|"""SRC""['SALESPERSON']['ID']" |
----------------------------------
|"55" |
|"274" |
----------------------------------
---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)" |
---------------------------------------------------
|55 |
|274 |
---------------------------------------------------
Vereinfachen eines Arrays von Objekten zu Zeilen¶
Wenn Sie semistrukturierte Daten für ein DataFrame vereinfachen müssen (z. B. eine Zeile für jedes Objekt in einem Array erstellen), rufen Sie die Methode flatten auf. Diese Methode entspricht der SQL-Funktion FLATTEN. Wenn Sie einen Pfad zu einem Objekt oder Array übergeben, gibt die Methode ein DataFrame zurück, das eine Zeile für jedes Feld oder Element im Objekt oder Array enthält.
In den Beispieldaten ist beispielsweise src:customer
ein Array von Objekten, die Informationen über einen Kunden enthalten. Jedes Objekt enthält ein Feld name
und ein Feld address
.
Wenn Sie diesen Pfad an die Funktion flatten
übergeben:
DataFrame df = session.table("car_sales");
df.flatten(Functions.col("src").subField("customer")).show();
Dann gibt diese Methode einen DataFrame zurück:
----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC" |"SEQ" |"KEY" |"PATH" |"INDEX" |"VALUE" |"THIS" |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{ |1 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "San Francisco, CA", | { |
| { | | | | | "name": "Joyce Ridgely", | "address": "San Francisco, CA", |
| "address": "San Francisco, CA", | | | | | "phone": "16504378889" | "name": "Joyce Ridgely", |
| "name": "Joyce Ridgely", | | | | |} | "phone": "16504378889" |
| "phone": "16504378889" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Valley View Auto Sales", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "55", | | | | | | |
| "name": "Frank Beasley" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "paint protection" | | | | | | |
| ], | | | | | | |
| "make": "Honda", | | | | | | |
| "model": "Civic", | | | | | | |
| "price": "20275", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
|{ |2 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "New York, NY", | { |
| { | | | | | "name": "Bradley Greenbloom", | "address": "New York, NY", |
| "address": "New York, NY", | | | | | "phone": "12127593751" | "name": "Bradley Greenbloom", |
| "name": "Bradley Greenbloom", | | | | |} | "phone": "12127593751" |
| "phone": "12127593751" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Tindel Toyota", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "274", | | | | | | |
| "name": "Greg Northrup" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "rust proofing", | | | | | | |
| "fabric protection" | | | | | | |
| ], | | | | | | |
| "make": "Toyota", | | | | | | |
| "model": "Camry", | | | | | | |
| "price": "23500", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
----------------------------------------------------------------------------------------------------------------------------------------------------------
Von diesem DataFrame aus können Sie die Felder name
und address
jedes Objekts im Feld VALUE
auswählen:
df.flatten(Functions.col("src").subField("customer")).select(Functions.col("value").subField("name"), Functions.col("value").subField("address")).show();
-------------------------------------------------
|"""VALUE""['NAME']" |"""VALUE""['ADDRESS']" |
-------------------------------------------------
|"Joyce Ridgely" |"San Francisco, CA" |
|"Bradley Greenbloom" |"New York, NY" |
-------------------------------------------------
Der folgende Code ergänzt das vorherige Beispiel, indem die Werte in einen bestimmten Typ umgewandelt und die Namen der Spalten geändert werden:
df.flatten(Functions.col("src").subField("customer")).select(Functions.col("value").subField("name").cast(DataTypes.StringType).as("Customer Name"), Functions.col("value").subField("address").cast(DataTypes.StringType).as("Customer Address")).show();
-------------------------------------------
|"Customer Name" |"Customer Address" |
-------------------------------------------
|Joyce Ridgely |San Francisco, CA |
|Bradley Greenbloom |New York, NY |
-------------------------------------------
Ausführen von SQL-Anweisungen¶
Um eine von Ihnen angegebene SQL-Anweisung auszuführen, rufen Sie die Methode sql
in der Klasse Session
auf und übergeben die auszuführende Anweisung. Diese Methode gibt einen DataFrame zurück.
Beachten Sie, dass die SQL-Anweisung erst ausgeführt wird, wenn Sie eine Aktionsmethode aufrufen.
import java.util.Arrays;
// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
DataFrame dfStageFiles = session.sql("ls @myStage");
Row[] files = dfStageFiles.collect();
System.out.println(Arrays.toString(files));
// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect();
DataFrame tableDf = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
// Get the count of rows from the table.
long numRows = tableDf.count();
System.out.println("Count: " + numRows);
Wenn Sie Methoden zur Transformation des DataFrame aufrufen möchten (z. B. Filtern, Auswahl usw.), beachten Sie, dass diese Methoden nur funktionieren, wenn die zugrunde liegende SQL-Anweisung eine SELECT-Anweisung ist. Die Transformationsmethoden werden für andere Arten von SQL-Anweisungen nicht unterstützt.
import java.util.Arrays;
DataFrame df = session.sql("select id, category_id, name from sample_product_data where id > 10");
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
Row[] results = df.filter(Functions.col("category_id").lt(Functions.lit(10))).select(Functions.col("id")).collect();
System.out.println(Arrays.toString(results));
// In this example, the underlying SQL statement is not a SELECT statement.
DataFrame dfStageFiles = session.sql("ls @myStage");
// Calling the filter method results in an error.
dfStageFiles.filter(...);