Snowpark Scalaでの DataFrames の操作

Snowpark内でデータをクエリして処理する主な方法は、 DataFrame を使用することです。このトピックでは、 DataFrames の操作方法について説明します。

このトピックの内容:

データの取得と操作には、 DataFrame クラスを使用します。DataFrame は、遅延評価されるリレーショナルデータセットを表します。これは、特定のアクションがトリガーされたときにのみ実行されます。ある意味、 DataFrame は、データを取得するために評価する必要があるクエリのようなものです。

データを DataFrame に取得するには、

  1. DataFrame を作成し、データセットのためにデータのソースを指定 します。

    たとえば、テーブル、外部 CSV ファイル、または SQL ステートメントの実行からのデータを保持する DataFrame を作成できます。

  2. DataFrame のデータセットを変換する方法を指定 します。

    たとえば、どの列を選択するか、行をどのようにフィルタリングするか、結果をどのようにソートおよびグループ化するかなどを指定できます。

  3. ステートメントを実行して、データをDataFrame に取得 します。

    データを DataFrame に取得するには、アクションを実行するメソッド(例: collect() メソッド)を呼び出す必要があります。

次のセクションでは、これらのステップについて詳しく説明します。

このセクションの例の設定

このセクションの例のいくつかは、 DataFrame を使用して sample_product_data という名前のテーブルをクエリします。これらの例を実行する場合は、次の SQL ステートメントを実行することにより、このテーブルを作成し、テーブルにデータを入力できます。

CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT);
INSERT INTO sample_product_data VALUES
    (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
    (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
    (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
    (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
    (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
    (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
    (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
    (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
    (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
    (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
    (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
    (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100);
Copy

テーブルが作成されたことを確認するには、次のコマンドを実行します。

SELECT * FROM sample_product_data;
Copy

DataFrame の構築

DataFrame を構築するには、 Session クラスのメソッドを使用できます。次の各メソッドは、異なるタイプのデータソースから DataFrame を構築します。

  • テーブル、ビュー、またはストリームのデータから DataFrame を作成するには、 table メソッドを呼び出します。

    // Create a DataFrame from the data in the "sample_product_data" table.
    val dfTable = session.table("sample_product_data")
    
    // To print out the first 10 rows, call:
    //   dfTable.show()
    
    Copy

    注釈

    session.table メソッドは Updatable オブジェクトを返します。 UpdatableDataFrame を拡張し、テーブル内のデータを操作するための追加のメソッド(例: データを更新および削除するためのメソッド)を提供します。 テーブル内の行の更新、削除、およびマージ をご参照ください。

  • 一連の値から DataFrame を作成するには、 createDataFrame メソッドを呼び出します。

    // Create a DataFrame containing a sequence of values.
    // In the DataFrame, name the columns "i" and "s".
    val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
    
    Copy

    注釈

    Snowflakeによって予約された単語は、 DataFrame を構築するときの列名としては無効です。予約された単語のリストについては、 予約済みおよび限定キーワード をご参照ください。

  • 値の範囲を含む DataFrame を作成するには、 range メソッドを呼び出します。

    // Create a DataFrame from a range
    val dfRange = session.range(1, 10, 2)
    
    Copy
  • ステージにファイルの DataFrame を作成する には、 read を呼び出して DataFrameReader オブジェクトを取得します。 DataFrameReader オブジェクトで、ファイル内のデータの形式に対応するメソッドを呼び出します。

    // Create a DataFrame from data in a stage.
    val dfJson = session.read.json("@mystage2/data1.json")
    
    Copy
  • SQL クエリの結果を保持する DataFrame を作成するには、 sql メソッドを呼び出します。

    // Create a DataFrame from a SQL query
    val dfSql = session.sql("SELECT name from products")
    
    Copy

    注: このメソッドを使用すると、テーブルおよびステージングされたファイルからデータを取得する SELECT ステートメントを実行できますが、そうではなく、 table および read メソッドを使用するようにします。 tableread のようなメソッドは、開発ツールでより優れた構文の強調表示、エラーの強調表示、およびインテリジェントなコード補完を提供できます。

データセットの変換方法の指定

選択する列と、結果のフィルタリング、並べ替え、グループ化などを指定するには、データセットを変換する DataFrame メソッドを呼び出します。これらのメソッドで列を識別するには、 col 関数または列に評価される式を使用します。(列と式の指定 をご参照ください。)

例:

  • 返される行を指定するには、 filter メソッドを呼び出します。

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame for the rows with the ID 1
    // in the "sample_product_data" table.
    //
    // This example uses the === operator of the Column object to perform an
    // equality check.
    val df = session.table("sample_product_data").filter(col("id") === 1)
    df.show()
    
    Copy
  • 選択する列を指定するには、 select メソッドを呼び出します。

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame that contains the id, name, and serial_number
    // columns in te "sample_product_data" table.
    val df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number"))
    df.show()
    
    Copy

各メソッドは、変換された新しい DataFrame オブジェクトを返します。(メソッドは、元の DataFrame オブジェクトには影響しません。)つまり、複数の変換を適用する場合は、 チェーンメソッド呼び出し を実行して、前のメソッド呼び出しによって返された、新しい DataFrame オブジェクトに対する後続の各変換メソッドを呼び出すことができます。

これらの変換方法は、Snowflakeデータベースからデータを取得しないことに注意してください。(DataFrame を評価するアクションの実行 で説明されているアクションメソッドは、データ取得を実行します。)変換メソッドは、 SQL ステートメントの作成方法を指定するだけです。

列と式の指定

これらの変換メソッドを呼び出すときは、列または列を使用する式を指定する必要がある場合があります。たとえば、 select メソッドを呼び出すときは、選択する必要のある列を指定する必要があります。

列を参照するには、 com.snowflake.snowpark.functions オブジェクトで col 関数を呼び出して、 Column オブジェクトを作成します。

// Import the col function from the functions object.
import com.snowflake.snowpark.functions._

val dfProductInfo = session.table("sample_product_data").select(col("id"), col("name"))
dfProductInfo.show()
Copy

注釈

リテラルの Column オブジェクトを作成するには、 列オブジェクトとしてのリテラルの使用 をご参照ください。

フィルター、プロジェクション、結合条件などを指定する場合は、式で Column オブジェクトを使用できます。たとえば、

  • filter メソッドで Column オブジェクトを使用して、フィルター条件を指定できます。

    // Specify the equivalent of "WHERE id = 20"
    // in an SQL SELECT statement.
    df.filter(col("id") === 20)
    
    Copy
    // Specify the equivalent of "WHERE a + b < 10"
    // in an SQL SELECT statement.
    df.filter((col("a") + col("b")) < 10)
    
    Copy
  • select メソッドで Column オブジェクトを使用して、エイリアスを定義できます。

    // Specify the equivalent of "SELECT b * 10 AS c"
    // in an SQL SELECT statement.
    df.select((col("b") * 10) as "c")
    
    Copy
  • join メソッドで Column オブジェクトを使用して、結合条件を定義できます。

    // Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y"
    // in an SQL SELECT statement.
    dfX.join(dfY, col("a_in_X") === col("b_in_Y"))
    
    Copy

異なる DataFrames の列の参照

同じ名前の2つの異なる DataFrame オブジェクトの列を参照する場合(例: その列の DataFrames を結合する場合)、1つの DataFrame オブジェクトにある DataFrame.col メソッドを使用して、そのオブジェクトにある列(例: df1.col("name")df2.col("name"))を参照できます。

次の例は、 DataFrame.col メソッドを使用して特定の DataFrame の列を参照する方法を示しています。この例では、 key という名前の列がある2つの DataFrame オブジェクトの両方を結合します。この例では、 Column.as メソッドを使用して、新しく作成された DataFrame の列の名前を変更します。

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))
Copy

apply メソッドを使用した列の参照

DataFrame.col メソッドの代わりに、 DataFrame.apply メソッドを使用して特定の DataFrame の列を参照できます。 DataFrame.col メソッドと同様に、 DataFrame.apply メソッドは入力として列名を受け入れ、 Column オブジェクトを返します。

オブジェクトでScalaに apply メソッドがある場合は、関数であるかのようにオブジェクトを呼び出すことで apply メソッドを呼び出すことができます。たとえば、 df.apply("column_name") を呼び出すには、単純に df("column_name") を記述します。次の呼び出しは同等です。

  • df.col("<列名>")

  • df.apply("<列名>")

  • df("<列名>")

次の例は前の例と同じですが、結合操作で列を参照するために DataFrame.apply メソッドを使用します。

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))
Copy

列オブジェクトの短縮形の使用

col 関数を使用する代わりに、次のいずれかの方法で列を参照できます。

  • 引用符で囲まれた列名($"column_name")の前にドル記号を使用する。

  • 引用符で囲まれていない列名('column_name)の前にアポストロフィ(一重引用符)を使用する。

これを実行するには、 Session オブジェクトを作成した後、 implicits オブジェクトから名前をインポートします。

val session = Session.builder.configFile("/path/to/properties").create

// Import this after you create the session.
import session.implicits._

// Use the $ (dollar sign) shorthand.
val df = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)

// Use ' (apostrophe) shorthand.
val df = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)
Copy

オブジェクト識別子(テーブル名、列名など)の前後での二重引用符の使用

指定するデータベース、スキーマ、テーブル、およびステージの名前は、 Snowflake識別子の要件 に準拠している必要があります。名前を指定すると、Snowflakeはその名前を大文字と見なします。たとえば、次の呼び出しは同等です。

// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))
Copy

名前が識別子の要件に準拠していない場合は、名前を二重引用符(")で囲む必要があります。バックスラッシュ(\)を使用して、Scala文字列リテラル内の二重引用符をエスケープします。たとえば、次のテーブル名は文字やアンダースコアで始まらないため、名前を二重引用符で囲む必要があります。

val df = session.table("\"10tablename\"")
Copy

の名前を指定するときは、名前を二重引用符で囲む必要はありません。名前が識別子の要件に準拠していない場合、Snowparkライブラリは、列名を自動的に二重引用符で囲みます。

// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\""))

// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))
Copy

すでに列名を二重引用符で囲んでいる場合、ライブラリは名前を二重引用符で囲みません。

場合によっては、列名に二重引用符が含まれることがあります。

describe table quoted;
+------------------------+ ...
| name                   | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted"   | ...
+------------------------+ ...
Copy

識別子の要件 で説明されているように、二重引用符で囲まれた識別子内の二重引用符文字ごとに、2つの二重引用符文字(例: "name_with_""air""_quotes""""column_name_quoted""")を使用する必要があります。

val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()
Copy

識別子が二重引用符で囲まれている場合は(明示的に引用符を追加したか、ライブラリが引用符を追加したかに関係なく)、 Snowflakeは識別子を大文字と小文字を区別する ものとして扱います。

// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))
Copy

列オブジェクトとしてのリテラルの使用

Column オブジェクト内で渡すメソッドでリテラルを使用するには、リテラルを com.snowflake.snowpark.functions オブジェクトの lit 関数に渡して、リテラルの Column オブジェクトを作成します。たとえば、

// Import for the lit and col functions.
import com.snowflake.snowpark.functions._

// Show the first 10 rows in which num_items is greater than 5.
// Use `lit(5)` to create a Column object for the literal 5.
df.filter(col("num_items").gt(lit(5))).show()
Copy

リテラルがScalaの浮動小数点またはdouble値である場合(例: 0.05デフォルトでDoubleとして扱う)、Snowparkライブラリは SQL を生成し、対応するSnowparkデータ型に値を暗黙的にキャストします(例: 0.05::DOUBLE)。これにより、指定された正確な数とは異なる概算値が生成される可能性があります。

たとえば、次のコードは、フィルター(0.05 以上の値に一致する)が DataFrame の行に一致する必要がある場合でも、一致する行を表示しません。

// Create a DataFrame that contains the value 0.05.
val df = session.sql("select 0.05 :: Numeric(5, 2) as a")

// Applying this filter results in no matching rows in the DataFrame.
df.filter(col("a") <= lit(0.06) - lit(0.01)).show()
Copy

問題は、 lit(0.06)lit(0.01) が、正確な値ではなく、 0.060.01 の概算値を生成することです。

この問題を回避するには、次のいずれかの方法を使用できます。

  • オプション1: 使用する Snowparkの型にリテラルをキャスト する。たとえば、精度が5でスケールが2の NUMBER を使用するには、次のようにします。

    df.filter(col("a") <= lit(0.06).cast(new DecimalType(5, 2)) - lit(0.01).cast(new DecimalType(5, 2))).show()
    
    Copy
  • オプション2: 値を lit 関数に渡す前に、使用する型に値をキャストする。たとえば、 BigDecimal 型 を使用する場合は、

    df.filter(col("a") <= lit(BigDecimal(0.06)) - lit(BigDecimal(0.01))).show()
    
    Copy

列オブジェクトの特定型へのキャスト

Column オブジェクトを特定の型にキャストするには、 Column.cast メソッドを呼び出し、 com.snowflake.snowpark.types package から型オブジェクトを渡します。たとえば、リテラルを5の精度と2のスケールで NUMBER としてキャストするには、次のようにします。

// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._

val decimalValue = lit(0.05).cast(new DecimalType(5,2))
Copy

メソッド呼び出しのチェーン

DataFrame オブジェクトを変換するメソッド ごとに、変換が適用された新しい DataFrame オブジェクトが返されるため、 メソッド呼び出しのチェーン により、追加の方法で変換される新しい DataFrame を生成できます。

次の例では、次のように構成された DataFrame を返します。

  • sample_product_data テーブルをクエリします。

  • id = 1 で行を返します。

  • name および serial_number 列を選択します。

val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
Copy

この例では、

  • session.table("sample_product_data") は、 sample_product_data テーブルの DataFrame を返します。

    DataFrame にはまだテーブルのデータが含まれていませんが、オブジェクトにはテーブルの列の定義が含まれています。

  • filter(col("id") === 1) は、 id = 1 で行を返すように設定された sample_product_data テーブルの DataFrame を返します。

    DataFrame には、テーブルからの一致する行がまだ含まれていないことに改めて注意してください。 アクションメソッドを呼び出す まで、一致する行は取得されません。

  • select(col("name"), col("serial_number")) は、 id = 1 を持つ sample_product_data テーブルの行の name 列と serial_number 列を含む DataFrame を返します。

メソッド呼び出しをチェーンするときは、呼び出しの順序が重要であることに注意してください。各メソッド呼び出しは、変換された DataFrame を返します。後続の呼び出しが変換された DataFrame で作動することを確認します。

たとえば、次のコードでは、 select メソッドは nameserial_number の2つの列のみを含む DataFrame を返します。この DataFrame の filter メソッド呼び出しは、変換された DataFrame にはない id 列を使用しているため失敗します。

// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") === 1)
Copy

対照的に、次のコードは、 sample_product_data テーブルのすべての列(id 列を含む)を含む DataFrame で filter() メソッドが呼び出されるため、正常に実行されます。

// This succeeds because the DataFrame returned by the table() method
// includes the "id" column.
val dfProductInfo = session.table("sample_product_data").filter(col("id") === 1).select(col("name"), col("serial_number"))
dfProductInfo.show()
Copy

SQL ステートメントで同等のキーワード(SELECT および WHERE)を使用する場合とは異なる順序で、 select および filter メソッドの呼び出しが必要になる場合があることに注意してください。

DataFrame の行数制限

DataFrame の行数を制限するには、 DataFrame.limit 変換メソッドを使用できます。

Snowpark API は、限られた数の行を取得して出力するためのアクションメソッドも提供します。

  • DataFrame.first アクションメソッド(クエリを実行して最初の n 行を返すため)

  • DataFrame.show アクションメソッド(クエリを実行し、最初の n 行を出力するため)

これらのメソッドは、実行される SQL ステートメントに LIMIT 句を効果的に追加します。

LIMIT の使用上の注意 で説明されているように、 LIMIT と組み合わせて並べ替え順序(ORDER BY)を指定する場合を除き、結果は非決定的です。

ORDER BY 句を LIMIT 句と一緒に保持するには(例: ORDER BY が別のサブクエリにないように)、 sort メソッドによって返される DataFrame の結果を制限するメソッドを呼び出す必要があります。

たとえば、 メソッドの呼び出しをチェーン する場合、

// Limit the number of rows to 5, sorted by parent_id.
var dfSubset = df.sort(col("parent_id")).limit(5);

// Return the first 5 rows, sorted by parent_id.
var arrayOfRows = df.sort(col("parent_id")).first(5)

// Print the first 5 rows, sorted by parent_id.
df.sort(col("parent_id")).show(5)
Copy

列定義の取得

DataFrame のためにデータセット内の列の定義を取得するには、 schema メソッドを呼び出します。このメソッドは、 StructField オブジェクトの Array を含む StructType オブジェクトを返します。各 StructField オブジェクトには、列の定義が含まれます。

// Get the StructType object that describes the columns in the
// underlying rowset.
val tableSchema = session.table("sample_product_data").schema
println("Schema for sample_product_data: " + tableSchema);
Copy

返された StructType オブジェクトでは、列名は常に正規化されています。引用符で囲まれていない識別子は大文字で返され、引用符で囲まれた識別子は定義された正確な大文字小文字で返されます。

次の例では、 ID および 3rd という名前の列を含む DataFrame を作成します。列名 3rd の場合、 名前が識別子の要件に準拠していない ため、Snowparkライブラリは名前を自動的に二重引用符("3rd")で囲みます。

この例では、 schema メソッドを呼び出してから、返された StructType オブジェクトで names メソッドを呼び出して、列名の ArraySeq を取得します。名前は、 schema メソッドによって返される StructType で正規化されます。

// Create a DataFrame containing the "id" and "3rd" columns.
val dfSelectedColumns = session.table("sample_product_data").select(col("id"), col("3rd"))
// Print out the names of the columns in the schema. This prints out:
//   ArraySeq(ID, "3rd")
println(dfSelectedColumns.schema.names.toSeq)
Copy

DataFrames の結合

DataFrame オブジェクトを結合するには、 DataFrame.join メソッドを呼び出します。

次のセクションでは、 DataFrames を使用して結合を実行する方法について説明します。

結合のサンプルデータの設定

次のセクションの例では、次の SQL ステートメントを実行して設定できるサンプルデータを使用しています。

create or replace table sample_a (
  id_a integer,
  name_a varchar,
  value integer
);
insert into sample_a (id_a, name_a, value) values
  (10, 'A1', 5),
  (40, 'A2', 10),
  (80, 'A3', 15),
  (90, 'A4', 20)
;
create or replace table sample_b (
  id_b integer,
  name_b varchar,
  id_a integer,
  value integer
);
insert into sample_b (id_b, name_b, id_a, value) values
  (4000, 'B1', 40, 10),
  (4001, 'B2', 10, 5),
  (9000, 'B3', 80, 15),
  (9099, 'B4', null, 200)
;
create or replace table sample_c (
  id_c integer,
  name_c varchar,
  id_a integer,
  id_b integer
);
insert into sample_c (id_c, name_c, id_a, id_b) values
  (1012, 'C1', 10, null),
  (1040, 'C2', 40, 4000),
  (1041, 'C3', 40, 4001)
;
Copy

結合の列の指定

DataFrame.join メソッドを使用すると、次のいずれかの方法で使用する列を指定できます。

  • 結合条件を説明する列式を指定します。

  • 結合の共通列として使用する1つ以上の列を指定します。

次の例では、 id_a という名前の列で内部結合を実行します。

// Create a DataFrame that joins the DataFrames for the tables
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
dfJoined.show()
Copy

この例では、 DataFrame.col メソッドを使用して、結合で使用する条件を指定します。このメソッドの詳細については、 列と式の指定 をご参照ください。

これにより、次が出力されます。

----------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |
----------------------------------------------------------------------
|10      |A1        |5        |4001    |B2        |10      |5        |
|40      |A2        |10       |4000    |B1        |40      |10       |
|80      |A3        |15       |9000    |B3        |80      |15       |
----------------------------------------------------------------------
Copy
結合の結果で重複する同一の列名

結合の結果の DataFrame では、テーブル間で列名が同一であっても、Snowparkライブラリは結合されたテーブルで見つかった列名を使用します。この場合、これらの列名は、結合の結果である DataFrame で重複しています。重複する列に名前でアクセスするには、列の元のテーブルを表す DataFrame で col メソッドを呼び出します。(列の指定の詳細については、 異なる DataFrames の列の参照 をご参照ください)。

次の例のコードは、2つの DataFrames を結合し、結合された DataFrame で select メソッドを呼び出します。これは、それぞれの DataFrame オブジェクト(dfRhs および dfLhs)を表す変数から col メソッドを呼び出して、選択する列を指定します。これは、 as メソッドを使用して、 select メソッドが作成する DataFrame 内の列に新しい名前を付けます。

val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"))
val dfSelected = dfJoined.select(dfLhs.col("value").as("LeftValue"), dfRhs.col("value").as("RightValue"))
dfSelected.show()
Copy

これにより、次が出力されます。

------------------------------
|"LEFTVALUE"  |"RIGHTVALUE"  |
------------------------------
|5            |5             |
|10           |10            |
|15           |15            |
------------------------------
Copy
保存またはキャッシュする前に重複する列

結合の結果の DataFrame に重複する列名が含まれる場合は、結果をテーブルに保存するか DataFrame をキャッシュする前に、重複を排除するか列の名前を変更して DataFrame の重複をなくす必要があります。テーブルまたはキャッシュに保存する DataFrame 内の重複する列名の場合、Snowparkライブラリは、重複しないようにするために、重複する列名をエイリアスに置き換えます。

次の例は、列名 ID_AVALUE が2つのテーブルからの結合で重複し、結果をキャッシュする前に重複を排除するか名前を変更しない場合に、キャッシュされた DataFrame の出力がどのように表示されるかを示しています。

--------------------------------------------------------------------------------------------------
|"l_ZSz7_ID_A"  |"NAME_A"  |"l_ZSz7_VALUE"  |"ID_B"  |"NAME_B"  |"r_heec_ID_A"  |"r_heec_VALUE"  |
--------------------------------------------------------------------------------------------------
|10             |A1        |5               |4001    |B2        |10             |5               |
|40             |A2        |10              |4000    |B1        |40             |10              |
|80             |A3        |15              |9000    |B3        |80             |15              |
--------------------------------------------------------------------------------------------------
Copy

自然結合の実行

自然結合 (DataFrames は同じ名前の列で結合)を実行するには、 DataFrame.naturalJoin メソッドを呼び出します。

次の例では、テーブル sample_asample_b の DataFrames を共通の列(列 id_a)で結合します。

val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfJoined = dfLhs.naturalJoin(dfRhs)
dfJoined.show()
Copy

これにより、次が出力されます。

---------------------------------------------------
|"ID_A"  |"VALUE"  |"NAME_A"  |"ID_B"  |"NAME_B"  |
---------------------------------------------------
|10      |5        |A1        |4001    |B2        |
|40      |10       |A2        |4000    |B1        |
|80      |15       |A3        |9000    |B3        |
---------------------------------------------------
Copy

統合の型の指定

デフォルトでは、 DataFrame.join メソッドは内部結合を作成します。別の型の結合を指定するには、 joinType 引数を次のいずれかの値に設定します。

結合の型

joinType

内部結合

inner (デフォルト)

左外部結合

left

右外部結合

right

完全外部結合

full

クロス結合

cross

例:

// Create a DataFrame that performs a left outer join on
// "sample_a" and "sample_b" on the column named "id_a".
val dfLhs = session.table("sample_a")
val dfRhs = session.table("sample_b")
val dfLeftOuterJoin = dfLhs.join(dfRhs, dfLhs.col("id_a") === dfRhs.col("id_a"), "left")
dfLeftOuterJoin.show()
Copy

これにより、次が出力されます。

----------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |
----------------------------------------------------------------------
|40      |A2        |10       |4000    |B1        |40      |10       |
|10      |A1        |5        |4001    |B2        |10      |5        |
|80      |A3        |15       |9000    |B3        |80      |15       |
|90      |A4        |20       |NULL    |NULL      |NULL    |NULL     |
----------------------------------------------------------------------
Copy

複数のテーブルの結合

複数のテーブルを結合するには、

  1. テーブルごとに DataFrame を作成します。

  2. 最初の DataFrame で DataFrame.join メソッドを呼び出し、2番目の DataFrame を渡します。

  3. join メソッドによって返された DataFrame を使用して、 join メソッドを呼び出し、3番目の DataFrame を渡します。

以下に示すように、 join 呼び出しを チェーン することができます。

val dfFirst = session.table("sample_a")
val dfSecond  = session.table("sample_b")
val dfThird = session.table("sample_c")
val dfJoinThreeTables = dfFirst.join(dfSecond, dfFirst.col("id_a") === dfSecond.col("id_a")).join(dfThird, dfFirst.col("id_a") === dfThird.col("id_a"))
dfJoinThreeTables.show()
Copy

これにより、次が出力されます。

------------------------------------------------------------------------------------------------------------
|"ID_A"  |"NAME_A"  |"VALUE"  |"ID_B"  |"NAME_B"  |"ID_A"  |"VALUE"  |"ID_C"  |"NAME_C"  |"ID_A"  |"ID_B"  |
------------------------------------------------------------------------------------------------------------
|10      |A1        |5        |4001    |B2        |10      |5        |1012    |C1        |10      |NULL    |
|40      |A2        |10       |4000    |B1        |40      |10       |1040    |C2        |40      |4000    |
|40      |A2        |10       |4000    |B1        |40      |10       |1041    |C3        |40      |4001    |
------------------------------------------------------------------------------------------------------------
Copy

自己結合の実行

異なる列でテーブルをそれ自体と結合する必要がある場合は、単一の DataFrame で自己結合を実行することはできません。単一の DataFrame を使用して自己結合を実行する次の例は、 "id" の列式が結合の左側と右側に存在するため、失敗します。

// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, col("id") === col("parent_id"))
Copy
// This fails because columns named "id" and "parent_id"
// are in the left and right DataFrames in the join.
val df = session.table("sample_product_data");
val dfJoined = df.join(df, df("id") === df("parent_id"))
Copy

これらの例は両方とも、次の例外を除いて失敗します。

Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
  Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
  Instead, join this DataFrame to a clone() of itself.
Copy

代わりに、 DataFrame.clone メソッドを使用して DataFrame オブジェクトのクローンを作成し、2つの DataFrame オブジェクトを使用して結合を実行します。

// Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
val dfLhs = session.table("sample_product_data")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()

// Create a DataFrame that joins the two DataFrames
// for the "sample_product_data" table on the
// "id" and "parent_id" columns.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))
dfJoined.show()
Copy

同じ列で自己結合を実行する場合は、 USING 句の列式の Seq を渡す join メソッドを呼び出します。

// Create a DataFrame that performs a self-join on a DataFrame
// using the column named "key".
val df = session.table("sample_product_data");
val dfJoined = df.join(df, Seq("key"))
Copy

DataFrame を評価するアクションの実行

前述のように、 DataFrame は遅延評価されます。つまり、アクションを実行するまで、 SQL ステートメントは実行のためにサーバーに送信されません。アクションにより、 DataFrame が評価され、対応する SQL ステートメントが実行のためにサーバーに送信されます。

次のセクションでは、 DataFrame で同期的および非同期的にアクションを実行する方法について説明します。

同期的なアクションの実行

アクションを同期的に実行するには、次のアクションメソッドのいずれかを呼び出します。

アクションを同期的に実行する方法

説明

DataFrame.collect

DataFrame を評価し、結果のデータセットを オブジェクトの Array として返します。 すべての行を返す をご参照ください。

DataFrame.toLocalIterator

DataFrame を評価し、 オブジェクトの 反復子 を返します。結果セットが大きい場合は、この方法を使用して、すべての結果を一度にメモリにロードすることを回避します。 行の反復子を返す をご参照ください。

DataFrame.count

DataFrame を評価し、行数を返します。

DataFrame.show

DataFrame を評価し、行をコンソールに表示します。このメソッドでは、行数は10行に制限されることに注意してください(デフォルト)。 DataFrame での行の出力 をご参照ください。

DataFrame.cacheResult

クエリを実行し、仮テーブルを作成して、テーブルに結果を配置します。このメソッドは、この仮テーブルのデータにアクセスするために使用できる HasCachedResult オブジェクトを返します。 DataFrame のキャッシング をご参照ください。

DataFrame.write.saveAsTable

DataFrame のデータを指定したテーブルに保存します。 テーブルへのデータの保存 をご参照ください。

DataFrame.write.(csv |json| parquet)

DataFrame をステージ上の指定されたファイルに保存します。 ステージにあるファイルへの DataFrame の保存 をご参照ください。

DataFrame.read.fileformat.copyInto('tableName')

DataFrame のデータを指定したテーブルにコピーします。 ファイルからテーブルへのデータのコピー をご参照ください。

Session.table('tableName').delete

指定されたテーブルの行を削除します。 テーブル内の行の更新、削除、およびマージ をご参照ください。

Session.table('tableName').update

指定されたテーブルの行を更新します。 テーブル内の行の更新、削除、およびマージ をご参照ください。

Session.table('tableName').merge.methods.collect

指定されたテーブルに行をマージします。 テーブル内の行の更新、削除、およびマージ をご参照ください。

クエリを実行して結果の数を返すには、 count メソッドを呼び出します。

// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")

// Send the query to the server for execution and
// print the count of rows in the table.
println("Rows returned: " + dfProducts.count())
Copy

アクションメソッドを呼び出して、次を実行することもできます。

注: DataFrame の列の定義を取得するために schema メソッドを呼び出す場合は、アクションメソッドを呼び出す必要はありません。

非同期的なアクションの実行

注釈

この機能は、Snowpark 0.11.0で導入されました。

アクションを非同期で実行するには、 async メソッドを呼び出して「非同期アクター」オブジェクト(例: DataFrameAsyncActor)を返し、そのオブジェクトで非同期アクションメソッドを呼び出します。

非同期アクターオブジェクトのこれらのアクションメソッドは、 TypedAsyncJob オブジェクトを返します。これを使用して、非同期アクションのステータスを確認し、アクションの結果を取得できます。

次のセクションでは、アクションを非同期で実行して結果を確認する方法について説明します。

非同期アクションの基本的なフローの理解

次のメソッドを使用して、アクションを非同期で実行できます。

アクションを非同期的に実行する方法

説明

DataFrame.async.collect

DataFrame を非同期的に評価して、結果のデータセットを オブジェクトの Array として取得します。 すべての行を返す をご参照ください。

DataFrame.async.toLocalIterator

DataFrame を非同期的に評価して、 オブジェクトの 反復子 を取得します。結果セットが大きい場合は、この方法を使用して、すべての結果を一度にメモリにロードすることを回避します。 行の反復子を返す をご参照ください。

DataFrame.async.count

DataFrame を非同期的に評価して、行数を取得します。

DataFrame.write.async.saveAsTable

DataFrame のデータを指定したテーブルに非同期的に保存します。 テーブルへのデータの保存 をご参照ください。

DataFrame.write.async.(csv |json| parquet)

DataFrame をステージ上の指定されたファイルに保存します。 ステージにあるファイルへの DataFrame の保存 をご参照ください。

DataFrame.read.fileformat.async.copyInto('tableName')

DataFrame のデータを指定したテーブルに非同期的にコピーします。 ファイルからテーブルへのデータのコピー をご参照ください。

Session.table('tableName').async.delete

指定されたテーブルの行を非同期的に削除します。 テーブル内の行の更新、削除、およびマージ をご参照ください。

Session.table('tableName').async.update

指定されたテーブルの行を非同期的に更新します。 テーブル内の行の更新、削除、およびマージ をご参照ください。

Session.table('tableName').merge.methods.async.collect

非同期で指定されたテーブルに行をマージします。バージョン1.3.0以降でサポートされています。 テーブル内の行の更新、削除、およびマージ をご参照ください。

返された TypedAsyncJob オブジェクトから、次の操作を実行できます。

  • アクションが完了したかどうかを判断するには、 isDone メソッドを呼び出します。

  • アクションに対応するクエリ ID を取得するには、 getQueryId メソッドを呼び出します。

  • アクションの結果(例: collect メソッドに対する Row オブジェクトの Array、または count メソッドの行数)を返すには、 getResult メソッドを呼び出します。

    getResult はブロッキング呼び出しであることに注意してください。

  • アクションをキャンセルするには、 cancel メソッドを呼び出します。

たとえば、クエリを非同期で実行し、結果を Row オブジェクトの Array として取得するには、 DataFrame.async.collect を呼び出します。

// Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
// This does not execute the query.
val df = session.table("sample_product_data").select(col("id"), col("name"))

// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.collect()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Get an Array of Rows containing the results, and print the results.
// Note that getResult is a blocking call.
val results = asyncJob.getResult()
results.foreach(println)
Copy

クエリを非同期で実行し、結果の数を取得するには、 DataFrame.async.count を呼び出します。

// Create a DataFrame for the "sample_product_data" table.
val dfProducts = session.table("sample_product_data")

// Execute the query asynchronously.
// This call does not block.
val asyncJob = df.async.count()
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// Print the count of rows in the table.
// Note that getResult is a blocking call.
println("Rows returned: " + asyncJob.getResult())
Copy

待機する最大秒数の指定

getResult メソッドを呼び出すときは、 maxWaitTimeInSeconds 引数を使用して、クエリが完了するのを待ってから結果を取得するまでの最大秒数を指定できます。たとえば、

// Wait a maximum of 10 seconds for the query to complete before retrieving the results.
val results = asyncJob.getResult(10)
Copy

この引数を省略すると、メソッドは Snowparkがリクエストするタイムアウト(秒単位) の構成プロパティで指定された最大秒数で待機します。(これは、 セッションオブジェクトを作成する ときに設定できるプロパティです。)

ID による非同期クエリへのアクセス

以前に送信した非同期クエリのクエリ ID がある場合は、 Session.createAsyncJob メソッドを呼び出して、クエリのステータスの確認、クエリ結果の取得、クエリのキャンセルに使用できる AsyncJob オブジェクトの作成ができます。

TypedAsyncJob とは異なり、 AsyncJob は結果を取得するための getResult メソッドを提供しないことに注意してください。結果を取得する必要がある場合は、代わりに getRows または getIterator メソッドを呼び出します。

例:

val asyncJob = session.createAsyncJob(myQueryId)
// Check if the query has completed execution.
println(s"Is query ${asyncJob.getQueryId()} done? ${asyncJob.isDone()}")
// If you need to retrieve the results, call getRows to return an Array of Rows containing the results.
// Note that getRows is a blocking call.
val rows = asyncJob.getRows()
rows.foreach(println)
Copy

行の DataFrame への取得

DataFrame の変換方法を指定 した後、 アクションメソッドを呼び出して クエリを実行し、結果を返すことができます。 Array 内のすべての行を返すことも、行ごとに結果を反復処理できる 反復子 を返すこともできます。後者では、データ量が多い場合、大量のデータがメモリにロードされないように、行がチャンクごとにメモリにロードされます。

すべての行を返す

すべての行を一度に返すには、 DataFrame.collect メソッドを呼び出します。このメソッドは、 オブジェクトの配列を返します。行から値を取得するには、 getType メソッドを呼び出します(例: getStringgetInt など)。

例:

import com.snowflake.snowpark.functions_

val rows = session.table("sample_product_data").select(col("name"), col("category_id")).sort(col("name")).collect()
for (row <- rows) {
  println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
Copy

行の反復子を返す

反復子 を使用して結果の オブジェクトを反復処理する場合は、 DataFrame.toLocalIterator を呼び出します。結果のデータ量が多い場合、メソッドは行をチャンクごとにロードして、すべての行を一度にメモリにロードしないようにします。

例:

import com.snowflake.snowpark.functions_

while (rowIterator.hasNext) {
  val row = rowIterator.next()
  println(s"Name: ${row.getString(0)}; Category ID: ${row.getInt(1)}")
}
Copy

最初の n 行を返す

最初の n 行を返すには、 DataFrame.first メソッドを呼び出し、返す行数を渡します。

DataFrame の行数制限 で説明されているように、結果は非決定的です。結果を決定的にする場合は、ソートされた DataFrame (df.sort().first())でこのメソッドを呼び出します。

例:

import com.snowflake.snowpark.functions_

val df = session.table("sample_product_data")
val rows = df.sort(col("name")).first(5)
rows.foreach(println)
Copy

DataFrame での行の出力

DataFrame の最初の10行をコンソールに出力するには、 DataFrame.show メソッドを呼び出します。別の行数を出力するには、出力する行数を渡します。

DataFrame の行数制限 で説明されているように、結果は非決定的です。結果を決定的にする場合は、ソートされた DataFrame (df.sort().show())でこのメソッドを呼び出します。

例:

import com.snowflake.snowpark.functions_

val df = session.table("sample_product_data")
df.sort(col("name")).show()
Copy

テーブル内の行の更新、削除、およびマージ

注釈

この機能は、Snowpark 0.7.0で導入されました。

Session.table を呼び出してテーブルの DataFrame オブジェクトを作成すると、メソッドは Updatable オブジェクトを返します。これは、テーブル内のデータを更新および削除するための追加のメソッドにより、 DataFrame を拡張します。(Updatable をご参照ください。)

テーブルの行を更新または削除する必要がある場合は、 Updatable クラスの次のメソッドを使用できます。

  • update を呼び出して、テーブル内の既存の行を更新します。 テーブルにある行の更新 をご参照ください。

  • delete を呼び出して、テーブルから行を削除します。 テーブル内にある行の削除 をご参照ください。

  • merge を呼び出して、2番目のテーブルまたはサブクエリのデータに基づき、1つのテーブルにある行を挿入、更新、および削除します。(SQL の 行のテーブルへのマージ コマンドと同等。) MERGE をご参照ください。

テーブルにある行の更新

update メソッドの場合は、更新する列とそれらの列に割り当てる対応する値を関連付ける、 Map で渡します。 update は、更新された行数を含む UpdateResult オブジェクトを返します。(UpdateResult をご参照ください。)

注釈

update は、 アクションメソッド です。これは、メソッドを呼び出すと、 SQL ステートメントがサーバーに送信されて実行されることを意味します。

たとえば、 count という名前の列の値を値 1 に置き換えるには、

val updatableDf = session.table("sample_product_data")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")
Copy

上記の例では、列の名前を使用して列を識別しています。列式を使用することもできます。

val updateResult = updatableDf.update(Map(col("count") -> lit(1)))
Copy

条件が満たされたときにのみ更新する必要がある場合は、その条件を引数として指定できます。たとえば、 category_id 列の値が 20 である行の count という名前の列の値を置き換えるには、次のようにします。

val updateResult = updatableDf.update(Map(col("count") -> lit(1)), col("category_id") === 20)
Copy

別の DataFrame オブジェクトとの結合に基づいて条件を作成する必要がある場合は、その DataFrame を引数として渡し、その DataFrame を条件で使用できます。たとえば、 category_id 列が DataFrame dfPartscategory_id と一致する行の、 count という名前の列の値を置き換えるには、次のようにします。

val updatableDf = session.table("sample_product_data")
val dfParts = session.table("parts")
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), updatableDf("category_id") === dfParts("category_id"), dfParts)
Copy

テーブル内にある行の削除

delete メソッドの場合は、削除する行を識別する条件を指定でき、その条件は別の DataFrame との結合に基づくことができます。 delete は、削除された行数を含む DeleteResult オブジェクトを返します。(DeleteResult をご参照ください。)

注釈

delete は、 アクションメソッド です。これは、メソッドを呼び出すと、 SQL ステートメントがサーバーに送信されて実行されることを意味します。

たとえば、 category_id 列の値が 1 の行を削除するには、次を実行します。

val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === 1)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
Copy

条件が別の DataFrame の列を参照している場合は、その DataFrame を2番目の引数として渡します。たとえば、 category_id 列が DataFrame dfPartscategory_id と一致する行を削除するには、 dfParts を2番目の引数として渡します。

val updatableDf = session.table("sample_product_data")
val deleteResult = updatableDf.delete(updatableDf("category_id") === dfParts("category_id"), dfParts)
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")
Copy

行のテーブルへのマージ

2番目のテーブルまたはサブクエリの値に基づいて1つのテーブルの行を挿入、更新、および削除するには(SQL の MERGE コマンドに相当)、次の手順を実行します。

  1. データをマージするテーブルの Updatable オブジェクトで、 merge メソッドを呼び出し、他のテーブルの DataFrame オブジェクトと、結合条件の列式を渡します。

    これにより、一致する行と一致しない行に対して実行するアクション(例: 挿入、更新、削除)を指定するために使用できる、 MergeBuilder オブジェクトが返されます。(MergeBuilder をご参照ください。)

  2. MergeBuilder オブジェクトの使用。

    • 一致する行に対して実行する必要がある更新または削除を指定するには、 whenMatched メソッドを呼び出します。

      行の更新または削除が必要なときに追加の条件を指定する必要がある場合は、その条件の列式を渡すことができます。

      このメソッドは、実行するアクションを指定するために使用できる MatchedClauseBuilder オブジェクトを返します。(MatchedClauseBuilder をご参照ください。)

      MatchedClauseBuilder オブジェクトの update または delete メソッドを呼び出して、一致する行に対して実行する必要がある更新または削除アクションを指定します。これらのメソッドは、追加の句を指定するために使用できる MergeBuilder オブジェクトを返します。

    • 行が一致しない場合に実行する必要のある挿入を指定するには、 whenNotMatched メソッドを呼び出します。

      行を挿入する必要があるときに追加の条件を指定する必要がある場合は、その条件の列式を渡すことができます。

      このメソッドは、実行するアクションを指定するために使用できる NotMatchedClauseBuilder オブジェクトを返します。(NotMatchedClauseBuilder をご参照ください。)

      NotMatchedClauseBuilder オブジェクトの insert メソッドを呼び出して、行が一致しない場合に実行する必要のある挿入アクションを指定します。これらのメソッドは、追加の句を指定するために使用できる MergeBuilder オブジェクトを返します。

  3. 実行する必要のある挿入、更新、および削除の指定が完了したら、 MergeBuilder オブジェクトの collect メソッドを呼び出して、指定した挿入、更新、および削除をテーブルで実行します。

    collect は、挿入、更新、および削除された行数を含む MergeResult オブジェクトを返します。(MergeResult をご参照ください。)

次の例では、 target テーブルに一致する ID の行が含まれていない場合に、 source テーブルの id 列と value 列の行を target テーブルに挿入します。

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenNotMatched.insert(Seq(source("id"), source("value")))
                      .collect()
Copy

次の例では、同じ ID を持つ source テーブルにある行の value 列の値で、 target テーブルにある行を更新します。

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenMatched.update(Map("value" -> source("value")))
                      .collect()
Copy

テーブルへのデータの保存

DataFrame の内容を新規または既存のテーブルに保存できます。これには、次の権限が必要です。

  • テーブルが存在しない場合は、スキーマに対する CREATE TABLE 権限。

  • テーブルに対する INSERT 権限。

DataFrame の内容をテーブルに保存するには、

  1. DataFrame.write メソッドを呼び出し、 DataFrameWriter オブジェクトを取得します。

  2. DataFrameWriter.mode メソッドを呼び出し、テーブルへの書き込みの設定を指定する SaveMode オブジェクトを渡します。

    • 行を挿入するには、 SaveMode.Append を渡します。

    • 既存のテーブルを上書きするには、 SaveMode.Overwrite を渡します。

    このメソッドは、指定されたモードで構成された同じ DataFrameWriter オブジェクトを返します。

  3. 既存のテーブル(SaveMode.Append)に行を挿入していて、 DataFrame の列名がテーブルの列名と一致する場合は、 DataFrameWriter.option メソッドを呼び出し、 "columnOrder""name" を引数として渡します。

    注釈

    このメソッドはSnowpark 1.4.0で導入されました。

    デフォルトでは、 columnOrder オプションは "index" に設定されています。これは、 DataFrameWriter が列の表示される順序で値を挿入することを意味します。たとえば、 DataFrameWriter は、テーブルの最初の列にある DataFrame から最初の列の値を挿入し、テーブルの2番目の列にある DataFrame から2番目の列の値を挿入します。

    このメソッドは、指定されたオプションで構成された同じ DataFrameWriter オブジェクトを返します。

  4. DataFrameWriter.saveAsTable を呼び出し、 DataFrame の内容を指定されたテーブルに保存します。

    データをテーブルに保存する SQL ステートメントを実行するために、別のメソッド(例: collect)を呼び出す必要はありません。 saveAsTable は、 SQL ステートメントを実行する アクションメソッド です。

次の例では、既存のテーブル(tableName 変数で識別される)を DataFrame df のコンテンツで上書きします。

df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)
Copy

次の例では、 DataFrame df から既存のテーブル(tableName 変数で識別される)に行を挿入します。この例では、テーブルと DataFrame の両方に列 c1c2 が含まれています。

この例では、 columnOrder オプションを "name" (DataFrame 列と同じ名前の値をテーブル列に挿入する)に設定することと、デフォルトの columnOrder オプション(DataFrame の列の順序に基づいて、テーブル列に値を挿入する)を使用することの違いを示しています。

val df = session.sql("SELECT 1 AS c2, 2 as c1")
// With the columnOrder option set to "name", the DataFrameWriter uses the column names
// and inserts a row with the values (2, 1).
df.write.mode(SaveMode.Append).option("columnOrder", "name").saveAsTable(tableName)
// With the default value of the columnOrder option ("index"), the DataFrameWriter the uses column positions
// and inserts a row with the values (1, 2).
df.write.mode(SaveMode.Append).saveAsTable(tableName)
Copy

DataFrame からのビューの作成

DataFrame からビューを作成するには、 DataFrame.createOrReplaceView メソッドを呼び出します。

df.createOrReplaceView("db.schema.viewName")
Copy

createOrReplaceView を呼び出すと、すぐに新しいビューが作成されることに注意してください。さらに重要なことに、 DataFrame が評価されることはありません。(アクションを実行 するまで、 DataFrame 自体は評価されません。)

createOrReplaceView を呼び出して作成したビューは永続的です。そのビューが不要になった場合は、 ビューを手動でドロップ できます。

セッション専用の仮ビューを作成する必要がある場合は、代わりに DataFrame.createOrReplaceTempView メソッドを呼び出します。

df.createOrReplaceTempView("db.schema.viewName")
Copy

DataFrame のキャッシング

場合によっては、複雑なクエリを実行し、(同じクエリを再度実行するのではなく)後続の操作で使用するために結果を保持する必要があります。このような場合は、 DataFrame.cacheResult メソッドを呼び出すことで、 DataFrame の内容をキャッシュできます。

このメソッドは、

  • クエリを実行します。

    cacheResult を呼び出す前に、 結果を取得するための別個のアクションメソッドを呼び出す 必要はありません。 cacheResult は、クエリを実行するアクションメソッドです。

  • 結果を仮テーブルに保存します

    cacheResult は、仮テーブルを作成するため、使用中のスキーマに対する CREATE TABLE 権限が必要です。

  • 仮テーブルの結果へのアクセスを提供する、 HasCachedResult オブジェクトを返します。

    HasCachedResult は、 DataFrame を拡張するため、このキャッシュされたデータに対して、 DataFrame で実行できるのと同じ操作のいくつかを実行できます。

注釈

cacheResult はクエリを実行して結果をテーブルに保存するため、このメソッドでは計算とストレージのコストが増加する可能性があります。

例:

import com.snowflake.snowpark.functions_

// Set up a DataFrame to query a table.
val df = session.table("sample_product_data").filter(col("category_id") > 10)
// Retrieve the results and cache the data.
val cachedDf = df.cacheResult()
// Create a DataFrame containing a subset of the cached data.
val dfSubset = cachedDf.filter(col("category_id") === lit(20)).select(col("name"), col("category_id"))
dfSubset.show()
Copy

このメソッドを呼び出しても、元の DataFrame は影響を受けないことに注意してください。たとえば、 dfTable がテーブル sample_product_data の DataFrame であるとします。

val dfTempTable = dfTable.cacheResult()
Copy

cacheResult を呼び出した後も、 dfTablesample_product_data テーブルを指しており、引き続き dfTable を使用してそのテーブルをクエリおよび更新できます。

一時テーブルにキャッシュされたデータを使用するには、 dfTempTablecacheResult によって返される HasCachedResult オブジェクト)を使用します。

ステージでのファイルの操作

Snowparkライブラリは、ステージにあるファイルを使用して、 Snowflakeにデータをロード し、 Snowflake からデータをアンロードするために使用できるクラスとメソッドを提供します。

注釈

これらのクラスとメソッドをステージで使用するには、 ステージを操作するための権限 が必要です。

次のセクションでは、これらのクラスとメソッドの使用方法について説明します。

ステージでのファイルのアップロードとダウンロード

ステージでファイルをアップロードおよびダウンロードするには、 FileOperation オブジェクトを使用します。

ステージへのファイルのアップロード

ステージにファイルをアップロードするには、

  1. ファイルをステージにアップロードするための権限 があることを確認します。

  2. Session.file を使用して、セッションの FileOperation オブジェクトにアクセスします。

  3. FileOperation.put メソッドを呼び出して、ファイルをステージにアップロードします。

    このメソッドは、 SQL PUT コマンドを実行します。

    • PUT コマンドに オプションのパラメーター を指定するには、パラメーターと値の Map を作成し、 Mapoptions 引数として渡します。たとえば、

      // Upload a file to a stage without compressing the file.
      val putOptions = Map("AUTO_COMPRESS" -> "FALSE")
      val putResults = session.file.put("file:///tmp/myfile.csv", "@myStage", putOptions)
      
      Copy
    • localFilePath 引数では、ワイルドカード(* および ?)を使用して、アップロードするファイルのセットを識別できます。たとえば、

      // Upload the CSV files in /tmp with names that start with "file".
      // You can use the wildcard characters "*" and "?" to match multiple files.
      val putResults = session.file.put("file:///tmp/file*.csv", "@myStage/prefix2")
      
      Copy
  4. put メソッドによって返された PutResult オブジェクトの Array をチェックして、ファイルが正常にアップロードされたかどうかを確認します。たとえば、そのファイルのファイル名と PUT 操作のステータスを出力するには、

    // Print the filename and the status of the PUT operation.
    putResults.foreach(r => println(s"  ${r.sourceFileName}: ${r.status}"))
    
    Copy

ステージからのファイルのダウンロード

ステージからファイルをダウンロードするには、

  1. ファイルをステージからダウンロードするための権限 があることを確認します。

  2. Session.file を使用して、セッションの FileOperation オブジェクトにアクセスします。

  3. FileOperation.get メソッドを呼び出して、ステージからファイルをダウンロードします。

    このメソッドは、 SQL GET コマンドを実行します。

    GET コマンドに オプションのパラメーター を指定するには、パラメーターと値の Map を作成し、 Mapoptions 引数として渡します。たとえば、

    // Download files with names that match a regular expression pattern.
    val getOptions = Map("PATTERN" -> s"'.*file_.*.csv.gz'")
    val getResults = session.file.get("@myStage", "file:///tmp", getOptions)
    
    Copy
  4. get メソッドによって返された GetResult オブジェクトの Array をチェックして、ファイルが正常にダウンロードされたかどうかを確認します。たとえば、そのファイルのファイル名と GET 操作のステータスを出力するには、

    // Print the filename and the status of the GET operation.
    getResults.foreach(r => println(s"  ${r.fileName}: ${r.status}"))
    
    Copy

ステージにデータをアップロードおよびダウンロードする際の入力ストリームの使用

注釈

この機能は、Snowpark 1.4.0で導入されました。

入力ストリームを使用してステージ上のファイルにデータをアップロードし、ステージ上のファイルからデータをダウンロードするには、 FileOperation オブジェクトの uploadStream メソッドと downloadStream メソッドを使用します。

ステージ上のファイルにデータをアップロードする際の入力ストリームの使用

java.io.InputStream オブジェクトからステージ上のファイルにデータをアップロードするには、

  1. ファイルをステージにアップロードするための権限 があることを確認します。

  2. Session.file を使用して、セッションの FileOperation オブジェクトにアクセスします。

  3. FileOperation.uploadStream メソッドを呼び出します。

    データを書き込むステージ上のファイルと InputStream オブジェクトへの完全なパスを渡します。さらに、 compress 引数を使用して、データをアップロードする前にデータを圧縮するかどうかを指定します。

例:

import java.io.InputStream
...
val compressData = true
val pathToFileOnStage = "@myStage/path/file"
session.file.uploadStream(pathToFileOnStage, new ByteArrayInputStream(fileContent.getBytes()), compressData)
Copy

ステージ上のファイルからデータをダウンロードする際の入力ストリームの使用

ステージ上のファイルから java.io.InputStream オブジェクトにデータをダウンロードするには、

  1. ファイルをステージからダウンロードするための権限 があることを確認します。

  2. Session.file を使用して、セッションの FileOperation オブジェクトにアクセスします。

  3. FileOperation.downloadStream メソッドを呼び出します。

    ダウンロードするデータを含むステージ上のファイルへの完全なパスを渡します。 decompress 引数を使用して、ファイル内のデータを圧縮するかどうかを指定します。

例:

import java.io.InputStream
...
val isDataCompressed = true
val pathToFileOnStage = "@myStage/path/file"
val is = session.file.downloadStream(pathToFileOnStage, isDataCompressed)
Copy

ステージ内におけるファイルの DataFrame の設定

このセクションでは、Snowflakeステージでファイルの DataFrame を設定する方法について説明します。この DataFrame を作成すると、 DataFrame を使用して次のことができます。

Snowflakeステージのファイルに DataFrame を設定するには、 DataFrameReader クラスを使用します。

  1. 次の権限があることを確認します。

  2. Session クラスの read メソッドを呼び出して、 DataFrameReader オブジェクトにアクセスします。

  3. ファイルが CSV 形式の場合は、ファイルのフィールドを記述します。そのためには、

    1. ファイル内のフィールドを記述する一連の StructField オブジェクトで構成される StructType オブジェクトを作成します。

    2. StructField オブジェクトごとに、以下を指定します。

      • フィールドの名前。

      • フィールドのデータ型(com.snowflake.snowpark.types パッケージでオブジェクトとして指定)。

      • フィールドがNULL可能かどうか。

      例:

      import com.snowflake.snowpark.types._
      
      val schemaForDataFile = StructType(
          Seq(
              StructField("id", StringType, true),
              StructField("name", StringType, true)))
      
      Copy
    3. DataFrameReader オブジェクトで schema メソッドを呼び出し、 StructType オブジェクトを渡します。

      例:

      var dfReader = session.read.schema(schemaForDataFile)
      
      Copy

      schema メソッドは、指定されたフィールドを含むファイルを読み取るように構成された DataFrameReader オブジェクトを返します。

      他の形式(JSON など)のファイルでは、これを実行する必要はありません。これらのファイルの場合、 DataFrameReader は、データをフィールド名 $1 の VARIANT 型の単一フィールドとして扱います。

  4. データの読み取り方法に関する追加情報を指定する必要がある(例: データが圧縮されているか、 CSV ファイルがフィールドを区切るためにコンマではなくセミコロンを使用している)場合は、 DataFrameReader.option メソッドまたは DataFrameReader.options メソッドを呼び出します。

    設定するオプションの名前と値を渡します。次のタイプのオプションを設定できます。

    次の例では、圧縮されておらず、フィールド区切り文字にセミコロンを使用する CSV ファイル内のデータをクエリするように、 DataFrameReader オブジェクトを設定します。

    dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    
    Copy

    option メソッドは、指定されたオプションで構成された DataFrameReader オブジェクトを返します。

    複数のオプションを設定するには、 option メソッドに 呼び出しをチェーン する(上記の例に示すように)か、 DataFrameReader.options メソッドを呼び出し、オプションの名前と値の Map で渡します。

  5. ファイルの形式に対応したメソッドを呼び出します。以下のいずれかのメソッドを呼び出すことができます。

    これらのメソッドを呼び出すときは、読み取るファイルのステージ位置を渡します。たとえば、

    val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
    
    Copy

    同じプレフィックスで始まる複数のファイルを指定するには、ステージ名の後にプレフィックスを指定します。たとえば、ステージ @mystage からプレフィックス csv_ を持つファイルをロードするには、

    val df = dfReader.csv("@mystage/csv_")
    
    Copy

    ファイルの形式に対応するメソッドは、そのファイルの CopyableDataFrame オブジェクトを返します。 CopyableDataFrameDataFrame を拡張し、ステージングされたファイルにあるデータを処理するための追加のメソッドを提供します。

  6. アクションメソッドを呼び出して、次を実行します。

    テーブルの DataFrames の場合と同様、データは、 アクションメソッド を呼び出すまで DataFrame に取得されません。

ファイルから DataFrame へのデータのロード

ステージのファイルに DataFrame を設定 した後、ファイルから DataFrame にデータをロードできます。

  1. DataFrame オブジェクトメソッドを使用して、データセットで必要な 変換を実行 します(例: 特定のフィールドの選択、行のフィルタリングなど)。

    たとえば、 mystage という名前のステージにある data.json という名前の JSON ファイルから color 要素を抽出するには、

    val df = session.read.json("@mystage/data.json").select(col("$1")("color"))
    
    Copy

    前に説明したように、 CSV 以外の形式のファイル(例: JSON)の場合、 DataFrameReader は、ファイル内のデータを $1 という名前の単一の VARIANT 列として扱います。

  2. DataFrame.collect メソッドを呼び出してデータをロードします。たとえば、

    val results = df.collect()
    
    Copy

ファイルからテーブルへのデータのコピー

ステージのファイルに DataFrame を設定 した後、 CopyableDataFrame.copyInto メソッドを呼び出してデータをテーブルにコピーできます。このメソッドは、 COPY INTO <テーブル> コマンドを実行します。

注釈

copyInto を呼び出す前に collect メソッドを呼び出す必要はありません。 copyInto を呼び出す前に、ファイルのデータが DataFrame にある必要はありません。

たとえば、次のコードは、 myFileStage で指定された CSV ファイルからテーブル mytable にデータをロードします。データは CSV ファイルにあるため、コードは、合わせて ファイルのフィールドを記述する 必要があります。この例では、 DataFrameReader.schema メソッドを呼び出し、フィールドを説明する一連の StructField オブジェクトを含む StructType オブジェクト(csvFileSchema)を渡しています。

val df = session.read.schema(csvFileSchema).csv(myFileStage)
df.copyInto("mytable")
Copy

ステージにあるファイルへの DataFrame の保存

注釈

この機能は、Snowpark 1.5.0で導入されました。

ステージ上のファイルに DataFrame を保存する必要がある場合は、ファイルの形式に対応する DataFrameWriter メソッド(例: CSV ファイルに書き込む csv メソッド)を呼び出して、ファイルを保存する必要のあるステージ位置に渡します。これらの DataFrameWriter メソッドは、 COPY INTO <場所> コマンドを実行します。

注釈

これらの DataFrameWriter メソッドを呼び出す前に collect メソッドを呼び出す必要はありません。これらのメソッドを呼び出す前に、ファイルのデータが DataFrame にある必要はありません。

DataFrame の内容をステージ上のファイルに保存するには、

  1. DataFrame.write メソッドを呼び出し、 DataFrameWriter オブジェクトを取得します。たとえば、 sample_product_data という名前のテーブルを表す DataFrame の DataFrameWriter オブジェクトを取得するには、次のようにします。

    dfWriter = session.table("sample_product_data").write
    
    Copy
  2. ファイルの内容を上書きする場合(ファイルが存在する場合)は、 DataFrameWriter.mode メソッドを呼び出し、 SaveMode.Overwrite を渡します。

    それ以外でステージ上の指定されたファイルがすでに存在する場合、デフォルトで、 DataFrameWriter はエラーを報告します。

    mode メソッドは、指定されたモードで構成された同じ DataFrameWriter オブジェクトを返します。

    たとえば、 DataFrameWriter がステージ上のファイルを上書きするように指定するには、次のようにします。

    dfWriter = dfWriter.mode(SaveMode.Overwrite)
    
    Copy
  3. データの保存方法に関する追加情報を指定する必要がある場合(たとえば、データを圧縮する必要がある場合や、セミコロンを使用してCSVファイルのフィールドを区切る場合)は、 DataFrameWriter.option メソッドまたは DataFrameWriter.options メソッドを呼び出します。

    設定するオプションの名前と値を渡します。次のタイプのオプションを設定できます。

    次のオプションの設定には、 option メソッドは使用できないことに注意してください。

    • TYPE 形式タイプオプション。

    • OVERWRITE コピーオプション。このオプションを設定するには、代わりに mode メソッドを呼び出します(前のステップで説明のとおり)。

    次の例では、フィールド区切り文字としてセミコロン(コンマではなく)を使用して、データを非圧縮形式で CSV ファイルに保存するように DataFrameWriter オブジェクトを設定します。

    dfWriter = dfWriter.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    
    Copy

    option メソッドは、指定されたオプションで構成された DataFrameWriter オブジェクトを返します。

    複数のオプションを設定するには、 option メソッドに 呼び出しをチェーン する(上記の例に示すように)か、 DataFrameWriter.options メソッドを呼び出し、オプションの名前と値の Map で渡します。

  4. 保存された各ファイルの詳細を返すには、 DETAILED_OUTPUT コピーオプションTRUE に設定します。

    デフォルトでは、 DETAILED_OUTPUTFALSE です。これは、メソッドがフィールド "rows_unloaded""input_bytes"、および "output_bytes" を含む単一行の出力を返すことを意味します。

    DETAILED_OUTPUTTRUE に設定すると、メソッドは保存されたファイルごとに出力の行を返します。各行には、フィールド FILE_NAMEFILE_SIZE、および ROW_COUNT が含まれています。

  5. ファイルの形式に対応するメソッドを呼び出して、データをファイルに保存します。次のいずれかのメソッドを呼び出すことができます。

    これらのメソッドを呼び出すときは、データを書き込む必要のあるファイルのステージ位置を渡します(例: @mystage)。

    ファイルに別のプレフィックスを付けて名前を付ける場合は、ステージ名の後にプレフィックスを指定します。デフォルトでは、このメソッドはプレフィックスが data_ のファイル名にデータを保存します(例: @mystage/data_0_0_0.csv)。たとえば、

    val writeFileResult = dfWriter.csv("@mystage/saved_data")
    
    Copy

    この例では、 DataFrame の内容をプレフィックス saved_data で始まるファイル(例: @mystage/saved_data_0_0_0.csv)に保存します。

  6. ファイルに書き込まれるデータ量に関する情報については、返された WriteFileResult オブジェクトを確認します。

    WriteFileResult オブジェクトから、 COPY INTO <場所> コマンドによって生成された出力にアクセスできます。

    • Row オブジェクトの配列として出力の行にアクセスするには、 rows 値のメンバーを使用します。

    • 行に存在するフィールドを判別するには、行のフィールドを記述する StructType である schema 値のメンバーを使用します。

    たとえば、出力行のフィールドと値の名前を出力するには、次のようにします。

    val writeFileResult = dfWriter.csv("@mystage/saved_data")
    for ((row, index) <- writeFileResult.rows.zipWithIndex) {
      (writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach {
        (structField, element) => println(s"${structField.name}: $element")
      }
    }
    
    Copy

次の例では、 DataFrame を使用して、 car_sales という名前のテーブルの内容をステージ @mystage でプレフィックス saved_data が付いた JSON ファイルに保存します(例: @mystage/saved_data_0_0_0.json)。たとえば、

  • ファイルがステージ上にすでに存在する場合は、ファイルを上書きします。

  • 保存操作に関する詳細な出力を返します。

  • データを非圧縮で保存します。

最後に、サンプルコードは、返された出力行の各フィールドと値を出力します。

val df = session.table("car_sales")
val writeFileResult = df.write.mode(SaveMode.Overwrite).option("DETAILED_OUTPUT", "TRUE").option("compression", "none").json("@mystage/saved_data")
for ((row, index) <- writeFileResult.rows.zipWithIndex) {
  println(s"Row: $index")
  (writeFileResult.schema.fields, writeFileResult.rows(index).toSeq).zipped.foreach {
    (structField, element) => println(s"${structField.name}: $element")
  }
}
Copy

半構造化データの操作

DataFrame を使用すると、 半構造化データ (例: JSON データ)へのクエリとアクセスができます。次のセクションでは、 DataFrame 内の半構造化データの操作方法について説明します。

注釈

これらのセクションの例では、 例で使用されるサンプルデータ のサンプルデータを使用しています。

半構造化データの走査

半構造化データの特定のフィールドまたは要素を参照するには、 Column オブジェクトの次のメソッドを使用します。

注釈

パス内のフィールド名または要素が不規則であり、 Column.apply メソッドの使用が困難な場合は、 getget_ignore_case、または get_path を代替として使用できます。

apply メソッドを使用した列の参照 で説明したように、メソッド名 apply は省略できます。

col("column_name")("field_name")
col("column_name")(index)
Copy

たとえば、次のコードは、 サンプルデータsrc 列にあるオブジェクトの dealership フィールドを選択します。

val df = session.table("car_sales")
df.select(col("src")("dealership")).show()
Copy

このコードは、次を出力します。

----------------------------
|"""SRC""['DEALERSHIP']"   |
----------------------------
|"Valley View Auto Sales"  |
|"Tindel Toyota"           |
----------------------------
Copy

注釈

DataFrame の値は文字列リテラルとして返されるため、二重引用符で囲まれています。これらの値を特定の型にキャストするには、 半構造化データへの明示的な値のキャスト をご参照ください。

メソッド呼び出しのチェーン を使用して、特定のフィールドまたは要素へのパスを走査することもできます。

たとえば、次のコードは salesperson オブジェクトの name フィールドを選択します。

val df = session.table("car_sales")
df.select(col("src")("salesperson")("name")).show()
Copy

このコードは、次を出力します。

------------------------------------
|"""SRC""['SALESPERSON']['NAME']"  |
------------------------------------
|"Frank Beasley"                   |
|"Greg Northrup"                   |
------------------------------------
Copy

別の例として、次のコードは、車両の配列を保持する vehicle フィールドの最初の要素を選択します。この例では、最初の要素から price フィールドも選択しています。

val df = session.table("car_sales")
df.select(col("src")("vehicle")(0)).show()
df.select(col("src")("vehicle")(0)("price")).show()
Copy

このコードは、次を出力します。

---------------------------
|"""SRC""['VEHICLE'][0]"  |
---------------------------
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "paint protection"   |
|  ],                     |
|  "make": "Honda",       |
|  "model": "Civic",      |
|  "price": "20275",      |
|  "year": "2017"         |
|}                        |
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "rust proofing",     |
|    "fabric protection"  |
|  ],                     |
|  "make": "Toyota",      |
|  "model": "Camry",      |
|  "price": "23500",      |
|  "year": "2017"         |
|}                        |
---------------------------

------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']"  |
------------------------------------
|"20275"                           |
|"23500"                           |
------------------------------------
Copy

パス内のフィールド名または要素が不規則なために Column.apply メソッドの使用が困難な場合は、 apply メソッドの代わりに、 getget_ignore_case、または get_path 関数を使用できます。

たとえば、次のコード行は両方とも、オブジェクトの指定されたフィールドの値を出力します。

df.select(get(col("src"), lit("dealership"))).show()
df.select(col("src")("dealership")).show()
Copy

同様に、次のコード行は両方とも、オブジェクトの指定されたパスにあるフィールドの値を出力します。

df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
df.select(col("src")("vehicle")(0)("make")).show()
Copy

半構造化データへの明示的な値のキャスト

デフォルトでは、上記の例に示すように、フィールドと要素の値は文字列リテラル(二重引用符を含む)として返されます。

予期しない結果を回避するには、 キャスト メソッドを呼び出して、値を特定の型にキャストします。たとえば、次のコードは、キャストなしとキャストありの値を出力します。

// Import the objects for the data types, including StringType.
import com.snowflake.snowpark.types._
...
val df = session.table("car_sales")
df.select(col("src")("salesperson")("id")).show()
df.select(col("src")("salesperson")("id").cast(StringType)).show()
Copy

このコードは、次を出力します。

----------------------------------
|"""SRC""['SALESPERSON']['ID']"  |
----------------------------------
|"55"                            |
|"274"                           |
----------------------------------

---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)"  |
---------------------------------------------------
|55                                               |
|274                                              |
---------------------------------------------------
Copy

オブジェクト配列の行へのフラット化

半構造化データを DataFrame に「フラット化」する必要がある場合(例: 配列内にあるすべてのオブジェクトの行を生成する場合)は、 DataFrame.flatten メソッドを呼び出します。このメソッドは、 FLATTEN SQL 関数と同等です。オブジェクトまたは配列へのパスを渡すと、メソッドは、各フィールドの行か、オブジェクトまたは配列の要素の行を含む DataFrame を返します。

たとえば、 サンプルデータ では、 src:customer は顧客に関する情報を含むオブジェクトの配列です。各オブジェクトには、 name および address フィールドが含まれています。

このパスを flatten 関数に渡すと、次のようになります。

val df = session.table("car_sales")
df.flatten(col("src")("customer")).show()
Copy

メソッドは DataFrame を返します。

----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC"                                      |"SEQ"  |"KEY"  |"PATH"  |"INDEX"  |"VALUE"                            |"THIS"                               |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{                                          |1      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "San Francisco, CA",  |  {                                  |
|    {                                      |       |       |        |         |  "name": "Joyce Ridgely",         |    "address": "San Francisco, CA",  |
|      "address": "San Francisco, CA",      |       |       |        |         |  "phone": "16504378889"           |    "name": "Joyce Ridgely",         |
|      "name": "Joyce Ridgely",             |       |       |        |         |}                                  |    "phone": "16504378889"           |
|      "phone": "16504378889"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Valley View Auto Sales",  |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "55",                            |       |       |        |         |                                   |                                     |
|    "name": "Frank Beasley"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "paint protection"                 |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Honda",                     |       |       |        |         |                                   |                                     |
|      "model": "Civic",                    |       |       |        |         |                                   |                                     |
|      "price": "20275",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
|{                                          |2      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "New York, NY",       |  {                                  |
|    {                                      |       |       |        |         |  "name": "Bradley Greenbloom",    |    "address": "New York, NY",       |
|      "address": "New York, NY",           |       |       |        |         |  "phone": "12127593751"           |    "name": "Bradley Greenbloom",    |
|      "name": "Bradley Greenbloom",        |       |       |        |         |}                                  |    "phone": "12127593751"           |
|      "phone": "12127593751"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Tindel Toyota",           |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "274",                           |       |       |        |         |                                   |                                     |
|    "name": "Greg Northrup"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "rust proofing",                   |       |       |        |         |                                   |                                     |
|        "fabric protection"                |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Toyota",                    |       |       |        |         |                                   |                                     |
|      "model": "Camry",                    |       |       |        |         |                                   |                                     |
|      "price": "23500",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
----------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

この DataFrame から、 VALUE フィールドにある各オブジェクトからの name フィールドと address フィールドを選択できます。

df.flatten(col("src")("customer")).select(col("value")("name"), col("value")("address")).show()
Copy
-------------------------------------------------
|"""VALUE""['NAME']"   |"""VALUE""['ADDRESS']"  |
-------------------------------------------------
|"Joyce Ridgely"       |"San Francisco, CA"     |
|"Bradley Greenbloom"  |"New York, NY"          |
-------------------------------------------------
Copy

次のコードは、 特定の型に値をキャスト し、列の名前を変更することにより、前の例に追加します。

df.flatten(col("src")("customer")).select(col("value")("name").cast(StringType).as("Customer Name"), col("value")("address").cast(StringType).as("Customer Address")).show()
Copy
-------------------------------------------
|"Customer Name"     |"Customer Address"  |
-------------------------------------------
|Joyce Ridgely       |San Francisco, CA   |
|Bradley Greenbloom  |New York, NY        |
-------------------------------------------
Copy

SQL ステートメントの実行

指定した SQL ステートメントを実行するには、 Session クラスの sql メソッドを呼び出し、実行するステートメントを渡します。メソッドは DataFrame を返します。

アクションメソッドの呼び出し があるまで、 SQL ステートメントは実行されないことに注意してください。

// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
val dfStageFiles = session.sql("ls @myStage")
val files = dfStageFiles.collect()
files.foreach(println)

// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()

val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()
println("Count: " + numRows);
Copy

メソッドを呼び出して DataFrame を変換 する場合(例: フィルター、選択など)、これらのメソッドは、基になる SQL ステートメントが SELECT ステートメントである場合にのみ機能することに注意してください。変換メソッドは、他の種類の SQL ステートメントではサポートされていません。

val df = session.sql("select id, category_id, name from sample_product_data where id > 10")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("category_id") < 10).select(col("id")).collect()
results.foreach(println)

// In this example, the underlying SQL statement is not a SELECT statement.
val dfStageFiles = session.sql("ls @myStage")
// Calling the filter method results in an error.
dfStageFiles.filter(...)
Copy