Snowpark Scalaでの DataFrames の操作¶
Snowpark内でデータをクエリして処理する主な方法は、 DataFrame を使用することです。このトピックでは、 DataFrames の操作方法について説明します。
このトピックの内容:
データの取得と操作には、 DataFrame クラスを使用します。DataFrame は、遅延評価されるリレーショナルデータセットを表します。これは、特定のアクションがトリガーされたときにのみ実行されます。ある意味、 DataFrame は、データを取得するために評価する必要があるクエリのようなものです。
データを DataFrame に取得するには、
DataFrame を作成し、データセットのためにデータのソースを指定 します。
たとえば、テーブル、外部 CSV ファイル、または SQL ステートメントの実行からのデータを保持する DataFrame を作成できます。
DataFrame のデータセットを変換する方法を指定 します。
たとえば、どの列を選択するか、行をどのようにフィルタリングするか、結果をどのようにソートおよびグループ化するかなどを指定できます。
ステートメントを実行して、データをDataFrame に取得 します。
データを DataFrame に取得するには、アクションを実行するメソッド(例:
collect()
メソッド)を呼び出す必要があります。
次のセクションでは、これらのステップについて詳しく説明します。
このセクションの例の設定¶
このセクションの例のいくつかは、 DataFrame を使用して sample_product_data
という名前のテーブルをクエリします。これらの例を実行する場合は、次の SQL ステートメントを実行することにより、このテーブルを作成し、テーブルにデータを入力できます。
CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
INSERT INTO sample_product_data VALUES
(1, 0, 5, 'Product 1', 'prod-1', 1, 10),
(2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
(3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
(4, 0, 10, 'Product 2', 'prod-2', 2, 40),
(5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
(6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
(7, 0, 20, 'Product 3', 'prod-3', 3, 70),
(8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
(9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
(10, 0, 50, 'Product 4', 'prod-4', 4, 100),
(11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
(12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);
テーブルが作成されたことを確認するには、次のコマンドを実行します。
SELECT * FROM sample_product_data;
DataFrame の構築¶
DataFrame を構築するには、 Session
クラスのメソッドを使用できます。次の各メソッドは、異なるタイプのデータソースから DataFrame を構築します。
テーブル、ビュー、またはストリームのデータから DataFrame を作成するには、
table
メソッドを呼び出します。// Create a DataFrame from the data in the "sample_product_data" table. val dfTable = session.table("sample_product_data") // To print out the first 10 rows, call: // dfTable.show()
注釈
session.table
メソッドはUpdatable
オブジェクトを返します。Updatable
はDataFrame
を拡張し、テーブル内のデータを操作するための追加のメソッド(例: データを更新および削除するためのメソッド)を提供します。 テーブル内の行の更新、削除、およびマージ をご参照ください。一連の値から DataFrame を作成するには、
createDataFrame
メソッドを呼び出します。// Create a DataFrame containing a sequence of values. // In the DataFrame, name the columns "i" and "s". val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
注釈
Snowflakeによって予約された単語は、 DataFrame を構築するときの列名としては無効です。予約された単語のリストについては、 予約済みおよび限定キーワード をご参照ください。
値の範囲を含む DataFrame を作成するには、
range
メソッドを呼び出します。// Create a DataFrame from a range val dfRange = session.range(1, 10, 2)
ステージにファイルの DataFrame を作成する には、
read
を呼び出してDataFrameReader
オブジェクトを取得します。DataFrameReader
オブジェクトで、ファイル内のデータの形式に対応するメソッドを呼び出します。// Create a DataFrame from data in a stage. val dfJson = session.read.json("@mystage2/data1.json")
SQL クエリの結果を保持する DataFrame を作成するには、
sql
メソッドを呼び出します。// Create a DataFrame from a SQL query val dfSql = session.sql("SELECT name from products")
注: このメソッドを使用すると、テーブルおよびステージングされたファイルからデータを取得する SELECT ステートメントを実行できますが、そうではなく、
table
およびread
メソッドを使用するようにします。table
やread
のようなメソッドは、開発ツールでより優れた構文の強調表示、エラーの強調表示、およびインテリジェントなコード補完を提供できます。
データセットの変換方法の指定¶
選択する列と、結果のフィルタリング、並べ替え、グループ化などを指定するには、データセットを変換する DataFrame メソッドを呼び出します。これらのメソッドで列を識別するには、 col
関数または列に評価される式を使用します。(列と式の指定 をご参照ください。)
例:
返される行を指定するには、
filter
メソッドを呼び出します。// Import the col function from the functions object. import com.snowflake.snowpark.functions._ // Create a DataFrame for the rows with the ID 1 // in the "sample_product_data" table. // // This example uses the === operator of the Column object to perform an // equality check. val df = session.table("sample_product_data").filter(col("id") === 1) df.show()
選択する列を指定するには、
select
メソッドを呼び出します。// Import the col function from the functions object. import com.snowflake.snowpark.functions._ // Create a DataFrame that contains the id, name, and serial_number // columns in te "sample_product_data" table. val df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number")) df.show()
各メソッドは、変換された新しい DataFrame オブジェクトを返します。(メソッドは、元の DataFrame オブジェクトには影響しません。)つまり、複数の変換を適用する場合は、 チェーンメソッド呼び出し を実行して、前のメソッド呼び出しによって返された、新しい DataFrame オブジェクトに対する後続の各変換メソッドを呼び出すことができます。
これらの変換方法は、Snowflakeデータベースからデータを取得しないことに注意してください。(DataFrame を評価するアクションの実行 で説明されているアクションメソッドは、データ取得を実行します。)変換メソッドは、 SQL ステートメントの作成方法を指定するだけです。
列と式の指定¶
これらの変換メソッドを呼び出すときは、列または列を使用する式を指定する必要がある場合があります。たとえば、 select
メソッドを呼び出すときは、選択する必要のある列を指定する必要があります。
列を参照するには、 com.snowflake.snowpark.functions
オブジェクトで col 関数を呼び出して、 Column オブジェクトを作成します。
// Import the col function from the functions object.
import com.snowflake.snowpark.functions._
val dfProductInfo = session.table("sample_product_data").select(col("id"), col("name"))
dfProductInfo.show()
注釈
リテラルの Column
オブジェクトを作成するには、 列オブジェクトとしてのリテラルの使用 をご参照ください。
フィルター、プロジェクション、結合条件などを指定する場合は、式で Column
オブジェクトを使用できます。たとえば、
filter
メソッドでColumn
オブジェクトを使用して、フィルター条件を指定できます。// Specify the equivalent of "WHERE id = 20" // in an SQL SELECT statement. df.filter(col("id") === 20)
// Specify the equivalent of "WHERE a + b < 10" // in an SQL SELECT statement. df.filter((col("a") + col("b")) < 10)
select
メソッドでColumn
オブジェクトを使用して、エイリアスを定義できます。// Specify the equivalent of "SELECT b * 10 AS c" // in an SQL SELECT statement. df.select((col("b") * 10) as "c")
join
メソッドでColumn
オブジェクトを使用して、結合条件を定義できます。// Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y" // in an SQL SELECT statement. dfX.join(dfY, col("a_in_X") === col("b_in_Y"))
異なる DataFrames の列の参照¶
同じ名前の2つの異なる DataFrame オブジェクトの列を参照する場合(例: その列の DataFrames を結合する場合)、1つの DataFrame オブジェクトにある DataFrame.col
メソッドを使用して、そのオブジェクトにある列(例: df1.col("name")
と df2.col("name")
)を参照できます。
次の例は、 DataFrame.col
メソッドを使用して特定の DataFrame の列を参照する方法を示しています。この例では、 key
という名前の列がある2つの DataFrame オブジェクトの両方を結合します。この例では、 Column.as
メソッドを使用して、新しく作成された DataFrame の列の名前を変更します。
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))
apply
メソッドを使用した列の参照¶
DataFrame.col
メソッドの代わりに、 DataFrame.apply
メソッドを使用して特定の DataFrame の列を参照できます。 DataFrame.col
メソッドと同様に、 DataFrame.apply
メソッドは入力として列名を受け入れ、 Column
オブジェクトを返します。
オブジェクトでScalaに apply
メソッドがある場合は、関数であるかのようにオブジェクトを呼び出すことで apply
メソッドを呼び出すことができます。たとえば、 df.apply("column_name")
を呼び出すには、単純に df("column_name")
を記述します。次の呼び出しは同等です。
df.col("<列名>")
df.apply("<列名>")
df("<列名>")
次の例は前の例と同じですが、結合操作で列を参照するために DataFrame.apply
メソッドを使用します。
// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))
列オブジェクトの短縮形の使用¶
col
関数を使用する代わりに、次のいずれかの方法で列を参照できます。
引用符で囲まれた列名(
$"column_name"
)の前にドル記号を使用する。引用符で囲まれていない列名(
'column_name
)の前にアポストロフィ(一重引用符)を使用する。
これを実行するには、 Session
オブジェクトを作成した後、 implicits
オブジェクトから名前をインポートします。
val session = Session.builder.configFile("/path/to/properties").create
// Import this after you create the session.
import session.implicits._
// Use the $ (dollar sign) shorthand.
val df = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)
// Use ' (apostrophe) shorthand.
val df = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)
オブジェクト識別子(テーブル名、列名など)の前後での二重引用符の使用¶
指定するデータベース、スキーマ、テーブル、およびステージの名前は、 Snowflake識別子の要件 に準拠している必要があります。名前を指定すると、Snowflakeはその名前を大文字と見なします。たとえば、次の呼び出しは同等です。
// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))
名前が識別子の要件に準拠していない場合は、名前を二重引用符("
)で囲む必要があります。バックスラッシュ(\
)を使用して、Scala文字列リテラル内の二重引用符をエスケープします。たとえば、次のテーブル名は文字やアンダースコアで始まらないため、名前を二重引用符で囲む必要があります。
val df = session.table("\"10tablename\"")
列 の名前を指定するときは、名前を二重引用符で囲む必要はありません。名前が識別子の要件に準拠していない場合、Snowparkライブラリは、列名を自動的に二重引用符で囲みます。
// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\""))
// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))
すでに列名を二重引用符で囲んでいる場合、ライブラリは名前を二重引用符で囲みません。
場合によっては、列名に二重引用符が含まれることがあります。
describe table quoted;
+------------------------+ ...
| name | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted" | ...
+------------------------+ ...
識別子の要件 で説明されているように、二重引用符で囲まれた識別子内の二重引用符文字ごとに、2つの二重引用符文字(例: "name_with_""air""_quotes"
と """column_name_quoted"""
)を使用する必要があります。
val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()
識別子が二重引用符で囲まれている場合は(明示的に引用符を追加したか、ライブラリが引用符を追加したかに関係なく)、 Snowflakeは識別子を大文字と小文字を区別する ものとして扱います。
// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))
列オブジェクトとしてのリテラルの使用¶
Column
オブジェクト内で渡すメソッドでリテラルを使用するには、リテラルを com.snowflake.snowpark.functions
オブジェクトの lit
関数に渡して、リテラルの Column
オブジェクトを作成します。たとえば、
// Import for the lit and col functions.
import com.snowflake.snowpark.functions._
// Show the first 10 rows in which num_items is greater than 5.
// Use `lit(5)` to create a Column object for the literal 5.
df.filter(col("num_items").gt(lit(5))).show()
リテラルがScalaの浮動小数点またはdouble値である場合(例: 0.05
は デフォルトでDoubleとして扱う)、Snowparkライブラリは SQL を生成し、対応するSnowparkデータ型に値を暗黙的にキャストします(例: 0.05::DOUBLE
)。これにより、指定された正確な数とは異なる概算値が生成される可能性があります。
たとえば、次のコードは、フィルター(0.05
以上の値に一致する)が DataFrame の行に一致する必要がある場合でも、一致する行を表示しません。
// Create a DataFrame that contains the value 0.05.
val df = session.sql("select 0.05 :: Numeric(5, 2) as a")
// Applying this filter results in no matching rows in the DataFrame.
df.filter(col("a") <= lit(0.06) - lit(0.01)).show()
問題は、 lit(0.06)
と lit(0.01)
が、正確な値ではなく、 0.06
と 0.01
の概算値を生成することです。
この問題を回避するには、次のいずれかの方法を使用できます。
オプション1: 使用する Snowparkの型にリテラルをキャスト する。たとえば、精度が5でスケールが2の NUMBER を使用するには、次のようにします。
df.filter(col("a") <= lit(0.06).cast(new DecimalType(5, 2)) - lit(0.01).cast(new DecimalType(5, 2))).show()
オプション2: 値を
lit
関数に渡す前に、使用する型に値をキャストする。たとえば、 BigDecimal 型 を使用する場合は、df.filter(col("a") <= lit(BigDecimal(0.06)) - lit(BigDecimal(0.01))).show()
列オブジェクトの特定型へのキャスト¶
Column
オブジェクトを特定の型にキャストするには、 Column.cast メソッドを呼び出し、 com.snowflake.snowpark.types package から型オブジェクトを渡します。たとえば、リテラルを5の精度と2のスケールで NUMBER としてキャストするには、次のようにします。
// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._
val decimalValue = lit(0.05).cast(new DecimalType(5,2))
メソッド呼び出しのチェーン¶
DataFrame オブジェクトを変換するメソッド ごとに、変換が適用された新しい DataFrame オブジェクトが返されるため、 メソッド呼び出しのチェーン により、追加の方法で変換される新しい DataFrame を生成できます。
次の例では、次のように構成された DataFrame を返します。
sample_product_data
テーブルをクエリします。id = 1
で行を返します。name
およびserial_number
列を選択します。
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
この例では、
session.table("sample_product_data")
は、sample_product_data
テーブルの DataFrame を返します。DataFrame にはまだテーブルのデータが含まれていませんが、オブジェクトにはテーブルの列の定義が含まれています。
filter(col("id") === 1)
は、id = 1
で行を返すように設定されたsample_product_data
テーブルの DataFrame を返します。DataFrame には、テーブルからの一致する行がまだ含まれていないことに改めて注意してください。 アクションメソッドを呼び出す まで、一致する行は取得されません。
select(col("name"), col("serial_number"))
は、id = 1
を持つsample_product_data
テーブルの行のname
列とserial_number
列を含む DataFrame を返します。
メソッド呼び出しをチェーンするときは、呼び出しの順序が重要であることに注意してください。各メソッド呼び出しは、変換された DataFrame を返します。後続の呼び出しが変換された DataFrame で作動することを確認します。
たとえば、次のコードでは、 select
メソッドは name
と serial_number
の2つの列のみを含む DataFrame を返します。この DataFrame の filter
メソッド呼び出しは、変換された DataFrame にはない id
列を使用しているため失敗します。
// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") === 1)
対照的に、次のコードは、 sample_product_data
テーブルのすべての列(id
列を含む)を含む DataFrame で filter()
メソッドが呼び出されるため、正常に実行されます。
// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
SQL ステートメントで同等のキーワード(SELECT および WHERE)を使用する場合とは異なる順序で、 select
および filter
メソッドの呼び出しが必要になる場合があることに注意してください。
DataFrame の行数制限¶
DataFrame の行数を制限するには、 DataFrame.limit 変換メソッドを使用できます。
Snowpark API は、限られた数の行を取得して出力するためのアクションメソッドも提供します。
DataFrame.first アクションメソッド(クエリを実行して最初の
n
行を返すため)DataFrame.show アクションメソッド(クエリを実行し、最初の
n
行を出力するため)
これらのメソッドは、実行される SQL ステートメントに LIMIT 句を効果的に追加します。
LIMIT の使用上の注意 で説明されているように、 LIMIT と組み合わせて並べ替え順序(ORDER BY)を指定する場合を除き、結果は非決定的です。
ORDER BY 句を LIMIT 句と一緒に保持するには(例: ORDER BY が別のサブクエリにないように)、 sort
メソッドによって返される DataFrame の結果を制限するメソッドを呼び出す必要があります。
たとえば、 メソッドの呼び出しをチェーン する場合、
// Limit the number of rows to 5, sorted by parent_id.
var dfSubset = df.sort(col("parent_id")).limit(5);
// Return the first 5 rows, sorted by parent_id.
var arrayOfRows = df.sort(col("parent_id")).first(5)
// Print the first 5 rows, sorted by parent_id.
df.sort(col("parent_id")).show(5)
列定義の取得¶
DataFrame のためにデータセット内の列の定義を取得するには、 schema
メソッドを呼び出します。このメソッドは、 StructField
オブジェクトの Array
を含む StructType
オブジェクトを返します。各 StructField
オブジェクトには、列の定義が含まれます。
// Get the StructType object that describes the columns in the
// underlying rowset.
val tableSchema = session.table("sample_product_data").schema
println("Schema for sample_product_data: " + tableSchema);
返された StructType
オブジェクトでは、列名は常に正規化されています。引用符で囲まれていない識別子は大文字で返され、引用符で囲まれた識別子は定義された正確な大文字小文字で返されます。
次の例では、 ID
および 3rd
という名前の列を含む DataFrame を作成します。列名 3rd
の場合、 名前が識別子の要件に準拠していない ため、Snowparkライブラリは名前を自動的に二重引用符("3rd"
)で囲みます。
この例では、 schema
メソッドを呼び出してから、返された StructType
オブジェクトで names
メソッドを呼び出して、列名の ArraySeq
を取得します。名前は、 schema
メソッドによって返される StructType
で正規化されます。
// Create a DataFrame containing the "id" and "3rd" columns.
val dfSelectedColumns = session.table("sample_product_data").select(col("id"), col("3rd"))
// Print out the names of the columns in the schema. This prints out:
// ArraySeq(ID, "3rd")
println(dfSelectedColumns.schema.names.toSeq)
DataFrames の結合¶
DataFrame オブジェクトを結合するには、 DataFrame.join メソッドを呼び出します。
次のセクションでは、 DataFrames を使用して結合を実行する方法について説明します。
結合のサンプルデータの設定¶
次のセクションの例では、次の SQL ステートメントを実行して設定できるサンプルデータを使用しています。
create or replace table sample_a (
id_a integer,
name_a varchar,
value integer
);
insert into sample_a (id_a, name_a, value) values
(10, 'A1', 5),
(40, 'A2', 10),
(80, 'A3', 15),
(90, 'A4', 20)
;
create or replace table sample_b (
id_b integer,
name_b varchar,
id_a integer,
value integer
);
insert into sample_b (id_b, name_b, id_a, value) values
(4000, 'B1', 40, 10),
(4001, 'B2', 10, 5),
(9000, 'B3', 80, 15),
(9099, 'B4', null, 200)
;
create or replace table sample_c (
id_c integer,
name_c varchar,
id_a integer,
id_b integer
);
insert into sample_c (id_c, name_c, id_a, id_b) values
(1012, 'C1', 10, null),
(1040, 'C2', 40, 4000),
(1041, 'C3', 40, 4001)
;
結合の列の指定¶
DataFrame.join
メソッドを使用すると、次のいずれかの方法で使用する列を指定できます。
結合条件を説明する列式を指定します。
結合の共通列として使用する1つ以上の列を指定します。
次の例では、 id_a
という名前の列で内部結合を実行します。
// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
この例では、 DataFrame.col
メソッドを使用して、結合で使用する条件を指定します。このメソッドの詳細については、 列と式の指定 をご参照ください。
これにより、次が出力されます。
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
----------------------------------------------------------------------
結合の結果で重複する同一の列名¶
結合の結果の DataFrame では、テーブル間で列名が同一であっても、Snowparkライブラリは結合されたテーブルで見つかった列名を使用します。この場合、これらの列名は、結合の結果である DataFrame で重複しています。重複する列に名前でアクセスするには、列の元のテーブルを表す DataFrame で col
メソッドを呼び出します。(列の指定の詳細については、 異なる DataFrames の列の参照 をご参照ください)。
次の例のコードは、2つの DataFrames を結合し、結合された DataFrame で select
メソッドを呼び出します。これは、それぞれの DataFrame オブジェクト(dfRhs
および dfLhs
)を表す変数から col
メソッドを呼び出して、選択する列を指定します。これは、 as
メソッドを使用して、 select
メソッドが作成する DataFrame 内の列に新しい名前を付けます。
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
val dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"))
dfSelected.show()
これにより、次が出力されます。
------------------------------
|"LEFTVALUE" |"RIGHTVALUE" |
------------------------------
|5 |5 |
|10 |10 |
|15 |15 |
------------------------------
保存またはキャッシュする前に重複する列¶
結合の結果の DataFrame に重複する列名が含まれる場合は、結果をテーブルに保存するか DataFrame をキャッシュする前に、重複を排除するか列の名前を変更して DataFrame の重複をなくす必要があります。テーブルまたはキャッシュに保存する DataFrame 内の重複する列名の場合、Snowparkライブラリは、重複しないようにするために、重複する列名をエイリアスに置き換えます。
次の例は、列名 ID_A
と VALUE
が2つのテーブルからの結合で重複し、結果をキャッシュする前に重複を排除するか名前を変更しない場合に、キャッシュされた DataFrame の出力がどのように表示されるかを示しています。
--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A" |"NAME_A" |"l_ZSz7_VALUE" |"ID_B" |"NAME_B" |"r_heec_ID_A" |"r_heec_VALUE" |
--------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |
|40 |A2 |10 |4000 |B1 |40 |10 |
|80 |A3 |15 |9000 |B3 |80 |15 |
--------------------------------------------------------------------------------------------------
自然結合の実行¶
自然結合 (DataFrames は同じ名前の列で結合)を実行するには、 DataFrame.naturalJoin メソッドを呼び出します。
次の例では、テーブル sample_a
と sample_b
の DataFrames を共通の列(列 id_a
)で結合します。
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
これにより、次が出力されます。
---------------------------------------------------
|"ID_A" |"VALUE" |"NAME_A" |"ID_B" |"NAME_B" |
---------------------------------------------------
|10 |5 |A1 |4001 |B2 |
|40 |10 |A2 |4000 |B1 |
|80 |15 |A3 |9000 |B3 |
---------------------------------------------------
統合の型の指定¶
デフォルトでは、 DataFrame.join
メソッドは内部結合を作成します。別の型の結合を指定するには、 joinType
引数を次のいずれかの値に設定します。
結合の型 |
|
---|---|
内部結合 |
|
左外部結合 |
|
右外部結合 |
|
完全外部結合 |
|
クロス結合 |
|
例:
// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"), "left")
dfLeftOuterJoin.show()
これにより、次が出力されます。
----------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |
----------------------------------------------------------------------
|40 |A2 |10 |4000 |B1 |40 |10 |
|10 |A1 |5 |4001 |B2 |10 |5 |
|80 |A3 |15 |9000 |B3 |80 |15 |
|90 |A4 |20 |NULL |NULL |NULL |NULL |
----------------------------------------------------------------------
複数のテーブルの結合¶
複数のテーブルを結合するには、
テーブルごとに DataFrame を作成します。
最初の DataFrame で
DataFrame.join
メソッドを呼び出し、2番目の DataFrame を渡します。join
メソッドによって返された DataFrame を使用して、join
メソッドを呼び出し、3番目の DataFrame を渡します。
以下に示すように、 join
呼び出しを チェーン することができます。
val dfFirst = session.table("sample_a")
val dfSecond = session.table("sample_b")
val dfThird = session.table("sample_c")
val dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a") === dfSecond.col("id_a")).join(dfThird, dfFirst.col("id_a") === dfThird.col("id_a"))
dfJoinThreeTables.show()
これにより、次が出力されます。
------------------------------------------------------------------------------------------------------------
|"ID_A" |"NAME_A" |"VALUE" |"ID_B" |"NAME_B" |"ID_A" |"VALUE" |"ID_C" |"NAME_C" |"ID_A" |"ID_B" |
------------------------------------------------------------------------------------------------------------
|10 |A1 |5 |4001 |B2 |10 |5 |1012 |C1 |10 |NULL |
|40 |A2 |10 |4000 |B1 |40 |10 |1040 |C2 |40 |4000 |
|40 |A2 |10 |4000 |B1 |40 |10 |1041 |C3 |40 |4001 |
------------------------------------------------------------------------------------------------------------
自己結合の実行¶
異なる列でテーブルをそれ自体と結合する必要がある場合は、単一の DataFrame で自己結合を実行することはできません。単一の DataFrame を使用して自己結合を実行する次の例は、 "id"
の列式が結合の左側と右側に存在するため、失敗します。
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, col("id") === col("parent_id"))
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, df("id") === df("parent_id"))
これらの例は両方とも、次の例外を除いて失敗します。
Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
Instead, join this DataFrame to a clone() of itself.
代わりに、 DataFrame.clone メソッドを使用して DataFrame オブジェクトのクローンを作成し、2つの DataFrame オブジェクトを使用して結合を実行します。
// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
val dfLhs = session.table("sample_product_data")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()
// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))
dfJoined.show()
同じ列で自己結合を実行する場合は、 USING
句の列式の Seq
を渡す join
メソッドを呼び出します。
// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
val df = session.table("sample_product_data");
val dfJoined = df.join(df, Seq("key"))
DataFrame を評価するアクションの実行¶
前述のように、 DataFrame は遅延評価されます。つまり、アクションを実行するまで、 SQL ステートメントは実行のためにサーバーに送信されません。アクションにより、 DataFrame が評価され、対応する SQL ステートメントが実行のためにサーバーに送信されます。
次のセクションでは、 DataFrame で同期的および非同期的にアクションを実行する方法について説明します。
同期的なアクションの実行¶
アクションを同期的に実行するには、次のアクションメソッドのいずれかを呼び出します。
アクションを同期的に実行する方法 |
説明 |
---|---|
|
DataFrame を評価し、結果のデータセットを 行 オブジェクトの |
|
DataFrame を評価し、 行 オブジェクトの 反復子 を返します。結果セットが大きい場合は、この方法を使用して、すべての結果を一度にメモリにロードすることを回避します。 行の反復子を返す をご参照ください。 |
|
DataFrame を評価し、行数を返します。 |
|
DataFrame を評価し、行をコンソールに表示します。このメソッドでは、行数は10行に制限されることに注意してください(デフォルト)。 DataFrame での行の出力 をご参照ください。 |
|
クエリを実行し、仮テーブルを作成して、テーブルに結果を配置します。このメソッドは、この仮テーブルのデータにアクセスするために使用できる |
|
DataFrame のデータを指定したテーブルに保存します。 テーブルへのデータの保存 をご参照ください。 |
|
DataFrame をステージ上の指定されたファイルに保存します。 ステージにあるファイルへの DataFrame の保存 をご参照ください。 |
|
DataFrame のデータを指定したテーブルにコピーします。 ファイルからテーブルへのデータのコピー をご参照ください。 |
|
指定されたテーブルの行を削除します。 テーブル内の行の更新、削除、およびマージ をご参照ください。 |
|
指定されたテーブルの行を更新します。 テーブル内の行の更新、削除、およびマージ をご参照ください。 |
|
指定されたテーブルに行をマージします。 テーブル内の行の更新、削除、およびマージ をご参照ください。 |
クエリを実行して結果の数を返すには、 count
メソッドを呼び出します。
// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")
// Send the query to the server for execution and
// print the count of rows in the table.
println("Rows returned: " + dfProducts.count())
アクションメソッドを呼び出して、次を実行することもできます。
注: DataFrame の列の定義を取得するために schema
メソッドを呼び出す場合は、アクションメソッドを呼び出す必要はありません。
非同期的なアクションの実行¶
注釈
この機能は、Snowpark 0.11.0で導入されました。
アクションを非同期で実行するには、 async
メソッドを呼び出して「非同期アクター」オブジェクト(例: DataFrameAsyncActor
)を返し、そのオブジェクトで非同期アクションメソッドを呼び出します。
非同期アクターオブジェクトのこれらのアクションメソッドは、 TypedAsyncJob
オブジェクトを返します。これを使用して、非同期アクションのステータスを確認し、アクションの結果を取得できます。
次のセクションでは、アクションを非同期で実行して結果を確認する方法について説明します。
非同期アクションの基本的なフローの理解¶
次のメソッドを使用して、アクションを非同期で実行できます。
アクションを非同期的に実行する方法 |
説明 |
---|---|
|
DataFrame を非同期的に評価して、結果のデータセットを 行 オブジェクトの |
|
DataFrame を非同期的に評価して、 行 オブジェクトの 反復子 を取得します。結果セットが大きい場合は、この方法を使用して、すべての結果を一度にメモリにロードすることを回避します。 行の反復子を返す をご参照ください。 |
|
DataFrame を非同期的に評価して、行数を取得します。 |
|
DataFrame のデータを指定したテーブルに非同期的に保存します。 テーブルへのデータの保存 をご参照ください。 |
|
DataFrame をステージ上の指定されたファイルに保存します。 ステージにあるファイルへの DataFrame の保存 をご参照ください。 |
|
DataFrame のデータを指定したテーブルに非同期的にコピーします。 ファイルからテーブルへのデータのコピー をご参照ください。 |
|
指定されたテーブルの行を非同期的に削除します。 テーブル内の行の更新、削除、およびマージ をご参照ください。 |
|
指定されたテーブルの行を非同期的に更新します。 テーブル内の行の更新、削除、およびマージ をご参照ください。 |
|
非同期で指定されたテーブルに行をマージします。バージョン1.3.0以降でサポートされています。 テーブル内の行の更新、削除、およびマージ をご参照ください。 |
返された TypedAsyncJob オブジェクトから、次の操作を実行できます。
アクションが完了したかどうかを判断するには、
isDone
メソッドを呼び出します。アクションに対応するクエリ ID を取得するには、
getQueryId
メソッドを呼び出します。アクションの結果(例:
collect
メソッドに対するRow
オブジェクトのArray
、またはcount
メソッドの行数)を返すには、getResult
メソッドを呼び出します。getResult
はブロッキング呼び出しであることに注意してください。アクションをキャンセルするには、
cancel
メソッドを呼び出します。
たとえば、クエリを非同期で実行し、結果を Row
オブジェクトの Array
として取得するには、 DataFrame.async.collect
を呼び出します。
// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
val df = session.table("sample_product_data").select(col("id"), col("name"))
// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.collect()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
val results = asyncJob.getResult()
results.foreach(println)
クエリを非同期で実行し、結果の数を取得するには、 DataFrame.async.count
を呼び出します。
// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")
// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.count()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Print the count of rows in the table.
// Note that getResult is a blocking call.
println("Rows returned: " + asyncJob.getResult())
待機する最大秒数の指定¶
getResult
メソッドを呼び出すときは、 maxWaitTimeInSeconds
引数を使用して、クエリが完了するのを待ってから結果を取得するまでの最大秒数を指定できます。たとえば、
// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
val results = asyncJob.getResult(10)
この引数を省略すると、メソッドは Snowparkがリクエストするタイムアウト(秒単位) の構成プロパティで指定された最大秒数で待機します。(これは、 セッションオブジェクトを作成する ときに設定できるプロパティです。)
ID による非同期クエリへのアクセス¶
以前に送信した非同期クエリのクエリ ID がある場合は、 Session.createAsyncJob
メソッドを呼び出して、クエリのステータスの確認、クエリ結果の取得、クエリのキャンセルに使用できる AsyncJob オブジェクトの作成ができます。
TypedAsyncJob
とは異なり、 AsyncJob
は結果を取得するための getResult
メソッドを提供しないことに注意してください。結果を取得する必要がある場合は、代わりに getRows
または getIterator
メソッドを呼び出します。
例:
val asyncJob = session.createAsyncJob(myQueryId)
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
val rows = asyncJob.getRows()
rows.foreach(println)
行の DataFrame への取得¶
DataFrame の変換方法を指定 した後、 アクションメソッドを呼び出して クエリを実行し、結果を返すことができます。 Array
内のすべての行を返すことも、行ごとに結果を反復処理できる 反復子 を返すこともできます。後者では、データ量が多い場合、大量のデータがメモリにロードされないように、行がチャンクごとにメモリにロードされます。
すべての行を返す¶
すべての行を一度に返すには、 DataFrame.collect メソッドを呼び出します。このメソッドは、 行 オブジェクトの配列を返します。行から値を取得するには、 getType
メソッドを呼び出します(例: getString
、 getInt
など)。
例:
import com.snowflake.snowpark.functions_
val rows = session.table("sample_product_data").select(col("name"), col("category_id")).sort(col("name")).collect()
for (row <- rows) {
println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
行の反復子を返す¶
反復子 を使用して結果の 行 オブジェクトを反復処理する場合は、 DataFrame.toLocalIterator を呼び出します。結果のデータ量が多い場合、メソッドは行をチャンクごとにロードして、すべての行を一度にメモリにロードしないようにします。
例:
import com.snowflake.snowpark.functions_
while (rowIterator.hasNext) {
val row = rowIterator.next()
println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
最初の n
行を返す¶
最初の n
行を返すには、 DataFrame.first メソッドを呼び出し、返す行数を渡します。
DataFrame の行数制限 で説明されているように、結果は非決定的です。結果を決定的にする場合は、ソートされた DataFrame (df.sort().first()
)でこのメソッドを呼び出します。
例:
import com.snowflake.snowpark.functions_
val df = session.table("sample_product_data")
val rows = df.sort(col("name")).first(5)
rows.foreach(println)
DataFrame での行の出力¶
DataFrame の最初の10行をコンソールに出力するには、 DataFrame.show メソッドを呼び出します。別の行数を出力するには、出力する行数を渡します。
DataFrame の行数制限 で説明されているように、結果は非決定的です。結果を決定的にする場合は、ソートされた DataFrame (df.sort().show()
)でこのメソッドを呼び出します。
例:
import com.snowflake.snowpark.functions_
val df = session.table("sample_product_data")
df.sort(col("name")).show()
テーブル内の行の更新、削除、およびマージ¶
注釈
この機能は、Snowpark 0.7.0で導入されました。
Session.table
を呼び出してテーブルの DataFrame
オブジェクトを作成すると、メソッドは Updatable
オブジェクトを返します。これは、テーブル内のデータを更新および削除するための追加のメソッドにより、 DataFrame
を拡張します。(Updatable をご参照ください。)
テーブルの行を更新または削除する必要がある場合は、 Updatable
クラスの次のメソッドを使用できます。
update
を呼び出して、テーブル内の既存の行を更新します。 テーブルにある行の更新 をご参照ください。delete
を呼び出して、テーブルから行を削除します。 テーブル内にある行の削除 をご参照ください。merge
を呼び出して、2番目のテーブルまたはサブクエリのデータに基づき、1つのテーブルにある行を挿入、更新、および削除します。(SQL の 行のテーブルへのマージ コマンドと同等。) MERGE をご参照ください。
テーブルにある行の更新¶
update
メソッドの場合は、更新する列とそれらの列に割り当てる対応する値を関連付ける、 Map
で渡します。 update
は、更新された行数を含む UpdateResult
オブジェクトを返します。(UpdateResult をご参照ください。)
注釈
update
は、 アクションメソッド です。これは、メソッドを呼び出すと、 SQL ステートメントがサーバーに送信されて実行されることを意味します。
たとえば、 count
という名前の列の値を値 1
に置き換えるには、
val updatableDf = session.table("sample_product_data")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")
上記の例では、列の名前を使用して列を識別しています。列式を使用することもできます。
val updateResult = updatableDf.update(Map(col("count") -> lit(1)))
条件が満たされたときにのみ更新する必要がある場合は、その条件を引数として指定できます。たとえば、 category_id
列の値が 20
である行の count
という名前の列の値を置き換えるには、次のようにします。
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), col("category_id") === 20)
別の DataFrame
オブジェクトとの結合に基づいて条件を作成する必要がある場合は、その DataFrame
を引数として渡し、その DataFrame
を条件で使用できます。たとえば、 category_id
列が DataFrame
dfParts
の category_id
と一致する行の、 count
という名前の列の値を置き換えるには、次のようにします。
val updatableDf = session.table("sample_product_data")
val dfParts = session.table("parts")
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), updatableDf("category_id") === dfParts("category_id"), dfParts)
テーブル内にある行の削除¶
delete
メソッドの場合は、削除する行を識別する条件を指定でき、その条件は別の DataFrame との結合に基づくことができます。 delete
は、削除された行数を含む DeleteResult
オブジェクトを返します。(DeleteResult をご参照ください。)
注釈
delete
は、 アクションメソッド です。これは、メソッドを呼び出すと、 SQL ステートメントがサーバーに送信されて実行されることを意味します。
たとえば、 category_id
列の値が 1
の行を削除するには、次を実行します。
val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === 1)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
条件が別の DataFrame の列を参照している場合は、その DataFrame を2番目の引数として渡します。たとえば、 category_id
列が DataFrame
dfParts
の category_id
と一致する行を削除するには、 dfParts
を2番目の引数として渡します。
val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === dfParts("category_id"), dfParts)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
行のテーブルへのマージ¶
2番目のテーブルまたはサブクエリの値に基づいて1つのテーブルの行を挿入、更新、および削除するには(SQL の MERGE コマンドに相当)、次の手順を実行します。
データをマージするテーブルの
Updatable
オブジェクトで、merge
メソッドを呼び出し、他のテーブルのDataFrame
オブジェクトと、結合条件の列式を渡します。これにより、一致する行と一致しない行に対して実行するアクション(例: 挿入、更新、削除)を指定するために使用できる、
MergeBuilder
オブジェクトが返されます。(MergeBuilder をご参照ください。)MergeBuilder
オブジェクトの使用。一致する行に対して実行する必要がある更新または削除を指定するには、
whenMatched
メソッドを呼び出します。行の更新または削除が必要なときに追加の条件を指定する必要がある場合は、その条件の列式を渡すことができます。
このメソッドは、実行するアクションを指定するために使用できる
MatchedClauseBuilder
オブジェクトを返します。(MatchedClauseBuilder をご参照ください。)MatchedClauseBuilder
オブジェクトのupdate
またはdelete
メソッドを呼び出して、一致する行に対して実行する必要がある更新または削除アクションを指定します。これらのメソッドは、追加の句を指定するために使用できるMergeBuilder
オブジェクトを返します。行が一致しない場合に実行する必要のある挿入を指定するには、
whenNotMatched
メソッドを呼び出します。行を挿入する必要があるときに追加の条件を指定する必要がある場合は、その条件の列式を渡すことができます。
このメソッドは、実行するアクションを指定するために使用できる
NotMatchedClauseBuilder
オブジェクトを返します。(NotMatchedClauseBuilder をご参照ください。)NotMatchedClauseBuilder
オブジェクトのinsert
メソッドを呼び出して、行が一致しない場合に実行する必要のある挿入アクションを指定します。これらのメソッドは、追加の句を指定するために使用できるMergeBuilder
オブジェクトを返します。
実行する必要のある挿入、更新、および削除の指定が完了したら、
MergeBuilder
オブジェクトのcollect
メソッドを呼び出して、指定した挿入、更新、および削除をテーブルで実行します。collect
は、挿入、更新、および削除された行数を含むMergeResult
オブジェクトを返します。(MergeResult をご参照ください。)
次の例では、 target
テーブルに一致する ID の行が含まれていない場合に、 source
テーブルの id
列と value
列の行を target
テーブルに挿入します。
val mergeResult = target.merge(source, target("id") === source("id"))
.whenNotMatched.insert(Seq(source("id"), source("value")))
.collect()
次の例では、同じ ID を持つ source
テーブルにある行の value
列の値で、 target
テーブルにある行を更新します。
val mergeResult = target.merge(source, target("id") === source("id"))
.whenMatched.update(Map("value" -> source("value")))
.collect()
テーブルへのデータの保存¶
DataFrame の内容を新規または既存のテーブルに保存できます。これには、次の権限が必要です。
テーブルが存在しない場合は、スキーマに対する CREATE TABLE 権限。
テーブルに対する INSERT 権限。
DataFrame の内容をテーブルに保存するには、
DataFrame.write メソッドを呼び出し、 DataFrameWriter オブジェクトを取得します。
DataFrameWriter.mode メソッドを呼び出し、テーブルへの書き込みの設定を指定する SaveMode オブジェクトを渡します。
行を挿入するには、
SaveMode.Append
を渡します。既存のテーブルを上書きするには、
SaveMode.Overwrite
を渡します。
このメソッドは、指定されたモードで構成された同じ
DataFrameWriter
オブジェクトを返します。既存のテーブル(
SaveMode.Append
)に行を挿入していて、 DataFrame の列名がテーブルの列名と一致する場合は、 DataFrameWriter.option メソッドを呼び出し、"columnOrder"
と"name"
を引数として渡します。注釈
このメソッドはSnowpark 1.4.0で導入されました。
デフォルトでは、
columnOrder
オプションは"index"
に設定されています。これは、DataFrameWriter
が列の表示される順序で値を挿入することを意味します。たとえば、DataFrameWriter
は、テーブルの最初の列にある DataFrame から最初の列の値を挿入し、テーブルの2番目の列にある DataFrame から2番目の列の値を挿入します。このメソッドは、指定されたオプションで構成された同じ
DataFrameWriter
オブジェクトを返します。DataFrameWriter.saveAsTable を呼び出し、 DataFrame の内容を指定されたテーブルに保存します。
データをテーブルに保存する SQL ステートメントを実行するために、別のメソッド(例:
collect
)を呼び出す必要はありません。saveAsTable
は、 SQL ステートメントを実行する アクションメソッド です。
次の例では、既存のテーブル(tableName
変数で識別される)を DataFrame df
のコンテンツで上書きします。
df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
次の例では、 DataFrame df
から既存のテーブル(tableName
変数で識別される)に行を挿入します。この例では、テーブルと DataFrame の両方に列 c1
と c2
が含まれています。
この例では、 columnOrder
オプションを "name"
(DataFrame 列と同じ名前の値をテーブル列に挿入する)に設定することと、デフォルトの columnOrder
オプション(DataFrame の列の順序に基づいて、テーブル列に値を挿入する)を使用することの違いを示しています。
val df = session.sql("SELECT 1 AS c2, 2 as c1")
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write.mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName)
// With the default value of the columnOrder option ("index"), the DataFrameWriter the uses column positions
// and inserts a row with the values (1, 2).
df.write.mode(SaveMode.Append).saveAsTable(tableName)
DataFrame からのビューの作成¶
DataFrame からビューを作成するには、 DataFrame.createOrReplaceView メソッドを呼び出します。
df.createOrReplaceView("db.schema.viewName")
createOrReplaceView
を呼び出すと、すぐに新しいビューが作成されることに注意してください。さらに重要なことに、 DataFrame が評価されることはありません。(アクションを実行 するまで、 DataFrame 自体は評価されません。)
createOrReplaceView
を呼び出して作成したビューは永続的です。そのビューが不要になった場合は、 ビューを手動でドロップ できます。
セッション専用の仮ビューを作成する必要がある場合は、代わりに DataFrame.createOrReplaceTempView メソッドを呼び出します。
df.createOrReplaceTempView("db.schema.viewName")
DataFrame のキャッシング¶
場合によっては、複雑なクエリを実行し、(同じクエリを再度実行するのではなく)後続の操作で使用するために結果を保持する必要があります。このような場合は、 DataFrame.cacheResult メソッドを呼び出すことで、 DataFrame の内容をキャッシュできます。
このメソッドは、
クエリを実行します。
cacheResult
を呼び出す前に、 結果を取得するための別個のアクションメソッドを呼び出す 必要はありません。cacheResult
は、クエリを実行するアクションメソッドです。結果を仮テーブルに保存します
cacheResult
は、仮テーブルを作成するため、使用中のスキーマに対する CREATE TABLE 権限が必要です。仮テーブルの結果へのアクセスを提供する、 HasCachedResult オブジェクトを返します。
HasCachedResult
は、DataFrame
を拡張するため、このキャッシュされたデータに対して、 DataFrame で実行できるのと同じ操作のいくつかを実行できます。
注釈
cacheResult
はクエリを実行して結果をテーブルに保存するため、このメソッドでは計算とストレージのコストが増加する可能性があります。
例:
import com.snowflake.snowpark.functions_
// Set up a DataFrame to query a table.
val df = session.table("sample_product_data").filter(col("category_id") > 10)
// Retrieve the results and cache the data.
val cachedDf = df.cacheResult()
// Create a DataFrame containing a subset of the cached data.
val dfSubset = cachedDf.filter(col("category_id") === lit(20)).select(col("name"), col("category_id"))
dfSubset.show()
このメソッドを呼び出しても、元の DataFrame は影響を受けないことに注意してください。たとえば、 dfTable
がテーブル sample_product_data
の DataFrame であるとします。
val dfTempTable = dfTable.cacheResult()
cacheResult
を呼び出した後も、 dfTable
は sample_product_data
テーブルを指しており、引き続き dfTable
を使用してそのテーブルをクエリおよび更新できます。
一時テーブルにキャッシュされたデータを使用するには、 dfTempTable
(cacheResult
によって返される HasCachedResult
オブジェクト)を使用します。
ステージでのファイルの操作¶
Snowparkライブラリは、ステージにあるファイルを使用して、 Snowflakeにデータをロード し、 Snowflake からデータをアンロードするために使用できるクラスとメソッドを提供します。
注釈
これらのクラスとメソッドをステージで使用するには、 ステージを操作するための権限 が必要です。
次のセクションでは、これらのクラスとメソッドの使用方法について説明します。
ステージでのファイルのアップロードとダウンロード¶
ステージでファイルをアップロードおよびダウンロードするには、 FileOperation オブジェクトを使用します。
ステージへのファイルのアップロード¶
ステージにファイルをアップロードするには、
ファイルをステージにアップロードするための権限 があることを確認します。
Session.file を使用して、セッションの FileOperation オブジェクトにアクセスします。
FileOperation.put メソッドを呼び出して、ファイルをステージにアップロードします。
このメソッドは、 SQL PUT コマンドを実行します。
PUT コマンドに オプションのパラメーター を指定するには、パラメーターと値の
Map
を作成し、Map
をoptions
引数として渡します。たとえば、// Upload a file to a stage without compressing the file. val putOptions = Map("AUTO_COMPRESS" -> "FALSE") val putResults = session.file.put("file:///tmp/myfile.csv", "@myStage", putOptions)
localFilePath
引数では、ワイルドカード(*
および?
)を使用して、アップロードするファイルのセットを識別できます。たとえば、// Upload the CSV files in /tmp with names that start with "file". // You can use the wildcard characters "*" and "?" to match multiple files. val putResults = session.file.put("file:///tmp/file*.csv", "@myStage/prefix2")
put
メソッドによって返された PutResult オブジェクトのArray
をチェックして、ファイルが正常にアップロードされたかどうかを確認します。たとえば、そのファイルのファイル名と PUT 操作のステータスを出力するには、// Print the filename and the status of the PUT operation. putResults.foreach(r => println(s" ${r.sourceFileName}: ${r.status}"))
ステージからのファイルのダウンロード¶
ステージからファイルをダウンロードするには、
ファイルをステージからダウンロードするための権限 があることを確認します。
Session.file を使用して、セッションの FileOperation オブジェクトにアクセスします。
FileOperation.get メソッドを呼び出して、ステージからファイルをダウンロードします。
このメソッドは、 SQL GET コマンドを実行します。
GET コマンドに オプションのパラメーター を指定するには、パラメーターと値の
Map
を作成し、Map
をoptions
引数として渡します。たとえば、// Download files with names that match a regular expression pattern. val getOptions = Map("PATTERN" -> s"'.*file_.*.csv.gz'") val getResults = session.file.get("@myStage", "file:///tmp", getOptions)
get
メソッドによって返された GetResult オブジェクトのArray
をチェックして、ファイルが正常にダウンロードされたかどうかを確認します。たとえば、そのファイルのファイル名と GET 操作のステータスを出力するには、// Print the filename and the status of the GET operation. getResults.foreach(r => println(s" ${r.fileName}: ${r.status}"))
ステージにデータをアップロードおよびダウンロードする際の入力ストリームの使用¶
注釈
この機能は、Snowpark 1.4.0で導入されました。
入力ストリームを使用してステージ上のファイルにデータをアップロードし、ステージ上のファイルからデータをダウンロードするには、 FileOperation オブジェクトの uploadStream
メソッドと downloadStream
メソッドを使用します。
ステージ上のファイルにデータをアップロードする際の入力ストリームの使用¶
java.io.InputStream オブジェクトからステージ上のファイルにデータをアップロードするには、
ファイルをステージにアップロードするための権限 があることを確認します。
Session.file を使用して、セッションの FileOperation オブジェクトにアクセスします。
FileOperation.uploadStream メソッドを呼び出します。
データを書き込むステージ上のファイルと
InputStream
オブジェクトへの完全なパスを渡します。さらに、compress
引数を使用して、データをアップロードする前にデータを圧縮するかどうかを指定します。
例:
import java.io.InputStream
...
val compressData = true
val pathToFileOnStage = "@myStage/path/file"
session.file.uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData)
ステージ上のファイルからデータをダウンロードする際の入力ストリームの使用¶
ステージ上のファイルから java.io.InputStream オブジェクトにデータをダウンロードするには、
ファイルをステージからダウンロードするための権限 があることを確認します。
Session.file を使用して、セッションの FileOperation オブジェクトにアクセスします。
FileOperation.downloadStream メソッドを呼び出します。
ダウンロードするデータを含むステージ上のファイルへの完全なパスを渡します。
decompress
引数を使用して、ファイル内のデータを圧縮するかどうかを指定します。
例:
import java.io.InputStream
...
val isDataCompressed = true
val pathToFileOnStage = "@myStage/path/file"
val is = session.file.downloadStream(pathToFileOnStage, isDataCompressed)
ステージ内におけるファイルの DataFrame の設定¶
このセクションでは、Snowflakeステージでファイルの DataFrame を設定する方法について説明します。この DataFrame を作成すると、 DataFrame を使用して次のことができます。
Snowflakeステージのファイルに DataFrame を設定するには、 DataFrameReader
クラスを使用します。
次の権限があることを確認します。
次のいずれかを使用します。
ステージングされたファイルからデータをコピーする方法を決定する コピーオプション を指定する場合は、スキーマに対する CREATE TABLE 権限。
それ以外では、スキーマに対する CREATE FILE FORMAT 権限。
Session
クラスのread
メソッドを呼び出して、DataFrameReader
オブジェクトにアクセスします。ファイルが CSV 形式の場合は、ファイルのフィールドを記述します。そのためには、
ファイル内のフィールドを記述する一連の StructField オブジェクトで構成される StructType オブジェクトを作成します。
StructField
オブジェクトごとに、以下を指定します。フィールドの名前。
フィールドのデータ型(
com.snowflake.snowpark.types
パッケージでオブジェクトとして指定)。フィールドがNULL可能かどうか。
例:
import com.snowflake.snowpark.types._ val schemaForDataFile = StructType( Seq( StructField("id", StringType, true), StructField("name", StringType, true)))
DataFrameReader
オブジェクトでschema
メソッドを呼び出し、StructType
オブジェクトを渡します。例:
var dfReader = session.read.schema(schemaForDataFile)
schema
メソッドは、指定されたフィールドを含むファイルを読み取るように構成されたDataFrameReader
オブジェクトを返します。他の形式(JSON など)のファイルでは、これを実行する必要はありません。これらのファイルの場合、
DataFrameReader
は、データをフィールド名$1
の VARIANT 型の単一フィールドとして扱います。
データの読み取り方法に関する追加情報を指定する必要がある(例: データが圧縮されているか、 CSV ファイルがフィールドを区切るためにコンマではなくセミコロンを使用している)場合は、 DataFrameReader.option メソッドまたは DataFrameReader.options メソッドを呼び出します。
設定するオプションの名前と値を渡します。次のタイプのオプションを設定できます。
CREATE FILE FORMAT のドキュメント で説明されている ファイル形式オプション。
COPY INTO TABLE ドキュメント で説明されている コピーオプション。
コピーオプションを設定すると、 DataFrame にデータを取得 するときに、よりコストのかかる実行戦略の実行される可能性があることに注意してください。
次の例では、圧縮されておらず、フィールド区切り文字にセミコロンを使用する CSV ファイル内のデータをクエリするように、
DataFrameReader
オブジェクトを設定します。dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
option
メソッドは、指定されたオプションで構成されたDataFrameReader
オブジェクトを返します。複数のオプションを設定するには、
option
メソッドに 呼び出しをチェーン する(上記の例に示すように)か、 DataFrameReader.options メソッドを呼び出し、オプションの名前と値のMap
で渡します。ファイルの形式に対応したメソッドを呼び出します。以下のいずれかのメソッドを呼び出すことができます。
これらのメソッドを呼び出すときは、読み取るファイルのステージ位置を渡します。たとえば、
val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
同じプレフィックスで始まる複数のファイルを指定するには、ステージ名の後にプレフィックスを指定します。たとえば、ステージ
@mystage
からプレフィックスcsv_
を持つファイルをロードするには、val df = dfReader.csv("@mystage/csv_")
ファイルの形式に対応するメソッドは、そのファイルの CopyableDataFrame オブジェクトを返します。
CopyableDataFrame
はDataFrame
を拡張し、ステージングされたファイルにあるデータを処理するための追加のメソッドを提供します。アクションメソッドを呼び出して、次を実行します。
テーブルの DataFrames の場合と同様、データは、 アクションメソッド を呼び出すまで DataFrame に取得されません。
ファイルから DataFrame へのデータのロード¶
ステージのファイルに DataFrame を設定 した後、ファイルから DataFrame にデータをロードできます。
DataFrame オブジェクトメソッドを使用して、データセットで必要な 変換を実行 します(例: 特定のフィールドの選択、行のフィルタリングなど)。
たとえば、
mystage
という名前のステージにあるdata.json
という名前の JSON ファイルからcolor
要素を抽出するには、val df = session.read.json("@mystage/data.json").select(col("$1")("color"))
前に説明したように、 CSV 以外の形式のファイル(例: JSON)の場合、
DataFrameReader
は、ファイル内のデータを$1
という名前の単一の VARIANT 列として扱います。DataFrame.collect
メソッドを呼び出してデータをロードします。たとえば、val results = df.collect()
ファイルからテーブルへのデータのコピー¶
ステージのファイルに DataFrame を設定 した後、 CopyableDataFrame.copyInto メソッドを呼び出してデータをテーブルにコピーできます。このメソッドは、 COPY INTO <テーブル> コマンドを実行します。
注釈
copyInto
を呼び出す前に collect
メソッドを呼び出す必要はありません。 copyInto
を呼び出す前に、ファイルのデータが DataFrame にある必要はありません。
たとえば、次のコードは、 myFileStage
で指定された CSV ファイルからテーブル mytable
にデータをロードします。データは CSV ファイルにあるため、コードは、合わせて ファイルのフィールドを記述する 必要があります。この例では、 DataFrameReader.schema メソッドを呼び出し、フィールドを説明する一連の StructField オブジェクトを含む StructType オブジェクト(csvFileSchema
)を渡しています。
val df = session.read.schema(csvFileSchema).csv(myFileStage)
df.copyInto("mytable")
ステージにあるファイルへの DataFrame の保存¶
注釈
この機能は、Snowpark 1.5.0で導入されました。
ステージ上のファイルに DataFrame を保存する必要がある場合は、ファイルの形式に対応する DataFrameWriter メソッド(例: CSV ファイルに書き込む csv
メソッド)を呼び出して、ファイルを保存する必要のあるステージ位置に渡します。これらの DataFrameWriter
メソッドは、 COPY INTO <場所> コマンドを実行します。
注釈
これらの DataFrameWriter
メソッドを呼び出す前に collect
メソッドを呼び出す必要はありません。これらのメソッドを呼び出す前に、ファイルのデータが DataFrame にある必要はありません。
DataFrame の内容をステージ上のファイルに保存するには、
DataFrame.write メソッドを呼び出し、 DataFrameWriter オブジェクトを取得します。たとえば、
sample_product_data
という名前のテーブルを表す DataFrame のDataFrameWriter
オブジェクトを取得するには、次のようにします。dfWriter = session.table("sample_product_data").write
ファイルの内容を上書きする場合(ファイルが存在する場合)は、 DataFrameWriter.mode メソッドを呼び出し、
SaveMode.Overwrite
を渡します。それ以外でステージ上の指定されたファイルがすでに存在する場合、デフォルトで、
DataFrameWriter
はエラーを報告します。mode
メソッドは、指定されたモードで構成された同じDataFrameWriter
オブジェクトを返します。たとえば、
DataFrameWriter
がステージ上のファイルを上書きするように指定するには、次のようにします。dfWriter = dfWriter.mode(SaveMode.Overwrite)
データの保存方法に関する追加情報を指定する必要がある場合(たとえば、データを圧縮する必要がある場合や、セミコロンを使用してCSVファイルのフィールドを区切る場合)は、 DataFrameWriter.option メソッドまたは DataFrameWriter.options メソッドを呼び出します。
設定するオプションの名前と値を渡します。次のタイプのオプションを設定できます。
COPY INTO <場所> のドキュメント で説明されている ファイル形式オプション。
COPY INTO <場所> のドキュメントで説明されている コピーオプション。
次のオプションの設定には、
option
メソッドは使用できないことに注意してください。TYPE 形式タイプオプション。
OVERWRITE コピーオプション。このオプションを設定するには、代わりに
mode
メソッドを呼び出します(前のステップで説明のとおり)。
次の例では、フィールド区切り文字としてセミコロン(コンマではなく)を使用して、データを非圧縮形式で CSV ファイルに保存するように
DataFrameWriter
オブジェクトを設定します。dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE")
option
メソッドは、指定されたオプションで構成されたDataFrameWriter
オブジェクトを返します。複数のオプションを設定するには、
option
メソッドに 呼び出しをチェーン する(上記の例に示すように)か、 DataFrameWriter.options メソッドを呼び出し、オプションの名前と値のMap
で渡します。保存された各ファイルの詳細を返すには、
DETAILED_OUTPUT
コピーオプション をTRUE
に設定します。デフォルトでは、
DETAILED_OUTPUT
はFALSE
です。これは、メソッドがフィールド"rows_unloaded"
、"input_bytes"
、および"output_bytes"
を含む単一行の出力を返すことを意味します。DETAILED_OUTPUT
をTRUE
に設定すると、メソッドは保存されたファイルごとに出力の行を返します。各行には、フィールドFILE_NAME
、FILE_SIZE
、およびROW_COUNT
が含まれています。ファイルの形式に対応するメソッドを呼び出して、データをファイルに保存します。次のいずれかのメソッドを呼び出すことができます。
これらのメソッドを呼び出すときは、データを書き込む必要のあるファイルのステージ位置を渡します(例:
@mystage
)。ファイルに別のプレフィックスを付けて名前を付ける場合は、ステージ名の後にプレフィックスを指定します。デフォルトでは、このメソッドはプレフィックスが
data_
のファイル名にデータを保存します(例:@mystage/data_0_0_0.csv
)。たとえば、val writeFileResult = dfWriter.csv("@mystage/saved_data")
この例では、 DataFrame の内容をプレフィックス
saved_data
で始まるファイル(例:@mystage/saved_data_0_0_0.csv
)に保存します。ファイルに書き込まれるデータ量に関する情報については、返された WriteFileResult オブジェクトを確認します。
WriteFileResult
オブジェクトから、 COPY INTO <場所> コマンドによって生成された出力にアクセスできます。Row オブジェクトの配列として出力の行にアクセスするには、
rows
値のメンバーを使用します。行に存在するフィールドを判別するには、行のフィールドを記述する StructType である
schema
値のメンバーを使用します。
たとえば、出力行のフィールドと値の名前を出力するには、次のようにします。
val writeFileResult = dfWriter.csv("@mystage/saved_data") for ((row, index) <- writeFileResult.rows.zipWithIndex) { (writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach { (structField, element) => println(s"${structField.name}: $element") } }
次の例では、 DataFrame を使用して、 car_sales
という名前のテーブルの内容をステージ @mystage
でプレフィックス saved_data
が付いた JSON ファイルに保存します(例: @mystage/saved_data_0_0_0.json
)。たとえば、
ファイルがステージ上にすでに存在する場合は、ファイルを上書きします。
保存操作に関する詳細な出力を返します。
データを非圧縮で保存します。
最後に、サンプルコードは、返された出力行の各フィールドと値を出力します。
val df = session.table("car_sales")
val writeFileResult = df.write.mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data")
for ((row, index) <- writeFileResult.rows.zipWithIndex) {
println(s"Row: $index")
(writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach {
(structField, element) => println(s"${structField.name}: $element")
}
}
半構造化データの操作¶
DataFrame を使用すると、 半構造化データ (例: JSON データ)へのクエリとアクセスができます。次のセクションでは、 DataFrame 内の半構造化データの操作方法について説明します。
注釈
これらのセクションの例では、 例で使用されるサンプルデータ のサンプルデータを使用しています。
半構造化データの走査¶
半構造化データの特定のフィールドまたは要素を参照するには、 Column オブジェクトの次のメソッドを使用します。
Column.apply("<フィールド名>") を使用して、 OBJECT (または OBJECT を含む VARIANT)のフィールドの
Column
オブジェクトを返します。Column.apply(<インデックス>) を使用して、 ARRAY (または ARRAY を含む VARIANT)の要素の
Column
オブジェクトを返します。
注釈
パス内のフィールド名または要素が不規則であり、 Column.apply
メソッドの使用が困難な場合は、 get、 get_ignore_case、または get_path を代替として使用できます。
apply メソッドを使用した列の参照 で説明したように、メソッド名 apply
は省略できます。
col("column_name")("field_name")
col("column_name")(index)
たとえば、次のコードは、 サンプルデータ の src
列にあるオブジェクトの dealership
フィールドを選択します。
val df = session.table("car_sales")
df.select(col("src")("dealership")).show()
このコードは、次を出力します。
----------------------------
|"""SRC""['DEALERSHIP']" |
----------------------------
|"Valley View Auto Sales" |
|"Tindel Toyota" |
----------------------------
注釈
DataFrame の値は文字列リテラルとして返されるため、二重引用符で囲まれています。これらの値を特定の型にキャストするには、 半構造化データへの明示的な値のキャスト をご参照ください。
メソッド呼び出しのチェーン を使用して、特定のフィールドまたは要素へのパスを走査することもできます。
たとえば、次のコードは salesperson
オブジェクトの name
フィールドを選択します。
val df = session.table("car_sales")
df.select(col("src")("salesperson")("name")).show()
このコードは、次を出力します。
------------------------------------
|"""SRC""['SALESPERSON']['NAME']" |
------------------------------------
|"Frank Beasley" |
|"Greg Northrup" |
------------------------------------
別の例として、次のコードは、車両の配列を保持する vehicle
フィールドの最初の要素を選択します。この例では、最初の要素から price
フィールドも選択しています。
val df = session.table("car_sales")
df.select(col("src")("vehicle")(0)).show()
df.select(col("src")("vehicle")(0)("price")).show()
このコードは、次を出力します。
---------------------------
|"""SRC""['VEHICLE'][0]" |
---------------------------
|{ |
| "extras": [ |
| "ext warranty", |
| "paint protection" |
| ], |
| "make": "Honda", |
| "model": "Civic", |
| "price": "20275", |
| "year": "2017" |
|} |
|{ |
| "extras": [ |
| "ext warranty", |
| "rust proofing", |
| "fabric protection" |
| ], |
| "make": "Toyota", |
| "model": "Camry", |
| "price": "23500", |
| "year": "2017" |
|} |
---------------------------
------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']" |
------------------------------------
|"20275" |
|"23500" |
------------------------------------
パス内のフィールド名または要素が不規則なために Column.apply
メソッドの使用が困難な場合は、 apply
メソッドの代わりに、 get、 get_ignore_case、または get_path 関数を使用できます。
たとえば、次のコード行は両方とも、オブジェクトの指定されたフィールドの値を出力します。
df.select(get(col("src"), lit("dealership"))).show()
df.select(col("src")("dealership")).show()
同様に、次のコード行は両方とも、オブジェクトの指定されたパスにあるフィールドの値を出力します。
df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
df.select(col("src")("vehicle")(0)("make")).show()
半構造化データへの明示的な値のキャスト¶
デフォルトでは、上記の例に示すように、フィールドと要素の値は文字列リテラル(二重引用符を含む)として返されます。
予期しない結果を回避するには、 キャスト メソッドを呼び出して、値を特定の型にキャストします。たとえば、次のコードは、キャストなしとキャストありの値を出力します。
// Import the objects for the data types, including StringType.
import com.snowflake.snowpark.types._
...
val df = session.table("car_sales")
df.select(col("src")("salesperson")("id")).show()
df.select(col("src")("salesperson")("id").cast(StringType)).show()
このコードは、次を出力します。
----------------------------------
|"""SRC""['SALESPERSON']['ID']" |
----------------------------------
|"55" |
|"274" |
----------------------------------
---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)" |
---------------------------------------------------
|55 |
|274 |
---------------------------------------------------
オブジェクト配列の行へのフラット化¶
半構造化データを DataFrame に「フラット化」する必要がある場合(例: 配列内にあるすべてのオブジェクトの行を生成する場合)は、 DataFrame.flatten メソッドを呼び出します。このメソッドは、 FLATTEN SQL 関数と同等です。オブジェクトまたは配列へのパスを渡すと、メソッドは、各フィールドの行か、オブジェクトまたは配列の要素の行を含む DataFrame を返します。
たとえば、 サンプルデータ では、 src:customer
は顧客に関する情報を含むオブジェクトの配列です。各オブジェクトには、 name
および address
フィールドが含まれています。
このパスを flatten
関数に渡すと、次のようになります。
val df = session.table("car_sales")
df.flatten(col("src")("customer")).show()
メソッドは DataFrame を返します。
----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC" |"SEQ" |"KEY" |"PATH" |"INDEX" |"VALUE" |"THIS" |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{ |1 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "San Francisco, CA", | { |
| { | | | | | "name": "Joyce Ridgely", | "address": "San Francisco, CA", |
| "address": "San Francisco, CA", | | | | | "phone": "16504378889" | "name": "Joyce Ridgely", |
| "name": "Joyce Ridgely", | | | | |} | "phone": "16504378889" |
| "phone": "16504378889" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Valley View Auto Sales", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "55", | | | | | | |
| "name": "Frank Beasley" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "paint protection" | | | | | | |
| ], | | | | | | |
| "make": "Honda", | | | | | | |
| "model": "Civic", | | | | | | |
| "price": "20275", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
|{ |2 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "New York, NY", | { |
| { | | | | | "name": "Bradley Greenbloom", | "address": "New York, NY", |
| "address": "New York, NY", | | | | | "phone": "12127593751" | "name": "Bradley Greenbloom", |
| "name": "Bradley Greenbloom", | | | | |} | "phone": "12127593751" |
| "phone": "12127593751" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Tindel Toyota", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "274", | | | | | | |
| "name": "Greg Northrup" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "rust proofing", | | | | | | |
| "fabric protection" | | | | | | |
| ], | | | | | | |
| "make": "Toyota", | | | | | | |
| "model": "Camry", | | | | | | |
| "price": "23500", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
----------------------------------------------------------------------------------------------------------------------------------------------------------
この DataFrame から、 VALUE
フィールドにある各オブジェクトからの name
フィールドと address
フィールドを選択できます。
df.flatten(col("src")("customer")).select(col("value")("name"), col("value")("address")).show()
-------------------------------------------------
|"""VALUE""['NAME']" |"""VALUE""['ADDRESS']" |
-------------------------------------------------
|"Joyce Ridgely" |"San Francisco, CA" |
|"Bradley Greenbloom" |"New York, NY" |
-------------------------------------------------
次のコードは、 特定の型に値をキャスト し、列の名前を変更することにより、前の例に追加します。
df.flatten(col("src")("customer")).select(col("value")("name").cast(StringType).as("Customer Name"), col("value")("address").cast(StringType).as("Customer Address")).show()
-------------------------------------------
|"Customer Name" |"Customer Address" |
-------------------------------------------
|Joyce Ridgely |San Francisco, CA |
|Bradley Greenbloom |New York, NY |
-------------------------------------------
SQL ステートメントの実行¶
指定した SQL ステートメントを実行するには、 Session
クラスの sql
メソッドを呼び出し、実行するステートメントを渡します。メソッドは DataFrame を返します。
アクションメソッドの呼び出し があるまで、 SQL ステートメントは実行されないことに注意してください。
// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
val dfStageFiles = session.sql("ls @myStage")
val files = dfStageFiles.collect()
files.foreach(println)
// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()
val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()
println("Count: " + numRows);
メソッドを呼び出して DataFrame を変換 する場合(例: フィルター、選択など)、これらのメソッドは、基になる SQL ステートメントが SELECT ステートメントである場合にのみ機能することに注意してください。変換メソッドは、他の種類の SQL ステートメントではサポートされていません。
val df = session.sql("select id, category_id, name from sample_product_data where id > 10")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("category_id") < 10).select(col("id")).collect()
results.foreach(println)
// In this example, the underlying SQL statement is not a SELECT statement.
val dfStageFiles = session.sql("ls @myStage")
// Calling the filter method results in an error.
dfStageFiles.filter(...)