Snowpark Javaでの DataFrames の操作¶
Snowpark内でデータをクエリして処理する主な方法は、 DataFrame を使用することです。このトピックでは、 DataFrames の操作方法について説明します。
このトピックの内容:
データを取得して操作するには、 DataFrame クラスを使用します。DataFrame は、遅延評価されるリレーショナルデータセットを表します。これは、特定のアクションがトリガーされたときにのみ実行されます。ある意味で、 DataFrame は、データを取得するために評価する必要があるクエリのようなものです。
データを DataFrame に取得するには、
DataFrame を作成し、データセットのためにデータのソースを指定 します。
たとえば、テーブル、外部 CSV ファイル、または SQL ステートメントの実行からのデータを保持する DataFrame を作成できます。
DataFrame のデータセットを変換する方法を指定 します。
たとえば、どの列を選択するか、行をどのようにフィルタリングするか、結果をどのようにソートおよびグループ化するかなどを指定できます。
ステートメントを実行して、データをDataFrame に取得 します。
データを DataFrame に取得するには、アクションを実行するメソッド(例:
collect()
メソッド)を呼び出す必要があります。
次のセクションでは、これらのステップについて詳しく説明します。
このセクションの例の設定¶
このセクションの例のいくつかは、 DataFrame を使用して sample_product_data
という名前のテーブルをクエリします。これらの例を実行する場合は、次の SQL ステートメントを実行することにより、このテーブルを作成し、テーブルにデータを入力できます。
CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT, amount NUMBER(12, 2), quantity INT, product_date DATE);
INSERT INTO sample_product_data VALUES
(1, 0, 5, 'Product 1', 'prod-1', 1, 10, 1.00, 15, TO_DATE('2021.01.01', 'YYYY.MM.DD')),
(2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20, 2.00, 30, TO_DATE('2021.02.01', 'YYYY.MM.DD')),
(3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30, 3.00, 45, TO_DATE('2021.03.01', 'YYYY.MM.DD')),
(4, 0, 10, 'Product 2', 'prod-2', 2, 40, 4.00, 60, TO_DATE('2021.04.01', 'YYYY.MM.DD')),
(5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50, 5.00, 75, TO_DATE('2021.05.01', 'YYYY.MM.DD')),
(6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60, 6.00, 90, TO_DATE('2021.06.01', 'YYYY.MM.DD')),
(7, 0, 20, 'Product 3', 'prod-3', 3, 70, 7.00, 105, TO_DATE('2021.07.01', 'YYYY.MM.DD')),
(8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80, 7.25, 120, TO_DATE('2021.08.01', 'YYYY.MM.DD')),
(9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90, 7.50, 135, TO_DATE('2021.09.01', 'YYYY.MM.DD')),
(10, 0, 50, 'Product 4', 'prod-4', 4, 100, 7.75, 150, TO_DATE('2021.10.01', 'YYYY.MM.DD')),
(11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100, 8.00, 165, TO_DATE('2021.11.01', 'YYYY.MM.DD')),
(12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100, 8.50, 180, TO_DATE('2021.12.01', 'YYYY.MM.DD'));
テーブルが作成されたことを確認するには、次のコマンドを実行します。
SELECT * FROM sample_product_data;
DataFrame の構築¶
DataFrame を構築するには、 Session
クラスのメソッドを使用できます。次の各メソッドは、異なるタイプのデータソースから DataFrame を作成します。
テーブル、ビュー、またはストリームのデータから DataFrame を作成するには、
table
メソッドを呼び出します。// Create a DataFrame from the data in the "sample_product_data" table. DataFrame dfTable = session.table("sample_product_data"); // Print out the first 10 rows. dfTable.show();
注釈
table
メソッドはUpdatable
オブジェクトを返します。Updatable
はDataFrame
を拡張し、テーブル内のデータを操作するための追加のメソッド(例: データを更新および削除するためのメソッド)を提供します。 テーブル内の行の更新、削除、およびマージ をご参照ください。指定された値から DataFrame を作成するには、
値を含む
Row
オブジェクトの配列を作成します。これらの値のデータ型を記述する
StructType
オブジェクトを作成します。createDataFrame
メソッドを呼び出し、配列とStructType
オブジェクトを渡します。
// Import name from the types package, which contains StructType and StructField. import com.snowflake.snowpark_java.types.*; ... // Create a DataFrame containing specified values. Row[] data = {Row.create(1, "a"), Row.create(2, "b")}; StructType schema = StructType.create( new StructField("num", DataTypes.IntegerType), new StructField("str", DataTypes.StringType)); DataFrame df = session.createDataFrame(data, schema); // Print the contents of the DataFrame. df.show();
注釈
Snowflakeによって予約された単語は、 DataFrame を構築するときの列名としては無効です。予約された単語のリストについては、 予約済みおよび限定キーワード をご参照ください。
値の範囲を含む DataFrame を作成するには、
range
メソッドを呼び出します。// Create a DataFrame from a range DataFrame dfRange = session.range(1, 10, 2); // Print the contents of the DataFrame. dfRange.show();
ステージにファイルの DataFrame を作成する には、
read
を呼び出してDataFrameReader
オブジェクトを取得します。DataFrameReader
オブジェクトで、ファイル内のデータの形式に対応するメソッドを呼び出します。// Create a DataFrame from data in a stage. DataFrame dfJson = session.read().json("@mystage2/data1.json"); // Print the contents of the DataFrame. dfJson.show();
SQL クエリの結果を保持する DataFrame を作成するには、
sql
メソッドを呼び出します。// Create a DataFrame from a SQL query DataFrame dfSql = session.sql("SELECT name from sample_product_data"); // Print the contents of the DataFrame. dfSql.show();
注: このメソッドを使用すると、テーブルおよびステージングされたファイルからデータを取得する SELECT ステートメントを実行できますが、そうではなく、
table
およびread
メソッドを使用するようにします。table
やread
のようなメソッドは、開発ツールでより優れた構文の強調表示、エラーの強調表示、およびインテリジェントなコード補完を提供できます。
データセットの変換方法の指定¶
選択する列と、結果のフィルタリング、並べ替え、グループ化などを指定するには、データセットを変換する DataFrame メソッドを呼び出します。これらのメソッドで列を識別するには、 Functions.col
静的メソッドまたは列に評価される式を使用します。(列と式の指定 を参照。)
例:
返される行を指定するには、
filter
メソッドを呼び出します。// Create a DataFrame for the rows with the ID 1 // in the "sample_product_data" table. DataFrame df = session.table("sample_product_data").filter( Functions.col("id").equal_to(Functions.lit(1))); df.show();
選択する列を指定するには、
select
メソッドを呼び出します。// Create a DataFrame that contains the id, name, and serial_number // columns in te "sample_product_data" table. DataFrame df = session.table("sample_product_data").select( Functions.col("id"), Functions.col("name"), Functions.col("serial_number")); df.show();
各メソッドは、変換された新しい DataFrame オブジェクトを返します。(このメソッドは元の DataFrame オブジェクトには影響を及ぼしません。)つまり、複数の変換を適用する場合は、 チェーンメソッド呼び出し を実行して、前のメソッド呼び出しによって返された、新しい DataFrame オブジェクトに対する後続の各変換メソッドを呼び出すことができます。
これらの変換方法は、Snowflakeデータベースからデータを取得しないことに注意してください。(DataFrame を評価するアクションの実行 で説明されているアクションメソッドは、データ取得を実行します。)変換メソッドは、 SQL ステートメントの作成方法を指定するだけです。
列と式の指定¶
これらの変換メソッドを呼び出すときは、列または列を使用する式を指定する必要がある場合があります。たとえば、 select
メソッドを呼び出すときは、選択する必要のある列を指定する必要があります。
列を参照するには、 Functions.col 静的メソッドを呼び出して、 Column オブジェクトを作成します。
DataFrame dfProductInfo = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
dfProductInfo.show();
注釈
リテラルの Column
オブジェクトを作成するには、 列オブジェクトとしてのリテラルの使用 をご参照ください。
フィルター、射影、結合条件などを指定する場合は、式で Column
オブジェクトを使用できます。例:
filter
メソッドでColumn
オブジェクトを使用して、フィルター条件を指定できます。// Specify the equivalent of "WHERE id = 12" // in an SQL SELECT statement. DataFrame df = session.table("sample_product_data"); df.filter(Functions.col("id").equal_to(Functions.lit(12))).show();
// Specify the equivalent of "WHERE key + category_id < 10" // in an SQL SELECT statement. DataFrame df2 = session.table("sample_product_data"); df2.filter(Functions.col("key").plus(Functions.col("category_id")).lt(Functions.lit(10))).show();
select
メソッドでColumn
オブジェクトを使用して、エイリアスを定義できます。// Specify the equivalent of "SELECT key * 10 AS c" // in an SQL SELECT statement. DataFrame df3 = session.table("sample_product_data"); df3.select(Functions.col("key").multiply(Functions.lit(10)).as("c")).show();
join
メソッドでColumn
オブジェクトを使用して、結合条件を定義できます。// Specify the equivalent of "sample_a JOIN sample_b on sample_a.id_a = sample_b.id_a" // in an SQL SELECT statement. DataFrame dfLhs = session.table("sample_a"); DataFrame dfRhs = session.table("sample_b"); DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a"))); dfJoined.show();
異なる DataFrames の列の参照¶
同じ名前の2つの異なる DataFrame オブジェクトの列を参照する場合(例: その列の DataFrames を結合する場合)、各 DataFrame オブジェクトにある col
メソッドを使用して、そのオブジェクトにある列を参照できます(例: df1.col("name")
と df2.col("name")
)。
次の例は、 col
メソッドを使用して特定の DataFrame の列を参照する方法を示しています。この例では、両方とも value
という名前の列を持つ2つの DataFrame オブジェクトを結合します。この例では、 Column
オブジェクトの as
メソッドを使用して、新しく作成された DataFrame の列の名前を変更します。
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a"))).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"));
dfJoined.show();
オブジェクト識別子(テーブル名、列名など)の前後での二重引用符の使用¶
指定するデータベース、スキーマ、テーブル、およびステージの名前は、 Snowflake識別子の要件 に準拠している必要があります。名前を指定すると、Snowflakeはその名前を大文字と見なします。たとえば、次の呼び出しは同等です。
// The following calls are equivalent:
df.select(Functions.col("id123"));
df.select(Functions.col("ID123"));
名前が識別子の要件に準拠していない場合は、名前を二重引用符("
)で囲む必要があります。バックスラッシュ(\
)を使用して、Scala文字列リテラル内の二重引用符をエスケープします。たとえば、次のテーブル名は文字やアンダースコアで始まらないため、名前を二重引用符で囲む必要があります。
DataFrame df = session.table("\"10tablename\"");
列 の名前を指定するときは、名前を二重引用符で囲む必要はありません。名前が識別子の要件に準拠していない場合、Snowparkライブラリは、列名を自動的に二重引用符で囲みます。
// The following calls are equivalent:
df.select(Functions.col("3rdID"));
df.select(Functions.col("\"3rdID\""));
// The following calls are equivalent:
df.select(Functions.col("id with space"));
df.select(Functions.col("\"id with space\""));
すでに列名を二重引用符で囲んでいる場合、ライブラリは名前を二重引用符で囲みません。
場合によっては、列名に二重引用符が含まれることがあります。
describe table quoted;
+------------------------+ ...
| name | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted" | ...
+------------------------+ ...
識別子の要件 で説明されているように、二重引用符で囲まれた識別子内の二重引用符文字ごとに、2つの二重引用符文字(例: "name_with_""air""_quotes"
と """column_name_quoted"""
)を使用する必要があります。
DataFrame dfTable = session.table("quoted");
dfTable.select("\"name_with_\"\"air\"\"_quotes\"");
dfTable.select("\"\"\"column_name_quoted\"\"\"");
識別子が二重引用符で囲まれている場合は(明示的に引用符を追加したか、ライブラリが引用符を追加したかに関係なく)、 Snowflakeは識別子を大文字と小文字を区別する ものとして扱います。
// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(Functions.col("id with space"));
df.select(Functions.col("ID WITH SPACE"));
列オブジェクトとしてのリテラルの使用¶
Column
オブジェクト内で渡すメソッドでリテラルを使用するには、リテラルを Functions
クラスにある lit
静的メソッドに渡して、 Column
オブジェクトを作成します。例:
// Show the first 10 rows in which category_id is greater than 5.
// Use `Functions.lit(5)` to create a Column object for the literal 5.
DataFrame df = session.table("sample_product_data");
df.filter(Functions.col("category_id").gt(Functions.lit(5))).show();
リテラルがJavaの浮動小数点または倍精度値である場合(例: 0.05
は デフォルトでDoubleとして扱う)、Snowparkライブラリは SQL を生成し、対応するSnowparkデータ型に値を暗黙的にキャストします(例: 0.05::DOUBLE
)。これにより、指定された正確な数とは異なる概算値が生成される可能性があります。
たとえば、次のコードは、フィルター(0.05
以上の値に一致する)が DataFrame の行に一致する必要がある場合でも、一致する行を表示しません。
// Create a DataFrame that contains the value 0.05.
DataFrame df = session.sql("select 0.05 :: Numeric(5, 2) as a");
// Applying this filter results in no matching rows in the DataFrame.
df.filter(Functions.col("a").leq(Functions.lit(0.06).minus(Functions.lit(0.01)))).show();
問題は、 Functions.lit(0.06)
と Functions.lit(0.01)
が、正確な値ではなく、 0.06
と 0.01
の概算値を生成することです。
この問題を回避するには、使用する Snowparkの型にリテラルをキャスト します。たとえば、精度が5でスケールが2の NUMBER を使用するには、次のようにします。
import com.snowflake.snowpark_java.types.*;
...
df.filter(Functions.col("a").leq(Functions.lit(0.06).cast(DataTypes.createDecimalType(5, 2)).minus(Functions.lit(0.01).cast(DataTypes.createDecimalType(5, 2))))).show();
列オブジェクトの特定型のキャスト¶
Column
オブジェクトを特定の型にキャストするには、 cast メソッドを呼び出し、 com.snowflake.snowpark_java.typesパッケージ から型オブジェクトを渡します。たとえば、リテラルを5の精度と2のスケールで NUMBER としてキャストするには、次のようにします。
// Import for the DecimalType class..
import com.snowflake.snowpark_java.types.*;
Column decimalValue = Functions.lit(0.05).cast(DataTypes.createDecimalType(5,2));
メソッド呼び出しのチェーン¶
DataFrame オブジェクトを変換するメソッド ごとに、変換が適用された新しい DataFrame オブジェクトが返されるため、 メソッド呼び出しのチェーン により、追加の方法で変換される新しい DataFrame を生成できます。
次の例では、次のように構成された DataFrame を返します。
sample_product_data
テーブルをクエリします。id = 1
で行を返します。name
およびserial_number
列を選択します。
DataFrame dfProductInfo = session.table("sample_product_data").filter(Functions.col("id").equal_to(Functions.lit(1))).select(Functions.col("name"), Functions.col("serial_number"));
dfProductInfo.show();
この例では、
session.table("sample_product_data")
は、sample_product_data
テーブルの DataFrame を返します。DataFrame にはまだテーブルのデータが含まれていませんが、オブジェクトにはテーブルの列の定義が含まれています。
filter(Functions.col("id").equal_to(Functions.lit(1)))
は、id = 1
で行を返すように設定されたsample_product_data
テーブルの DataFrame を返します。DataFrame には、テーブルからの一致する行がまだ含まれていないことに改めて注意してください。 アクションメソッドを呼び出す まで、一致する行は取得されません。
select(Functions.col("name"), Functions.col("serial_number"))
は、id = 1
を持つsample_product_data
テーブルの行のname
列とserial_number
列を含む DataFrame を返します。
メソッド呼び出しをチェーンするときは、呼び出しの順序が重要であることに注意してください。各メソッド呼び出しは、変換された DataFrame を返します。後続の呼び出しが変換された DataFrame で機能することを確認します。
たとえば、次のコードでは、 select
メソッドは name
と serial_number
の2つの列のみを含む DataFrame を返します。この DataFrame の filter
メソッド呼び出しは、変換された DataFrame にない id
列を使用しているため失敗します。
// This fails with the error "invalid identifier 'ID'."
DataFrame dfProductInfo = session.table("sample_product_data").select(Functions.col("name"), Functions.col("serial_number")).filter(Functions.col("id").equal_to(Functions.lit(1)));
dfProductInfo.show();
対照的に、次のコードは、 sample_product_data
テーブルのすべての列(id
列を含む)を含む DataFrame で filter()
メソッドが呼び出されるため、正常に実行されます。
// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
DataFrame dfProductInfo = session.table("sample_product_data").filter(Functions.col("id").equal_to(Functions.lit(1))).select(Functions.col("name"), Functions.col("serial_number"));
dfProductInfo.show();
SQL ステートメントで同等のキーワード(SELECT および WHERE)を使用する場合とは異なる順序で、 select
および filter
メソッドの呼び出しが必要になる場合があることに注意してください。
DataFrame の行数制限¶
DataFrame の行数を制限するには、 limit 変換メソッドを使用できます。
Snowpark API は、限られた数の行を取得して出力するためのアクションメソッドも提供します。
これらのメソッドは、実行される SQL ステートメントに LIMIT 句を効果的に追加します。
LIMIT の使用上の注意 で説明されているように、 LIMIT と組み合わせて並べ替え順序(ORDER BY)を指定する場合を除き、結果は非決定的です。
ORDER BY 句を LIMIT 句と一緒に保持するには(例: ORDER BY が別のサブクエリにないように)、 sort
メソッドによって返される DataFrame の結果を制限するメソッドを呼び出す必要があります。
たとえば、 メソッドの呼び出しをチェーン する場合、
DataFrame df = session.table("sample_product_data");
// Limit the number of rows to 5, sorted by parent_id.
DataFrame dfSubset = df.sort(Functions.col("parent_id")).limit(5);
// Return the first 5 rows, sorted by parent_id.
Row[] arrayOfRows = df.sort(Functions.col("parent_id")).first(5);
// Print the first 5 rows, sorted by parent_id.
df.sort(Functions.col("parent_id")).show(5);
列定義の取得¶
DataFrame のためにデータセット内の列の定義を取得するには、 schema
メソッドを呼び出します。このメソッドは、 StructField
オブジェクトの Array
を含む StructType
オブジェクトを返します。各 StructField
オブジェクトには、列の定義が含まれています。
import com.snowflake.snowpark_java.types.*;
...
// Get the StructType object that describes the columns in the
// underlying rowset.
StructType tableSchema = session.table("sample_product_data").schema();
System.out.println("Schema for sample_product_data: " + tableSchema);
返された StructType
オブジェクトでは、列名は常に正規化されています。引用符で囲まれていない識別子は大文字で返され、引用符で囲まれた識別子は定義された正確な大文字小文字で返されます。
次の例では、 ID
および 3rd
という名前の列を含む DataFrame を作成します。列名 3rd
の場合は、 名前が識別子の要件に準拠していない ため、Snowparkライブラリは名前を自動的に二重引用符("3rd"
)で囲みます。
この例では、 schema
メソッドを呼び出してから、返された StructType
オブジェクトで names
メソッドを呼び出して、列名の配列を取得します。名前は、 schema
メソッドによって返される StructType
で正規化されます。
import java.util.Arrays;
...
// Create a DataFrame containing the "id" and "3rd" columns.
DataFrame dfSelectedColumns = session.table("sample_product_data").select(Functions.col("id"), Functions.col("3rd"));
// Print out the names of the columns in the schema.
System.out.println(Arrays.toString(dfSelectedColumns.schema().names()));
DataFrames の結合¶
DataFrame オブジェクトを結合するには、 join メソッドを呼び出します。
次のセクションでは、 DataFrames を使用して結合を実行する方法について説明します。
結合のサンプルデータの設定¶
次のセクションの例では、次の SQL ステートメントを実行して設定できるサンプルデータを使用しています。
CREATE OR REPLACE TABLE sample_a (
id_a INTEGER,
name_a VARCHAR,
value INTEGER
);
INSERT INTO sample_a (id_a, name_a, value) VALUES
(10, 'A1', 5),
(40, 'A2', 10),
(80, 'A3', 15),
(90, 'A4', 20)
;
CREATE OR REPLACE TABLE sample_b (
id_b INTEGER,
name_b VARCHAR,
id_a INTEGER,
value INTEGER
);
INSERT INTO sample_b (id_b, name_b, id_a, value) VALUES
(4000, 'B1', 40, 10),
(4001, 'B2', 10, 5),
(9000, 'B3', 80, 15),
(9099, 'B4', NULL, 200)
;
CREATE OR REPLACE TABLE sample_c (
id_c INTEGER,
name_c VARCHAR,
id_a INTEGER,
id_b INTEGER
);
INSERT INTO sample_c (id_c, name_c, id_a, id_b) VALUES
(1012, 'C1', 10, NULL),
(1040, 'C2', 40, 4000),
(1041, 'C3', 40, 4001)
;
結合の列の指定¶
DataFrame.join
メソッドを使用すると、次のいずれかの方法で使用する列を指定できます。
結合条件を説明する列式を指定します。
結合の共通列として使用する1つ以上の列を指定します。
次の例では、 id_a
という名前の列で内部結合を実行します。
// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")));
dfJoined.show();
この例では、 DataFrame.col
メソッドを使用して、結合で使用する条件を指定します。この方法の詳細については、 列と式の指定 をご参照ください。
これにより、次が出力されます。
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
----------------------------------------------------------------------
結合の結果で重複する同一の列名¶
結合の結果の DataFrame では、テーブル間で列名が同一であっても、Snowparkライブラリは結合されたテーブルで見つかった列名を使用します。これが発生すると、これらの列名は、結合の結果の DataFrame で重複します。重複する列に名前でアクセスするには、列の元のテーブルを表す DataFrame で col
メソッドを呼び出します。(列の指定の詳細については、 異なる DataFrames の列の参照 をご参照ください。)
次の例のコードは、2つの DataFrames を結合し、結合された DataFrame で select
メソッドを呼び出します。それぞれの DataFrame オブジェクト(dfRhs
および dfLhs
)を表す変数から col
メソッドを呼び出して、選択する列を指定します。 as
メソッドを使用して、 select
メソッドが作成する DataFrame 内の列に新しい名前を付けます。
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")));
DataFrame dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"));
dfSelected.show();
これにより、次が出力されます。
------------------------------
|"LEFTVALUE" |"RIGHTVALUE" |
------------------------------
|5 |5 |
|10 |10 |
|15 |15 |
------------------------------
保存またはキャッシュする前に重複する列¶
結合の結果の DataFrame に重複する列名が含まれる場合は、結果をテーブルに保存するか DataFrame をキャッシュする前に、重複を排除するか列の名前を変更して DataFrame の重複をなくす必要があります。テーブルまたはキャッシュに保存する DataFrame 内の重複する列名の場合、Snowparkライブラリは、重複しないようにするために、重複する列名をエイリアスに置き換えます。
次の例は、列名 ID_A
と VALUE
が2つのテーブルからの結合で重複し、結果をキャッシュする前に重複を排除するか名前を変更しない場合に、キャッシュされた DataFrame の出力がどのように表示されるかを示しています。
--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A" |"NAME_A" |"l_ZSz7_VALUE" |"ID_B" |"NAME_B" |"r_heec_ID_A" |"r_heec_VALUE" |
--------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
--------------------------------------------------------------------------------------------------
自然結合の実行¶
自然結合 (DataFrames は同じ名前の列で結合)を実行するには、 naturalJoin メソッドを呼び出します。
次の例では、テーブル sample_a
と sample_b
の DataFrames を共通の列(列 id_a
)で結合します。
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfJoined = dfLhs.naturalJoin(dfRhs);
dfJoined.show();
これにより、次が出力されます。
---------------------------------------------------
|"ID_A" |"VALUE" |"NAME_A" |"ID_B" |"NAME_B" |
---------------------------------------------------
|10 |5 |A1 |4001 |B2 |
|40 |10 |A2 |4000 |B1 |
|80 |15 |A3 |9000 |B3 |
---------------------------------------------------
統合の型の指定¶
デフォルトでは、 DataFrame.join
メソッドは内部結合を作成します。別の型の結合を指定するには、 joinType
引数を次のいずれかの値に設定します。
結合の型 |
|
---|---|
内部結合 |
|
左外部結合 |
|
右外部結合 |
|
完全外部結合 |
|
クロス結合 |
|
例:
// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
DataFrame dfLhs = session.table("sample_a");
DataFrame dfRhs = session.table("sample_b");
DataFrame dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a").equal_to(dfRhs.col("id_a")), "left");
dfLeftOuterJoin.show();
これにより、次が出力されます。
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|40 |A2 |10 |4000 |B1 |40 |10 |
|10 |A1 |5 |4001 |B2 |10 |5 |
|80 |A3 |15 |9000 |B3 |80 |15 |
|90 |A4 |20 |NULL |NULL |NULL |NULL |
----------------------------------------------------------------------
複数のテーブルの結合¶
複数のテーブルを結合するには、
テーブルごとに DataFrame を作成します。
最初の DataFrame で
DataFrame.join
メソッドを呼び出し、2番目の DataFrame を渡します。join
メソッドによって返された DataFrame を使用して、join
メソッドを呼び出し、3番目の DataFrame を渡します。
以下に示すように、 join
呼び出しを チェーン することができます。
DataFrame dfFirst = session.table("sample_a");
DataFrame dfSecond = session.table("sample_b");
DataFrame dfThird = session.table("sample_c");
DataFrame dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a").equal_to(dfSecond.col("id_a"))).join(dfThird, dfFirst.col("id_a").equal_to(dfThird.col("id_a")));
dfJoinThreeTables.show();
これにより、次が出力されます。
------------------------------------------------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |"ID_C" |"NAME_C" |"ID_A" |"ID_B" |
------------------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |1012 |C1 |10 |NULL |
|40 |A2 |10 |4000 |B1 |40 |10 |1040 |C2 |40 |4000 |
|40 |A2 |10 |4000 |B1 |40 |10 |1041 |C3 |40 |4001 |
------------------------------------------------------------------------------------------------------------
自己結合の実行¶
異なる列でテーブルをそれ自体と結合する必要がある場合は、単一の DataFrame で自己結合を実行することはできません。単一の DataFrame を使用して自己結合を実行する次の例は、 "id"
の列式が結合の左側と右側に存在するため、失敗します。
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, Functions.col("id").equal_to(Functions.col("parent_id")));
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, df.col("id").equal_to(df.col("parent_id")));
これらの例は両方とも、次の例外を除いて失敗します。
Exception in thread "main" com.snowflake.snowpark_java.SnowparkClientException:
Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
Instead, join this DataFrame to a clone() of itself.
代わりに、 clone メソッドを使用して DataFrame オブジェクトのクローンを作成し、2つの DataFrame オブジェクトを使用して結合を実行します。
// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
DataFrame dfLhs = session.table("sample_product_data");
// Clone the DataFrame object to use as the right-hand side of the join.
DataFrame dfRhs = dfLhs.clone();
// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
DataFrame dfJoined = dfLhs.join(dfRhs, dfLhs.col("id").equal_to(dfRhs.col("parent_id")));
dfJoined.show();
同じ列で自己結合を実行する場合は、 USING
句に対して列の名前(または列名の配列)で渡す join
メソッドを呼び出します。
// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
DataFrame df = session.table("sample_product_data");
DataFrame dfJoined = df.join(df, "key");
DataFrame を評価するアクションの実行¶
前述のように、 DataFrame は遅延評価されます。つまり、アクションを実行するまで、 SQL ステートメントは実行のためにサーバーに送信されません。アクションにより、 DataFrame が評価され、対応する SQL ステートメントが実行のためにサーバーに送信されます。
次のセクションでは、 DataFrame で同期的および非同期的にアクションを実行する方法について説明します。
同期的なアクションの実行¶
アクションを同期的に実行するには、次のアクションメソッドのいずれかを呼び出します。
アクションを同期的に実行する方法 |
説明 |
---|---|
|
DataFrame を評価し、結果のデータセットを 行 オブジェクトの |
|
DataFrame を評価し、 Row オブジェクトの |
|
DataFrame を評価し、行数を返します。 |
|
DataFrame を評価し、行をコンソールに出力します。この方法では、行数が10に制限されることに注意してください(デフォルト)。 DataFrame での行の出力 をご参照ください。 |
|
クエリを実行し、仮テーブルを作成して、テーブルに結果を配置します。このメソッドは、この仮テーブルのデータにアクセスするために使用できる |
|
DataFrame のデータを指定したテーブルに保存します。 テーブルへのデータの保存 をご参照ください。 |
|
DataFrame のデータを指定したテーブルにコピーします。 ファイルからテーブルへのデータのコピー をご参照ください。 |
|
指定されたテーブルの行を削除します。 テーブル内の行の更新、削除、およびマージ をご参照ください。 |
|
指定されたテーブルの行を更新します。 テーブル内の行の更新、削除、およびマージ をご参照ください。 |
|
指定されたテーブルに行をマージします。 テーブル内の行の更新、削除、およびマージ をご参照ください。 |
たとえば、クエリを実行して結果の数を返すには、 count
メソッドを呼び出します。
// Create a DataFrame for the "sample_product_data" table.
DataFrame dfProducts = session.table("sample_product_data");
// Send the query to the server for execution and
// print the count of rows in the table.
System.out.println("Rows returned: " + dfProducts.count());
アクションメソッドを呼び出して、次を実行することもできます。
注: DataFrame の列の定義を取得するために schema
メソッドを呼び出す場合は、アクションメソッドを呼び出す必要はありません。
非同期的なアクションの実行¶
注釈
この機能は、Snowpark 0.11.0で導入されました。
アクションを非同期で実行するには、 async
メソッドを呼び出して「非同期アクター」オブジェクト(例: DataFrameAsyncActor
)を返し、そのオブジェクトで非同期アクションメソッドを呼び出します。
非同期アクターオブジェクトのこれらのアクションメソッドは、 TypedAsyncJob
オブジェクトを返します。これを使用して、非同期アクションのステータスを確認し、アクションの結果を取得できます。
次のセクションでは、アクションを非同期で実行して結果を確認する方法について説明します。
非同期アクションの基本的なフローの理解¶
次のメソッドを使用して、アクションを非同期で実行できます。
アクションを非同期的に実行する方法 |
説明 |
---|---|
|
DataFrame を非同期的に評価して、結果のデータセットを 行 オブジェクトの |
|
DataFrame を非同期的に評価して、 Row オブジェクトの |
|
DataFrame を非同期的に評価して、行数を取得します。 |
|
DataFrame のデータを指定したテーブルに非同期的に保存します。 テーブルへのデータの保存 をご参照ください。 |
|
DataFrame のデータを指定したテーブルに非同期的にコピーします。 ファイルからテーブルへのデータのコピー をご参照ください。 |
|
指定されたテーブルの行を非同期的に削除します。 テーブル内の行の更新、削除、およびマージ をご参照ください。 |
|
指定されたテーブルの行を非同期的に更新します。 テーブル内の行の更新、削除、およびマージ をご参照ください。 |
返された TypedAsyncJob オブジェクトから、次の操作を実行できます。
アクションが完了したかどうかを判断するには、
isDone
メソッドを呼び出します。アクションに対応するクエリ ID を取得するには、
getQueryId
メソッドを呼び出します。アクションの結果(例:
collect
メソッドに対するRow
オブジェクトのArray
、またはcount
メソッドの行数)を返すには、getResult
メソッドを呼び出します。getResult
はブロッキング呼び出しであることに注意してください。アクションをキャンセルするには、
cancel
メソッドを呼び出します。
たとえば、クエリを非同期で実行し、結果を Row
オブジェクトの Array
として取得するには、 async().collect()
を呼び出します。
import java.util.Arrays;
// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
DataFrame df = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
// Execute the query asynchronously.
// This call does not block.
TypedAsyncJob<Row[]> asyncJob = df.async().collect();
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
Row[] results = asyncJob.getResult();
System.out.println(Arrays.toString(results));
クエリを非同期で実行し、結果の数を取得するには、 async().count()
を呼び出します。
// Create a DataFrame for the "sample_product_data" table.
DataFrame dfProducts = session.table("sample_product_data");
// Execute the query asynchronously.
// This call does not block.
TypedAsyncJob<Long> asyncJob = dfProducts.async().count();
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// Print the count of rows in the table.
// Note that getResult is a blocking call.
System.out.println("Rows returned: " + asyncJob.getResult());
待機する最大秒数の指定¶
getResult
メソッドを呼び出すときは、 maxWaitTimeInSeconds
引数を使用して、クエリが完了するのを待ってから結果を取得するまでの最大秒数を指定できます。例:
// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
Row[] results = asyncJob.getResult(10);
この引数を省略すると、メソッドは Snowparkがリクエストするタイムアウト(秒単位) の構成プロパティで指定された最大秒数で待機します。(これは、 セッションオブジェクトを作成する ときに設定できるプロパティです。)
ID による非同期クエリへのアクセス¶
以前に送信した非同期クエリのクエリ ID がある場合は、 Session.createAsyncJob
メソッドを呼び出して、クエリのステータスの確認、クエリ結果の取得、クエリのキャンセルに使用できる AsyncJob オブジェクトの作成ができます。
TypedAsyncJob
とは異なり、 AsyncJob
は結果を取得するための getResult
メソッドを提供しないことに注意してください。結果を取得する必要がある場合は、代わりに getRows
または getIterator
メソッドを呼び出します。
例:
import java.util.Arrays;
...
AsyncJob asyncJob = session.createAsyncJob(myQueryId);
// Check if the query has completed execution.
System.out.println("Is query " + asyncJob.getQueryId() + " done? " + asyncJob.isDone());
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
Row[] rows = asyncJob.getRows();
System.out.println(Arrays.toString(rows));
行の DataFrame への取得¶
DataFrame の変換方法を指定 した後、 アクションメソッドを呼び出して クエリを実行し、結果を返すことができます。 Array
内のすべての行を返すことも、行ごとに結果を反復処理できる Iterator
を返すこともできます。後者の場合、データ量が多いと、大量のデータがメモリにロードされないように、行がチャンクごとにメモリにロードされます。
すべての行を返す¶
すべての行を一度に返すには、 collect メソッドを呼び出します。このメソッドは、 行 オブジェクトの配列を返します。行から値を取得するには、 getType
メソッドを呼び出します(例: getString
、 getInt
など)。
例:
Row[] rows = session.table("sample_product_data").select(Functions.col("name"), Functions.col("category_id")).sort(Functions.col("name")).collect();
for (Row row : rows) {
System.out.println("Name: " + row.getString(0) + "; Category ID: " + row.getInt(1));
}
行の反復子を返す¶
Iterator
を使用して結果の Row オブジェクトを反復処理する場合には、 toLocalIterator を呼び出します。結果のデータ量が多い場合、メソッドは行をチャンクごとにロードして、すべての行を一度にメモリにロードしないようにします。
例:
import java.util.Iterator;
Iterator<Row> rowIterator = session.table("sample_product_data").select(Functions.col("name"), Functions.col("category_id")).sort(Functions.col("name")).toLocalIterator();
while (rowIterator.hasNext()) {
Row row = rowIterator.next();
System.out.println("Name: " + row.getString(0) + "; Category ID: " + row.getInt(1));
}
最初の n
行を返す¶
最初の n
行を返すには、 first メソッドを呼び出し、返す行数を渡します。
DataFrame の行数制限 で説明されているように、結果は非決定論的です。結果を決定論的にしたい場合は、ソートされた DataFrame (df.sort().first()
)でこのメソッドを呼び出します。
例:
import java.util.Arrays;
...
DataFrame df = session.table("sample_product_data");
Row[] rows = df.sort(Functions.col("name")).first(5);
System.out.println(Arrays.toString(rows));
DataFrame での行の出力¶
DataFrame の最初の10行をコンソールに出力するには、 show メソッドを呼び出します。別の行数を出力するには、出力する行数を渡します。
DataFrame の行数制限 で説明されているように、結果は非決定論的です。結果を決定論的にしたい場合は、ソートされた DataFrame (df.sort().show()
)でこのメソッドを呼び出します。
例:
DataFrame df = session.table("sample_product_data");
df.sort(Functions.col("name")).show();
テーブル内の行の更新、削除、およびマージ¶
注釈
この機能は、Snowpark 0.7.0で導入されました。
Session.table
を呼び出してテーブルの DataFrame
オブジェクトを作成すると、メソッドは Updatable
オブジェクトを返します。これは、テーブル内のデータを更新および削除するための追加のメソッドにより、 DataFrame
を拡張します。(Updatable を参照。)
テーブルの行を更新または削除する必要がある場合は、 Updatable
クラスの次のメソッドを使用できます。
update
またはupdateColumn
を呼び出して、テーブル内の既存の行を更新します。 テーブルにある行の更新 をご参照ください。delete
を呼び出して、テーブルから行を削除します。 テーブル内にある行の削除 をご参照ください。merge
を呼び出して、2番目のテーブルまたはサブクエリのデータに基づき、1つのテーブルにある行を挿入、更新、および削除します。(これは、 SQL の MERGE コマンドと同等です。) 行のテーブルへのマージ をご参照ください。
テーブルにある行の更新¶
テーブルの行を更新するには、 update
または updateColumn
メソッドを呼び出し、更新する列とそれらの列に割り当てる対応する値を関連付ける Map
を渡します。
Map
で列名を文字列として指定するには、updateColumn
を呼び出します。Map
でColumn
オブジェクトを指定するには、update
を呼び出します。
どちらのメソッドも、更新された行数を含む UpdateResult
オブジェクトを返します。(UpdateResult を参照。)
注釈
両方のメソッドとも アクションメソッド です。これは、メソッドを呼び出すと、 SQL ステートメントがサーバーに送信されて実行されることを意味します。
たとえば、 count
という名前の列の値を値 1
に置き換え、列名(String
)を対応する値に関連付ける Map
を使用する場合は、 updateColumn
を呼び出します。
import java.util.HashMap;
import java.util.Map;
...
Map<String, Column> assignments = new HashMap<>();
assignments.put("3rd", Functions.lit(1));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.updateColumn(assignments);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
Map
で Column
オブジェクトを使用して更新する列を識別する場合は、 update
を呼び出します。
import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(1));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.update(assignments);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
条件が満たされたときにのみ更新する必要がある場合は、その条件を引数として指定できます。たとえば、 category_id
列の値が 20
である行の count
という名前の列の値を 2
に置き換えるには、次のようにします。
import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(2));
Updatable updatableDf = session.table("sample_product_data");
UpdateResult updateResult = updatableDf.update(assignments, Functions.col("category_id").equal_to(Functions.lit(20)));
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
別の DataFrame
オブジェクトとの結合に基づいて条件を作成する必要がある場合は、その DataFrame
を引数として渡し、その DataFrame
を条件で使用できます。たとえば、 category_id
列が DataFrame
dfParts
の category_id
と一致する行の count
という名前の列の値を 3
に置き換えるには、次のようにします。
import java.util.HashMap;
import java.util.Map;
...
Map<Column, Column> assignments = new HashMap<>();
assignments.put(Functions.col("3rd"), Functions.lit(3));
Updatable updatableDf = session.table("sample_product_data");
DataFrame dfParts = session.table("parts");
UpdateResult updateResult = updatableDf.update(assignments, updatableDf.col("category_id").equal_to(dfParts.col("category_id")), dfParts);
System.out.println("Number of rows updated: " + updateResult.getRowsUpdated());
テーブル内にある行の削除¶
delete
メソッドの場合は、削除する行を識別する条件を指定でき、その条件は別の DataFrame との結合に基づくことができます。 delete
は、削除された行数を含む DeleteResult
オブジェクトを返します。(DeleteResult を参照。)
注釈
delete
は、 アクションメソッド です。これは、メソッドを呼び出すと、 SQL ステートメントがサーバーに送信されて実行されることを意味します。
たとえば、 category_id
列の値が 1
の行を削除するには、次を実行します。
Updatable updatableDf = session.table("sample_product_data");
DeleteResult deleteResult = updatableDf.delete(updatableDf.col("category_id").equal_to(Functions.lit(1)));
System.out.println("Number of rows deleted: " + deleteResult.getRowsDeleted());
条件が別の DataFrame の列を参照している場合は、その DataFrame を2番目の引数として渡します。たとえば、 category_id
列が DataFrame
dfParts
の category_id
と一致する行を削除するには、 dfParts
を2番目の引数として渡します。
Updatable updatableDf = session.table("sample_product_data");
DeleteResult deleteResult = updatableDf.delete(updatableDf.col("category_id").equal_to(dfParts.col("category_id")), dfParts);
System.out.println("Number of rows deleted: " + deleteResult.getRowsDeleted());
行のテーブルへのマージ¶
2番目のテーブルまたはサブクエリの値に基づいて1つのテーブルの行を挿入、更新、および削除するには(SQL の MERGE コマンドに相当)、次の手順を実行します。
データをマージするテーブルの
Updatable
オブジェクトで、merge
メソッドを呼び出し、他のテーブルのDataFrame
オブジェクトと、結合条件の列式を渡します。これにより、一致する行と一致しない行に対して実行するアクション(例: 挿入、更新、削除)を指定するために使用できる、
MergeBuilder
オブジェクトが返されます。(MergeBuilder を参照。)MergeBuilder
オブジェクトの使用。一致する行に対して実行する必要がある更新または削除を指定するには、
whenMatched
メソッドを呼び出します。行の更新または削除が必要なときに追加の条件を指定する必要がある場合は、その条件の列式を渡すことができます。
このメソッドは、実行するアクションを指定するために使用できる
MatchedClauseBuilder
オブジェクトを返します。(MatchedClauseBuilder を参照。)MatchedClauseBuilder
オブジェクトのupdate
またはdelete
メソッドを呼び出して、一致する行に対して実行する必要がある更新または削除アクションを指定します。これらのメソッドは、追加の句を指定するために使用できるMergeBuilder
オブジェクトを返します。行が一致しない場合に実行する必要のある挿入を指定するには、
whenNotMatched
メソッドを呼び出します。行を挿入する必要があるときに追加の条件を指定する必要がある場合は、その条件の列式を渡すことができます。
このメソッドは、実行するアクションを指定するために使用できる
NotMatchedClauseBuilder
オブジェクトを返します。(NotMatchedClauseBuilder を参照。)NotMatchedClauseBuilder
オブジェクトのinsert
メソッドを呼び出して、行が一致しない場合に実行する必要のある挿入アクションを指定します。これらのメソッドは、追加の句を指定するために使用できるMergeBuilder
オブジェクトを返します。
実行する必要のある挿入、更新、および削除の指定が完了したら、
MergeBuilder
オブジェクトのcollect
メソッドを呼び出して、指定した挿入、更新、および削除をテーブルで実行します。collect
は、挿入、更新、および削除された行数を含むMergeResult
オブジェクトを返します。(MergeResult を参照。)
次の例では、 target
テーブルに一致する ID の行が含まれていない場合に、 source
テーブルの id
列と value
列の行を target
テーブルに挿入します。
MergeResult mergeResult = target.merge(source, target.col("id").equal_to(source.col("id")))
.whenNotMatched().insert([source.col("id"), source.col("value")])
.collect();
次の例では、同じ ID を持つ source
テーブルにある行の value
列の値で、 target
テーブルにある行を更新します。
import java.util.HashMap;
import java.util.Map;
...
Map<String, Column> assignments = new HashMap<>();
assignments.put("value", source.col("value"));
MergeResult mergeResult = target.merge(source, target.col("id").equal_to(source.col("id")))
.whenMatched().update(assignments)
.collect();
テーブルへのデータの保存¶
DataFrame の内容を新規または既存のテーブルに保存できます。これには、次の権限が必要です。
テーブルが存在しない場合は、スキーマに対する CREATE TABLE 権限。
テーブルに対する INSERT 権限。
DataFrame の内容をテーブルに保存するには、
DataFrame の write メソッドを呼び出し、 DataFrameWriter オブジェクトを取得します。
DataFrameWriter
オブジェクトの mode メソッドを呼び出し、テーブルへの書き込みの設定を指定する SaveMode オブジェクトを渡します。行を挿入するには、
SaveMode.Append
を渡します。既存のテーブルを上書きするには、
SaveMode.Overwrite
を渡します。
このメソッドは、指定されたモードで構成された同じ
DataFrameWriter
オブジェクトを返します。既存のテーブル(
SaveMode.Append
)に行を挿入していて、 DataFrame の列名がテーブルの列名と一致する場合は、 DataFrameWriter.option を呼び出し、"columnOrder"
と"name"
を引数として渡します。注釈
このメソッドはSnowpark 1.4.0で導入されました。
デフォルトでは、
columnOrder
オプションは"index"
に設定されています。これは、DataFrameWriter
が列の表示される順序で値を挿入することを意味します。たとえば、DataFrameWriter
は、テーブルの最初の列にある DataFrame から最初の列の値を挿入し、テーブルの2番目の列にある DataFrame から2番目の列の値を挿入します。このメソッドは、指定されたオプションで構成された同じ
DataFrameWriter
オブジェクトを返します。DataFrameWriter
オブジェクトの saveAsTable メソッドを呼び出し、 DataFrame の内容を指定されたテーブルに保存します。データをテーブルに保存する SQL ステートメントを実行するために、別のメソッド(例:
collect
)を呼び出す必要はありません。saveAsTable
は、 SQL ステートメントを実行する アクションメソッド です。
次の例では、既存のテーブル(tableName
変数で識別される)を DataFrame df
のコンテンツで上書きします。
df.write().mode(SaveMode.Overwrite).saveAsTable(tableName);
次の例では、 DataFrame df
から既存のテーブル(tableName
変数で識別される)に行を挿入します。この例では、テーブルと DataFrame の両方に列 c1
と c2
が含まれています。
この例では、 columnOrder
オプションを "name"
(DataFrame 列と同じ名前の値をテーブル列に挿入する)に設定することと、デフォルトの columnOrder
オプション(DataFrame の列の順序に基づいて、テーブル列に値を挿入する)を使用することの違いを示しています。
DataFrame df = session.sql("SELECT 1 AS c2, 2 as c1");
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write().mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName);
// With the default value of the columnOrder option ("index"), the DataFrameWriter uses the column positions
// and inserts a row with the values (1, 2).
df.write().mode(SaveMode.Append).saveAsTable(tableName);
DataFrame からのビューの作成¶
DataFrame からビューを作成するには、 createOrReplaceView メソッドを呼び出します。
df.createOrReplaceView("db.schema.viewName");
createOrReplaceView
を呼び出すと、ただちに新しいビューが作成されることに注意してください。さらに重要なことに、 DataFrame が評価されることはありません。(アクションを実行 するまで、 DataFrame 自体は評価されません。)
createOrReplaceView
を呼び出して作成したビューは永続的です。そのビューが不要になった場合は、 ビューを手動でドロップ できます。
セッション専用の仮ビューを作成する必要がある場合は、代わりに createOrReplaceTempView メソッドを呼び出します。
df.createOrReplaceTempView("db.schema.viewName");
DataFrame のキャッシング¶
場合によっては、複雑なクエリを実行し、(同じクエリを再度実行するのではなく)後続の操作で使用するために結果を保持する必要があります。このような場合は、 cacheResult メソッドを呼び出すと、 DataFrame の内容をキャッシュできます。
このメソッドは、
クエリを実行します。
cacheResult
を呼び出す前に、 結果を取得するための別個のアクションメソッドを呼び出す 必要はありません。cacheResult
は、クエリを実行するアクションメソッドです。結果を仮テーブルに保存します
cacheResult
は、仮テーブルを作成するため、使用中のスキーマに対する CREATE TABLE 権限が必要です。仮テーブルの結果へのアクセスを提供する、 HasCachedResult オブジェクトを返します。
HasCachedResult
は、DataFrame
を拡張するため、このキャッシュされたデータに対して、 DataFrame で実行できるのと同じ操作のいくつかを実行できます。
注釈
cacheResult
はクエリを実行して結果をテーブルに保存するため、このメソッドでは計算とストレージのコストが増加する可能性があります。
例:
// Set up a DataFrame to query a table.
DataFrame df = session.table("sample_product_data").filter(Functions.col("category_id").gt(Functions.lit(10)));
// Retrieve the results and cache the data.
HasCachedResult cachedDf = df.cacheResult();
// Create a DataFrame containing a subset of the cached data.
DataFrame dfSubset = cachedDf.filter(Functions.col("category_id").equal_to(Functions.lit(20))).select(Functions.col("name"), Functions.col("category_id"));
dfSubset.show();
このメソッドを呼び出しても、元の DataFrame は影響を受けないことに注意してください。たとえば、 dfTable
がテーブル sample_product_data
の DataFrame であるとします。
HasCachedResult dfTempTable = dfTable.cacheResult();
cacheResult
を呼び出した後も、 dfTable
は sample_product_data
テーブルを指しており、引き続き dfTable
を使用してそのテーブルをクエリおよび更新できます。
一時テーブルにキャッシュされたデータを使用するには、 dfTempTable
(cacheResult
によって返される HasCachedResult
オブジェクト)を使用します。
ステージでのファイルの操作¶
Snowparkライブラリは、ステージにあるファイルを使用して、 Snowflakeにデータをロード し、 Snowflake からデータをアンロードするために使用できるクラスとメソッドを提供します。
注釈
これらのクラスとメソッドをステージで使用するには、 ステージを操作するための権限 が必要です。
次のセクションでは、これらのクラスとメソッドの使用方法について説明します。
ステージでのファイルのアップロードとダウンロード¶
ステージでファイルをアップロードおよびダウンロードするには、 FileOperation オブジェクトの put
メソッドと get
メソッドを使用します。
ステージへのファイルのアップロード¶
ステージにファイルをアップロードするには、
ファイルをステージにアップロードするための権限 があることを確認します。
Session
オブジェクトの file メソッドを使用して、セッションの FileOperation オブジェクトにアクセスします。FileOperation
オブジェクトの put メソッドを呼び出して、ファイルをステージにアップロードします。このメソッドは、 SQL PUT コマンドを実行します。
PUT コマンドに オプションのパラメーター を指定するには、パラメーターと値の
Map
を作成し、Map
をoptions
引数として渡します。例:import java.util.HashMap; import java.util.Map; ... // Upload a file to a stage without compressing the file. Map<String, String> putOptions = new HashMap<>(); putOptions.put("AUTO_COMPRESS", "FALSE"); PutResult[] putResults = session.file().put("file:///tmp/myfile.csv", "@myStage", putOptions);
localFileName
引数では、ワイルドカード(*
および?
)を使用して、アップロードするファイルのセットを識別できます。例:// Upload the CSV files in /tmp with names that start with "file". // You can use the wildcard characters "*" and "?" to match multiple files. PutResult[] putResults = session.file().put("file:///tmp/file*.csv", "@myStage/prefix2")
put
メソッドによって返された PutResult オブジェクトのArray
をチェックして、ファイルが正常にアップロードされたかどうかを確認します。たとえば、そのファイルのファイル名と PUT 操作のステータスを出力するには、// Print the filename and the status of the PUT operation. for (PutResult result : putResults) { System.out.println(result.getSourceFileName() + ": " + result.getStatus()); }
ステージからのファイルのダウンロード¶
ステージからファイルをダウンロードするには、
ファイルをステージからダウンロードするための権限 があることを確認します。
Session
オブジェクトの file メソッドを使用して、セッションの FileOperation オブジェクトにアクセスします。FileOperation
オブジェクトの get メソッドを呼び出して、ステージからファイルをダウンロードします。このメソッドは、 SQL GET コマンドを実行します。
GET コマンドに オプションのパラメーター を指定するには、パラメーターと値の
Map
を作成し、Map
をoptions
引数として渡します。例:import java.util.HashMap; import java.util.Map; ... // Upload a file to a stage without compressing the file. // Download files with names that match a regular expression pattern. Map<String, String> getOptions = new HashMap<>(); getOptions.put("PATTERN", "'.*file_.*.csv.gz'"); GetResult[] getResults = session.file().get("@myStage", "file:///tmp", getOptions);
get
メソッドによって返された GetResult オブジェクトのArray
をチェックして、ファイルが正常にダウンロードされたかどうかを確認します。たとえば、そのファイルのファイル名と GET 操作のステータスを出力するには、// Print the filename and the status of the GET operation. for (GetResult result : getResults) { System.out.println(result.getFileName() + ": " + result.getStatus()); }
ステージにデータをアップロードおよびダウンロードする際の入力ストリームの使用¶
注釈
この機能は、Snowpark 1.4.0で導入されました。
入力ストリームを使用してステージ上のファイルにデータをアップロードし、ステージ上のファイルからデータをダウンロードするには、 FileOperation オブジェクトの uploadStream
メソッドと downloadStream
メソッドを使用します。
ステージ上のファイルにデータをアップロードする際の入力ストリームの使用¶
java.io.InputStream オブジェクトからステージ上のファイルにデータをアップロードするには、
ファイルをステージにアップロードするための権限 があることを確認します。
Session
オブジェクトの file メソッドを使用して、セッションの FileOperation オブジェクトにアクセスします。FileOperation
オブジェクトの uploadStream メソッドを呼び出します。データを書き込むステージ上のファイルと
InputStream
オブジェクトへの完全なパスを渡します。さらに、compress
引数を使用して、データをアップロードする前にデータを圧縮するかどうかを指定します。
例:
import java.io.InputStream;
...
boolean compressData = true;
String pathToFileOnStage = "@myStage/path/file";
session.file().uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData);
ステージ上のファイルからデータをダウンロードする際の入力ストリームの使用¶
ステージ上のファイルから java.io.InputStream オブジェクトにデータをダウンロードするには、
ファイルをステージからダウンロードするための権限 があることを確認します。
Session
オブジェクトの file メソッドを使用して、セッションの FileOperation オブジェクトにアクセスします。FileOperation
オブジェクトの downloadStream メソッドを呼び出します。ダウンロードするデータを含むステージ上のファイルへの完全なパスを渡します。
decompress
引数を使用して、ファイル内のデータを圧縮するかどうかを指定します。
例:
import java.io.InputStream;
...
boolean isDataCompressed = true;
String pathToFileOnStage = "@myStage/path/file";
InputStream is = session.file().downloadStream(pathToFileOnStage, isDataCompressed);
ステージ内におけるファイルの DataFrame の設定¶
このセクションでは、Snowflakeステージでファイルの DataFrame を設定する方法について説明します。この DataFrame を作成すると、 DataFrame を使用して次のことができます。
Snowflakeステージのファイルに DataFrame を設定するには、 DataFrameReader
クラスを使用します。
次の権限があることを確認します。
次のいずれかを使用します。
ステージングされたファイルからデータをコピーする方法を決定する コピーオプション を指定する場合は、スキーマに対する CREATE TABLE 権限。
それ以外では、スキーマに対する CREATE FILE FORMAT 権限。
Session
クラスのread
メソッドを呼び出して、DataFrameReader
オブジェクトにアクセスします。ファイルが CSV 形式の場合は、ファイルのフィールドを記述します。これを実行するには、
ファイル内のフィールドを記述する StructField オブジェクトの配列で構成される StructType オブジェクトを作成します。
StructField
オブジェクトごとに、以下を指定します。フィールドの名前。
フィールドのデータ型(
com.snowflake.snowpark_java.types
パッケージでオブジェクトとして指定)。フィールドがNULL可能かどうか。
例:
import com.snowflake.snowpark_java.types.*; ... StructType schemaForDataFile = StructType.create( new StructField("id", DataTypes.StringType, true), new StructField("name", DataTypes.StringType, true));
DataFrameReader
オブジェクトでschema
メソッドを呼び出し、StructType
オブジェクトを渡します。例:
DataFrameReader dfReader = session.read().schema(schemaForDataFile);
schema
メソッドは、指定されたフィールドを含むファイルを読み取るように構成されたDataFrameReader
オブジェクトを返します。他の形式(JSON など)のファイルでは、これを実行する必要はありません。これらのファイルの場合、
DataFrameReader
は、データをフィールド名$1
の VARIANT 型の単一フィールドとして扱います。
データの読み取り方法に関する追加情報を指定する必要がある(例: データが圧縮されているか、 CSV ファイルがフィールドを区切るためにコンマではなくセミコロンを使用している)場合は、 DataFrameReader.option メソッドまたは DataFrameReader.options メソッドを呼び出します。
設定するオプションの名前と値を渡します。次のタイプのオプションを設定できます。
CREATE FILE FORMAT のドキュメント で説明されている ファイル形式オプション。
COPY INTO TABLE ドキュメント で説明されている コピーオプション。
コピーオプションを設定すると、 DataFrame にデータを取得 するときに、よりコストのかかる実行戦略の実行される可能性があることに注意してください。
次の例では、圧縮されておらず、フィールド区切り文字にセミコロンを使用する CSV ファイル内のデータをクエリするように、
DataFrameReader
オブジェクトを設定します。dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE");
option
メソッドは、指定されたオプションで構成されたDataFrameReader
オブジェクトを返します。複数のオプションを設定するには、
option
メソッドに 呼び出しをチェーン する(上記の例に示すように)か、 DataFrameReader.options メソッドを呼び出し、オプションの名前と値のMap
で渡します。ファイルの形式に対応するメソッドを呼び出します。次のいずれかのメソッドを呼び出すことができます。
これらのメソッドを呼び出すときは、読み取るファイルのステージ位置を渡します。例:
DataFrame df = dfReader.csv("@mystage/myfile.csv");
同じプレフィックスで始まる複数のファイルを指定するには、ステージ名の後にプレフィックスを指定します。たとえば、ステージ
@mystage
からプレフィックスcsv_
を持つファイルをロードするには、DataFrame df = dfReader.csv("@mystage/csv_");
ファイルの形式に対応するメソッドは、そのファイルの CopyableDataFrame オブジェクトを返します。
CopyableDataFrame
はDataFrame
を拡張し、ステージングされたファイルにあるデータを処理するための追加のメソッドを提供します。アクションメソッドを呼び出して、次を実行します。
テーブルの DataFrames の場合と同様、データは、 アクションメソッド を呼び出すまで DataFrame に取得されません。
ファイルから DataFrame へのデータのロード¶
ステージのファイルに DataFrame を設定 した後、ファイルから DataFrame にデータをロードできます。
DataFrame オブジェクトメソッドを使用して、データセットで必要な 変換を実行 します(例: 特定のフィールドの選択、行のフィルタリングなど)。
たとえば、
mystage
という名前のステージにあるdata.json
という名前の JSON ファイルからcolor
要素を抽出するには、DataFrame df = session.read().json("@mystage/data.json").select(Functions.col("$1").subField("color"));
前に説明したように、 CSV 以外の形式のファイル(例: JSON)の場合、
DataFrameReader
は、ファイル内のデータを$1
という名前の単一の VARIANT 列として扱います。DataFrame.collect
メソッドを呼び出してデータをロードします。例:Row[] results = df.collect();
ファイルからテーブルへのデータのコピー¶
ステージのファイルに DataFrame を設定 した後、 copyInto メソッドを呼び出してデータをテーブルにコピーできます。このメソッドは COPY INTO <テーブル> コマンドを実行します。
注釈
copyInto
を呼び出す前に collect
メソッドを呼び出す必要はありません。 copyInto
を呼び出す前に、ファイルのデータが DataFrame にある必要はありません。
たとえば、次のコードは、 myFileStage
で指定された CSV ファイルからテーブル mytable
にデータをロードします。データは CSV ファイルにあるため、コードは、合わせて ファイルのフィールドを記述している 必要があります。この例では、 DataFrameReader
オブジェクトの schema メソッドを呼び出し、フィールドを記述する StructField オブジェクトの配列を含む StructType オブジェクト(schemaForDataFile
)を渡すことにより、これを実行します。
CopyableDataFrame copyableDf = session.read().schema(schemaForDataFile).csv("@mystage/myfile.csv");
copyableDf.copyInto("mytable");
ステージにあるファイルへの DataFrame の保存¶
注釈
この機能は、Snowpark 1.5.0で導入されました。
ステージ上のファイルに DataFrame を保存する必要がある場合は、ファイルの形式に対応する DataFrameWriter メソッド(例: CSV ファイルに書き込む csv
メソッド)を呼び出して、ファイルを保存する必要のあるステージ位置に渡します。これらの DataFrameWriter
メソッドは、 COPY INTO <場所> コマンドを実行します。
注釈
これらの DataFrameWriter
メソッドを呼び出す前に collect
メソッドを呼び出す必要はありません。これらのメソッドを呼び出す前に、ファイルのデータが DataFrame にある必要はありません。
DataFrame の内容をステージ上のファイルに保存するには、
DataFrame の write メソッドを呼び出し、 DataFrameWriter オブジェクトを取得します。たとえば、
sample_product_data
という名前のテーブルを表す DataFrame のDataFrameWriter
オブジェクトを取得するには、次のようにします。DataFrameWriter dfWriter = session.table("sample_product_data").write();
ファイルの内容を上書きする場合(ファイルが存在する場合)は、
DataFrameWriter
オブジェクトの mode メソッドを呼び出し、SaveMode.Overwrite
を渡します。それ以外でステージ上の指定されたファイルがすでに存在する場合、デフォルトで、
DataFrameWriter
はエラーを報告します。mode
メソッドは、指定されたモードで構成された同じDataFrameWriter
オブジェクトを返します。たとえば、
DataFrameWriter
がステージ上のファイルを上書きするように指定するには、次のようにします。dfWriter = dfWriter.mode(SaveMode.Overwrite);
データの保存方法に関する追加情報を指定する必要がある場合(たとえば、データを圧縮する必要がある場合や、セミコロンを使用してCSVファイルのフィールドを区切る場合)は、 DataFrameWriter.option メソッドまたは DataFrameWriter.options メソッドを呼び出します。
設定するオプションの名前と値を渡します。次のタイプのオプションを設定できます。
COPY INTO <場所> のドキュメント で説明されている ファイル形式オプション。
COPY INTO <場所> のドキュメントで説明されている コピーオプション。
次のオプションの設定には、
option
メソッドは使用できないことに注意してください。TYPE 形式タイプオプション。
OVERWRITE コピーオプションこのオプションを設定するには、代わりに
mode
メソッドを呼び出します(前のステップで説明のとおり)。
次の例では、フィールド区切り文字としてセミコロン(コンマではなく)を使用して、データを非圧縮形式で CSV ファイルに保存するように
DataFrameWriter
オブジェクトを設定します。dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE");
option
メソッドは、指定されたオプションで構成されたDataFrameWriter
オブジェクトを返します。複数のオプションを設定するには、
option
メソッドに 呼び出しをチェーン する(上記の例に示すように)か、 DataFrameWriter.options メソッドを呼び出し、オプションの名前と値のMap
で渡します。保存された各ファイルの詳細を返すには、
DETAILED_OUTPUT
コピーオプション をTRUE
に設定します。デフォルトでは、
DETAILED_OUTPUT
はFALSE
です。これは、メソッドがフィールド"rows_unloaded"
、"input_bytes"
、および"output_bytes"
を含む単一行の出力を返すことを意味します。DETAILED_OUTPUT
をTRUE
に設定すると、メソッドは保存されたファイルごとに出力の行を返します。各行には、フィールドFILE_NAME
、FILE_SIZE
、およびROW_COUNT
が含まれています。ファイルの形式に対応するメソッドを呼び出して、データをファイルに保存します。次のいずれかのメソッドを呼び出すことができます。
これらのメソッドを呼び出すときは、データを書き込む必要のあるファイルのステージ位置を渡します(例:
@mystage
)。デフォルトでは、このメソッドはプレフィックスが
data_
のファイル名にデータを保存します(例:@mystage/data_0_0_0.csv
)。ファイルに別のプレフィックスを付けて名前を付ける場合は、ステージ名の後にプレフィックスを指定します。例:WriteFileResult writeFileResult = dfWriter.csv("@mystage/saved_data");
この例では、 DataFrame の内容をプレフィックス
saved_data
で始まるファイル(例:@mystage/saved_data_0_0_0.csv
)に保存します。ファイルに書き込まれるデータ量に関する情報については、返された WriteFileResult オブジェクトを確認します。
WriteFileResult
オブジェクトから、 COPY INTO <場所> コマンドによって生成された出力にアクセスできます。Row オブジェクトの配列として出力の行にアクセスするには、
getRows
メソッドを呼び出します。行に存在するフィールドを判別するには、
getSchema
メソッドを呼び出します。このメソッドは、行のフィールドを記述する StructType を返します。
たとえば、出力行のフィールドと値の名前を出力するには、次のようにします。
WriteFileResult writeFileResult = dfWriter.csv("@mystage/saved_data"); Row[] rows = writeFileResult.getRows(); StructType schema = writeFileResult.getSchema(); for (int i = 0 ; i < rows.length ; i++) { System.out.println("Row:" + i); Row row = rows[i]; for (int j = 0; j < schema.size(); j++) { System.out.println(schema.get(j).name() + ": " + row.get(j)); } }
次の例では、 DataFrame を使用して、 car_sales
という名前のテーブルの内容をステージ @mystage
でプレフィックス saved_data
が付いた JSON ファイルに保存します(例: @mystage/saved_data_0_0_0.json
)。サンプルコード:
ファイルがステージ上にすでに存在する場合は、ファイルを上書きします。
保存操作に関する詳細な出力を返します。
データを非圧縮で保存します。
最後に、サンプルコードは、返された出力行の各フィールドと値を出力します。
DataFrame df = session.table("car_sales");
WriteFileResult writeFileResult = df.write().mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data");
Row[] rows = writeFileResult.getRows();
StructType schema = writeFileResult.getSchema();
for (int i = 0 ; i < rows.length ; i++) {
System.out.println("Row:" + i);
Row row = rows[i];
for (int j = 0; j < schema.size(); j++) {
System.out.println(schema.get(j).name() + ": " + row.get(j));
}
}
半構造化データの操作¶
DataFrame を使用すると、 半構造化データ (例: JSON データ)へのクエリとアクセスができます。次のセクションでは、 DataFrame 内の半構造化データの操作方法について説明します。
注釈
これらのセクションの例では、 例で使用されるサンプルデータ のサンプルデータを使用しています。
半構造化データの走査¶
半構造化データの特定のフィールドまたは要素を参照するには、 Column オブジェクトの次のメソッドを使用します。
subField("<フィールド名>") を使用して、 OBJECT (または OBJECT を含む VARIANT)のフィールドの
Column
オブジェクトを返します。subField(<インデックス>) を使用して、 ARRAY (または ARRAY を含む VARIANT)の要素の
Column
オブジェクトを返します。
注釈
パス内のフィールド名または要素が不規則であり、 Column.apply
メソッドの使用が困難な場合は、 Functions.get、 Functions.get_ignore_case、または Functions.get_path を代替として使用できます。
たとえば、次のコードは、 サンプルデータ の src
列にあるオブジェクトの dealership
フィールドを選択します。
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("dealership")).show();
このコードは、次を出力します。
----------------------------
|"""SRC""['DEALERSHIP']" |
----------------------------
|"Valley View Auto Sales" |
|"Tindel Toyota" |
----------------------------
注釈
DataFrame の値は文字列リテラルとして返されるため、二重引用符で囲まれています。これらの値を特定の型にキャストするには、 半構造化データへの明示的な値のキャスト をご参照ください。
メソッド呼び出しのチェーン を使用して、特定のフィールドまたは要素へのパスを走査することもできます。
たとえば、次のコードは salesperson
オブジェクトの name
フィールドを選択します。
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("salesperson").subField("name")).show();
このコードは、次を出力します。
------------------------------------
|"""SRC""['SALESPERSON']['NAME']" |
------------------------------------
|"Frank Beasley" |
|"Greg Northrup" |
------------------------------------
別の例として、次のコードは、車両の配列を保持する vehicle
フィールドの最初の要素を選択します。この例では、最初の要素から price
フィールドも選択しています。
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("vehicle").subField(0)).show();
df.select(Functions.col("src").subField("vehicle").subField(0).subField("price")).show();
このコードは、次を出力します。
---------------------------
|"""SRC""['VEHICLE'][0]" |
---------------------------
|{ |
| "extras": [ |
| "ext warranty", |
| "paint protection" |
| ], |
| "make": "Honda", |
| "model": "Civic", |
| "price": "20275", |
| "year": "2017" |
|} |
|{ |
| "extras": [ |
| "ext warranty", |
| "rust proofing", |
| "fabric protection" |
| ], |
| "make": "Toyota", |
| "model": "Camry", |
| "price": "23500", |
| "year": "2017" |
|} |
---------------------------
------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']" |
------------------------------------
|"20275" |
|"23500" |
------------------------------------
パス内のフィールド名または要素が不規則なために Column.subField
メソッドの使用が困難な場合は、 apply
メソッドの代わりに、 Functions.get、 Functions.get_ignore_case、または Functions.get_path 関数を使用できます。
たとえば、次のコード行は両方とも、オブジェクトの指定されたフィールドの値を出力します。
df.select(Functions.get(Functions.col("src"), Functions.lit("dealership"))).show();
df.select(Functions.col("src").subField("dealership")).show();
同様に、次のコード行は両方とも、オブジェクトの指定されたパスにあるフィールドの値を出力します。
df.select(Functions.get_path(Functions.col("src"), Functions.lit("vehicle[0].make"))).show();
df.select(Functions.col("src").subField("vehicle").subField(0).subField("make")).show();
半構造化データへの明示的な値のキャスト¶
デフォルトでは、上記の例に示すように、フィールドと要素の値は文字列リテラル(二重引用符を含む)として返されます。
予期しない結果を回避するには、 cast メソッドを呼び出して、値を特定の型にキャストします。たとえば、次のコードは、キャストなしとキャストありの値を出力します。
// Import the objects for the data types, including StringType.
import com.snowflake.snowpark_java.types.*;
...
DataFrame df = session.table("car_sales");
df.select(Functions.col("src").subField("salesperson").subField("id")).show();
df.select(Functions.col("src").subField("salesperson").subField("id").cast(DataTypes.StringType)).show();
このコードは、次を出力します。
----------------------------------
|"""SRC""['SALESPERSON']['ID']" |
----------------------------------
|"55" |
|"274" |
----------------------------------
---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)" |
---------------------------------------------------
|55 |
|274 |
---------------------------------------------------
オブジェクト配列の行へのフラット化¶
半構造化データを DataFrame に「フラット化」する必要がある場合(例: 配列内にあるすべてのオブジェクトの行を生成する場合)は、 flatten メソッドを呼び出します。このメソッドは、 FLATTEN SQL 関数と同等です。オブジェクトまたは配列へのパスを渡すと、メソッドは、各フィールドの行か、オブジェクトまたは配列の要素の行を含む DataFrame を返します。
たとえば、 サンプルデータ では、 src:customer
は顧客に関する情報を含むオブジェクトの配列です。各オブジェクトには、 name
および address
フィールドが含まれています。
このパスを flatten
関数に渡すと、次のようになります。
DataFrame df = session.table("car_sales");
df.flatten(Functions.col("src").subField("customer")).show();
メソッドは DataFrame を返します。
----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC" |"SEQ" |"KEY" |"PATH" |"INDEX" |"VALUE" |"THIS" |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{ |1 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "San Francisco, CA", | { |
| { | | | | | "name": "Joyce Ridgely", | "address": "San Francisco, CA", |
| "address": "San Francisco, CA", | | | | | "phone": "16504378889" | "name": "Joyce Ridgely", |
| "name": "Joyce Ridgely", | | | | |} | "phone": "16504378889" |
| "phone": "16504378889" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Valley View Auto Sales", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "55", | | | | | | |
| "name": "Frank Beasley" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "paint protection" | | | | | | |
| ], | | | | | | |
| "make": "Honda", | | | | | | |
| "model": "Civic", | | | | | | |
| "price": "20275", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
|{ |2 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "New York, NY", | { |
| { | | | | | "name": "Bradley Greenbloom", | "address": "New York, NY", |
| "address": "New York, NY", | | | | | "phone": "12127593751" | "name": "Bradley Greenbloom", |
| "name": "Bradley Greenbloom", | | | | |} | "phone": "12127593751" |
| "phone": "12127593751" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Tindel Toyota", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "274", | | | | | | |
| "name": "Greg Northrup" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "rust proofing", | | | | | | |
| "fabric protection" | | | | | | |
| ], | | | | | | |
| "make": "Toyota", | | | | | | |
| "model": "Camry", | | | | | | |
| "price": "23500", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
----------------------------------------------------------------------------------------------------------------------------------------------------------
この DataFrame から、 VALUE
フィールドにある各オブジェクトからの name
フィールドと address
フィールドを選択できます。
df.flatten(Functions.col("src").subField("customer")).select(Functions.col("value").subField("name"), Functions.col("value").subField("address")).show();
-------------------------------------------------
|"""VALUE""['NAME']" |"""VALUE""['ADDRESS']" |
-------------------------------------------------
|"Joyce Ridgely" |"San Francisco, CA" |
|"Bradley Greenbloom" |"New York, NY" |
-------------------------------------------------
次のコードは、 特定の型に値をキャスト し、列の名前を変更することにより、前の例に追加します。
df.flatten(Functions.col("src").subField("customer")).select(Functions.col("value").subField("name").cast(DataTypes.StringType).as("Customer Name"), Functions.col("value").subField("address").cast(DataTypes.StringType).as("Customer Address")).show();
-------------------------------------------
|"Customer Name" |"Customer Address" |
-------------------------------------------
|Joyce Ridgely |San Francisco, CA |
|Bradley Greenbloom |New York, NY |
-------------------------------------------
SQL ステートメントの実行¶
指定した SQL ステートメントを実行するには、 Session
クラスの sql
メソッドを呼び出し、実行するステートメントを渡します。メソッドは DataFrame を返します。
アクションメソッドの呼び出し があるまで、 SQL ステートメントは実行されないことに注意してください。
import java.util.Arrays;
// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
DataFrame dfStageFiles = session.sql("ls @myStage");
Row[] files = dfStageFiles.collect();
System.out.println(Arrays.toString(files));
// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect();
DataFrame tableDf = session.table("sample_product_data").select(Functions.col("id"), Functions.col("name"));
// Get the count of rows from the table.
long numRows = tableDf.count();
System.out.println("Count: " + numRows);
メソッドを呼び出して DataFrame を変換 する場合(例: フィルター、選択など)、これらのメソッドは、基になる SQL ステートメントが SELECT ステートメントである場合にのみ機能することに注意してください。変換メソッドは、他の種類の SQL ステートメントではサポートされていません。
import java.util.Arrays;
DataFrame df = session.sql("select id, category_id, name from sample_product_data where id > 10");
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
Row[] results = df.filter(Functions.col("category_id").lt(Functions.lit(10))).select(Functions.col("id")).collect();
System.out.println(Arrays.toString(results));
// In this example, the underlying SQL statement is not a SELECT statement.
DataFrame dfStageFiles = session.sql("ls @myStage");
// Calling the filter method results in an error.
dfStageFiles.filter(...);