Snowparkでの DataFrames の操作

Snowpark内でデータをクエリして処理する主な方法は、 DataFrame を使用することです。このトピックでは、 DataFrames の操作方法について説明します。

このトピックの内容:

データを取得して操作するには、 DataFrame クラスを使用します。DataFrame は、遅延評価されるリレーショナルデータセットを表します。これは、特定のアクションがトリガーされたときにのみ実行されます。ある意味で、 DataFrame は、データを取得するために評価する必要があるクエリのようなものです。

データを DataFrame に取得するには、

  1. DataFrame を作成し、データセットのためにデータのソースを指定 します。

    たとえば、テーブル、外部 CSV ファイル、または SQL ステートメントの実行からのデータを保持する DataFrame を作成できます。

  2. DataFrame のデータセットを変換する方法を指定 します。

    たとえば、どの列を選択するか、行をどのようにフィルタリングするか、結果をどのようにソートおよびグループ化するかなどを指定できます。

  3. ステートメントを実行して、データをDataFrame に取得 します。

    データを DataFrame に取得するには、アクションを実行するメソッド(例: collect() メソッド)を呼び出す必要があります。

次のセクションでは、これらのステップについて詳しく説明します。

DataFrame の構築

DataFrame を構築するには、 Session クラスのメソッドを使用できます。次の各メソッドは、異なるタイプのデータソースから DataFrame を作成します。

  • テーブル、ビュー、またはストリームのデータから DataFrame を作成するには、 table メソッドを呼び出します。

    // Create a DataFrame from the data in the "products" table.
    val dfTable = session.table("products")
    

    注釈

    session.table メソッドは Updatable オブジェクトを返します。 UpdatableDataFrame を拡張し、テーブル内のデータを操作するための追加のメソッド(例: データを更新および削除するためのメソッド)を提供します。 テーブル内の行の更新、削除、およびマージ をご参照ください。

  • 一連の値から DataFrame を作成するには、 createDataFrame メソッドを呼び出します。

    // Create a DataFrame containing a sequence of values.
    // In the DataFrame, name the columns "i" and "s".
    val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
    
  • 値の範囲を含む DataFrame を作成するには、 range メソッドを呼び出します。

    // Create a DataFrame from a range
    val dfRange = session.range(1, 10, 2)
    
  • ステージ内のファイルからのデータを保持する DataFrame を作成するには、 read を呼び出して DataFrameReader オブジェクトを取得します。 DataFrameReader オブジェクトで、ファイル内のデータの形式に対応するメソッドを呼び出します。

    // Create a DataFrame from data in a stage.
    val dfJson = session.read.json("@mystage2/data1.json")
    
  • SQL クエリの結果を保持する DataFrame を作成するには、 sql メソッドを呼び出します。

    // Create a DataFrame from a SQL query
    val dfSql = session.sql("SELECT name from products")
    

    注: このメソッドを使用すると、テーブルおよびステージングされたファイルからデータを取得する SELECT ステートメントを実行できますが、そうではなく、 table および read メソッドを使用するようにします。 tableread のようなメソッドは、開発ツールでより優れた構文の強調表示、エラーの強調表示、およびインテリジェントなコード補完を提供できます。

データセットの変換方法の指定

選択する列と、結果のフィルタリング、並べ替え、グループ化などを指定するには、データセットを変換する DataFrame メソッドを呼び出します。これらのメソッドで列を識別するには、 col 関数または列に評価される式を使用します。(列と式の指定 を参照。)

例:

  • 返される行を指定するには、 filter メソッドを呼び出します。

    // Create a DataFrame for the rows with the ID 1
    // in the "products" table.
    //
    // This example uses the === operator of the Column object to perform an
    // equality check.
    val dfProductIdOne = dfProductInfo.filter(col("id") === 1)
    
  • 選択する列を指定するには、 select メソッドを呼び出します。

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame object for the "products" table.
    val dfProductInfo = session.table("products")
    
    // Create a DataFrame that contains the id, name, and serial_number
    // columns.
    val dfProductSerialNo =
        dfProductInfo.select(col("id"), col("name"), col("serial_number"))
    

各メソッドは、変換された新しい DataFrame オブジェクトを返します。(このメソッドは元の DataFrame オブジェクトには影響を及ぼしません。)つまり、複数の変換を適用する場合は、 チェーンメソッド呼び出し を実行して、前のメソッド呼び出しによって返された、新しい DataFrame オブジェクトに対する後続の各変換メソッドを呼び出すことができます。

これらの変換方法は、Snowflakeデータベースからデータを取得しないことに注意してください。(DataFrame を評価するアクションの実行 で説明されているアクションメソッドは、データ取得を実行します。)変換メソッドは、 SQL ステートメントの作成方法を指定するだけです。

DataFrames の結合

DataFrame オブジェクトを結合するには、 join メソッドを呼び出します。

// Create a DataFrame that joins two other DataFrames
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key"))

この例では、 DataFrame.col メソッドを使用して、結合で使用する列を指定しています。この方法の詳細については、 列と式の指定 をご参照ください。

異なる列でテーブルをそれ自体と結合する必要がある場合は、単一の DataFrame で自己結合を実行することはできません。単一の DataFrame を使用して自己結合を実行する次の例は、 "id" の列式が結合の左側と右側に存在するため、失敗します。

val dfJoined = df.join(df, col("id") === col("parent_id"))

val dfJoined = df.join(df, df("id") === df("parent_id"))

これらの例は両方とも、次の例外を除いて失敗します。

Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
  Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
  Instead, join this DataFrame to a clone() of itself.

代わりに、 DataFrame.clone() メソッドを使用して DataFrame オブジェクトのクローンを作成し、2つの DataFrame オブジェクトを使用して結合を実行します。

// Create a DataFrame object for the "products" table for the left-hand side of the join.
val dfLhs = session.table("products")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()

// Create a DataFrame that joins the two DataFrames
// for the "products" table on the "key" column.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))

同じ列で自己結合を実行する場合は、 USING 句の列式の Seq を渡す join メソッドを呼び出します。

// Create a DataFrame that performs a self-join on
// the DataFrame for the "products" table using the "key" column.
val dfJoined = df.join(df, Seq("key"))

列と式の指定

これらの変換メソッドを呼び出すときは、列または列を使用する式を指定する必要がある場合があります。たとえば、 select メソッドを呼び出すときは、選択する必要のある列を指定する必要があります。

列を参照するには、 com.snowflake.snowpark.functions オブジェクトで col 関数を呼び出して、 Column オブジェクトを作成します。

// Import the col function from the functions object.
import com.snowflake.snowpark.functions._

val dfProductInfo = session.table("products").select(col("id"), col("name"))

注釈

リテラルの Column オブジェクトを作成するには、 列オブジェクトとしてのリテラルの使用 をご参照ください。

フィルター、射影、結合条件などを指定する場合は、式で Column オブジェクトを使用できます。次の例では、式で Column オブジェクトを使用して次を実行します。

  • id 列の値が 20 であり、 a 列と b 列の値の合計が 10 未満である行を取得します。

  • c という名前の列に b10 を掛けた値を返します。 c は、 DataFrame を結合する次のステートメントで使用される列エイリアスです。

  • DataFrame df を計算された DataFrame dfCompute と結合します。

val dfCompute = session.table("T").filter(col("id") === 20).filter((col("a") + col("b")) < 10).select((col("b") * 10) as "c")
val df2 = df.join(dfCompute, col("a") === col("c") && col("a") === col("d"))

同じ名前の2つの異なる DataFrame オブジェクトの列を参照する場合(例: その列の DataFrames を結合する場合)、1つの DataFrame オブジェクトにある DataFrame.col メソッドを使用して、そのオブジェクトにある列(例: df1.col("name")df2.col("name"))を参照できます。

次の例は、 DataFrame.col メソッドを使用して特定の DataFrame の列を参照する方法を示しています。この例では、両方とも key という名前の列を持つ2つの DataFrame オブジェクトを結合します。この例では、 Column.as メソッドを使用して、新しく作成された DataFrame の列の名前を変更します。

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))

DataFrame.col メソッドの代わりに、 DataFrame.apply メソッドを使用して特定の DataFrame の列を参照できます。 DataFrame.col メソッドと同様に、 DataFrame.apply メソッドは入力として列名を受け入れ、 Column オブジェクトを返します。

オブジェクトでScalaに apply メソッドがある場合は、関数であるかのようにオブジェクトを呼び出すことで apply メソッドを呼び出すことができます。たとえば、 df.apply("column_name") を呼び出すには、単に df("column_name") と記述できます。次の呼び出しは同等です。

  • df.col("<列名>")

  • df.apply("<列名>")

  • df("<列名>")

次の例は前の例と同じですが、結合操作で列を参照するために DataFrame.apply メソッドを使用します。

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))

列オブジェクトの短縮形の使用

col 関数を使用する代わりに、次のいずれかの方法で列を参照できます。

  • 引用符で囲まれた列名($"column_name")の前にドル記号を使用する。

  • 引用符で囲まれていない列名('column_name)の前にアポストロフィ(一重引用符)を使用する。

これを実行するには、 Session オブジェクトを作成した後、 implicits オブジェクトから名前をインポートします。

val session = Session.builder.configFile("/path/to/properties").create

// Import this after you create the session.
import session.implicits._

// Use the $ (dollar sign) shorthand.
val df = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)

// Use ' (apostrophe) shorthand.
val df = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)

オブジェクト識別子(テーブル名、列名など)の前後での二重引用符の使用

指定するデータベース、スキーマ、テーブル、およびステージの名前は、 Snowflake識別子の要件 に準拠している必要があります。名前を指定すると、Snowflakeはその名前を大文字と見なします。たとえば、次の呼び出しは同等です。

// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))

名前が識別子の要件に準拠していない場合は、名前を二重引用符(")で囲む必要があります。バックスラッシュ(\)を使用して、Scala文字列リテラル内の二重引用符をエスケープします。たとえば、次のテーブル名は文字やアンダースコアで始まらないため、名前を二重引用符で囲む必要があります。

val df = session.table("\"10tablename\"")

の名前を指定するときは、名前を二重引用符で囲む必要はありません。名前が識別子の要件に準拠していない場合、Snowparkライブラリは、列名を自動的に二重引用符で囲みます。

// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\"))

// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))

すでに列名を二重引用符で囲んでいる場合、ライブラリは名前を二重引用符で囲みません。

場合によっては、列名に二重引用符が含まれることがあります。

describe table quoted;
+------------------------+ ...
| name                   | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted"   | ...
+------------------------+ ...

識別子の要件 で説明されているように、二重引用符で囲まれた識別子内の二重引用符文字ごとに、2つの二重引用符文字(例: "name_with_""air""_quotes""""column_name_quoted""")を使用する必要があります。

val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()

識別子が二重引用符で囲まれている場合は(明示的に引用符を追加したか、ライブラリが引用符を追加したかに関係なく)、 Snowflakeは識別子を大文字と小文字を区別する ものとして扱います。

// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))

列オブジェクトとしてのリテラルの使用

Column オブジェクト内で渡すメソッドでリテラルを使用するには、リテラルを com.snowflake.snowpark.functions オブジェクトの lit 関数に渡して、リテラルの Column オブジェクトを作成します。例:

// Import for the lit and col functions.
import com.snowflake.snowpark.functions._

// Show the first 10 rows in which num_items is greater than 5.
// Use `lit(5)` to create a Column object for the literal 5.
df.filter(col("num_items").gt(lit(5))).show()

リテラルがScalaの浮動小数点またはdouble値である場合(例: 0.05デフォルトでDoubleとして扱う)、Snowparkライブラリは SQL を生成し、対応するSnowparkデータ型に値を暗黙的にキャストします(例: 0.05::DOUBLE)。これにより、指定された正確な数とは異なる概算値が生成される可能性があります。

たとえば、次のコードは、フィルター(0.05 以上の値に一致する)が DataFrame の行に一致する必要がある場合でも、一致する行を表示しません。

// Create a DataFrame that contains the value 0.05.
val df = session.sql("select 0.05 :: Numeric(5, 2) as a")

// Applying this filter results in no matching rows in the DataFrame.
df.filter(col("a") <= lit(0.06) - lit(0.01)).show()

問題は、 lit(0.06)lit(0.01) が、正確な値ではなく、 0.060.01 の概算値を生成することです。

この問題を回避するには、次のいずれかの方法を使用できます。

  • オプション1: 使用する Snowparkの型にリテラルをキャスト する。たとえば、精度が5でスケールが2の NUMBER を使用するには、次のようにします。

    df.filter(col("a") <= lit(0.06).cast(new DecimalType(5, 2)) - lit(0.01).cast(new DecimalType(5, 2))).show()
    
  • オプション2: 値を lit 関数に渡す前に、使用する型に値をキャストする。たとえば、 BigDecimal 型 を使用する場合は、

    df.filter(col("a") <= lit(BigDecimal(0.06)) - lit(BigDecimal(0.01))).show()
    

列オブジェクトを特定の型にキャストする

column オブジェクトを特定の型にキャストするには、 cast メソッドを呼び出し、 com.snowflake.snowpark.typesパッケージ から型オブジェクトを渡します。たとえば、リテラルを5の精度と2のスケールで NUMBER としてキャストするには、次のようにします。

// Import for the lit function.
import com.snowflake.snowpark.functions._
// Import for the DecimalType class..
import com.snowflake.snowpark.types._

val decimalValue = lit(0.05).cast(new DecimalType(5,2))

メソッド呼び出しのチェーン

DataFrame オブジェクトを変換するメソッド ごとに、変換が適用された新しい DataFrame オブジェクトが返されるため、 メソッド呼び出しのチェーン により、追加の方法で変換される新しい DataFrame を生成できます。

次の例では、次のように構成された DataFrame を返します。

  • products テーブルをクエリします。

  • id = 1 で行を返します。

  • name および serial_number 列を選択します。

val dfProductInfo = session.table("products").filter(col("id") === 1).select(col("name"), col("serial_number"))

この例では、

  • session.table("products") は、 products テーブルの DataFrame を返します。

    DataFrame にはまだテーブルのデータが含まれていませんが、オブジェクトにはテーブルの列の定義が含まれています。

  • filter(col("id") === 1) は、 id = 1 で行を返すように設定された products テーブルの DataFrame を返します。

    DataFrame には、テーブルからの一致する行がまだ含まれていないことに再度注意してください。 アクションメソッドを呼び出す まで、一致する行は取得されません。

  • select(col("name"), col("serial_number")) は、 id = 1 を持つ products テーブルの行の name 列と serial_number 列を含む DataFrame を返します。

メソッド呼び出しをチェーンするときは、呼び出しの順序が重要であることに注意してください。各メソッド呼び出しは、変換された DataFrame を返します。後続の呼び出しが変換された DataFrame で機能することを確認します。

たとえば、次のコードでは、 select メソッドは nameserial_number の2つの列のみを含む DataFrame を返します。この DataFrame の filter メソッド呼び出しは、変換された DataFrame にない id 列を使用しているため失敗します。

// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("products").select(col("name"), col("serial_number")).filter(col("id") === 1)

対照的に、次のコードは、 products テーブルのすべての列(id 列を含む)を含む DataFrame で filter() メソッドが呼び出されるため、正常に実行されます。

// This succeeds because the DataFrame returned by the table() method
// includes the id column.
val dfProductInfo = session.table("products").filter(col("id") === 1).select(col("name"), col("serial_number"))

SQL ステートメントで同等のキーワード(SELECT および WHERE)を使用する場合とは異なる順序で、 select および filter メソッドの呼び出しが必要になる場合があることに注意してください。

列定義の取得

DataFrame のためにデータセット内の列の定義を取得するには、 schema メソッドを呼び出します。このメソッドは、 StructField オブジェクトの Array を含む StructType オブジェクトを返します。各 StructField オブジェクトには、列の定義が含まれています。

// Get the StructType object that describes the columns in the
// underlying rowset.
val dfDefinition = session.table("products").schema

返された StructType オブジェクトでは、列名は常に正規化されています。引用符で囲まれていない識別子は大文字で返され、引用符で囲まれた識別子は定義された正確な大文字小文字で返されます。

次の例では、 ID および 3rd という名前の列を含む DataFrame を返します。列名 3rd の場合は、 名前が識別子の要件に準拠していない ため、Snowparkライブラリは名前を自動的に二重引用符("3rd")で囲みます。

この例では、 schema メソッドを呼び出してから、返された StructType オブジェクトで names メソッドを呼び出して、列名の Seq を取得します。名前は、 schema メソッドによって返される StructType で正規化されます。

// This returns Seq("ID", "\"3rd\"")
df.select(col("id"), col("3rd")).schema.names.toSeq

DataFrame を評価するアクションの実行

前述のように、 DataFrame は遅延評価されます。つまり、アクションを実行するまで、 SQL ステートメントは実行のためにサーバーに送信されません。アクションにより、 DataFrame が評価され、対応する SQL ステートメントが実行のためにサーバーに送信されます。

このリリースでは、次のメソッドがアクションを実行します。

クラス

メソッド

説明

DataFrame

collect

DataFrame を評価し、結果のデータセットを Row オブジェクトの Array として返します。

DataFrame

count

DataFrame を評価し、行数を返します。

DataFrame

show

DataFrame を評価し、行をコンソールに出力します。この方法では、行数が10に制限されることに注意してください(デフォルト)。

DataFrameWriter

saveAsTable

DataFrame のデータを指定したテーブルに保存します。 テーブルへのデータの保存 をご参照ください。

Updatable

delete

指定されたテーブルの行を削除します。 テーブル内の行の更新、削除、およびマージ をご参照ください。

Updatable

update

指定されたテーブルの行を更新します。 テーブル内の行の更新、削除、およびマージ をご参照ください。

MergeBuilder

collect

指定されたテーブルに行をマージします。 テーブル内の行の更新、削除、およびマージ をご参照ください。

たとえば、テーブルに対してクエリを実行して結果を返すには、 collect メソッドを呼び出します。

// Create a DataFrame for the row in the "products" table with the id 1.
// This does not execute the query.
val dfProductIdOne = session.table("products").filter(col("id") === 1)

// Send the query to the server for execution and
// return an Array of Rows containing the results.
val results = dfProductIdOne.collect()

クエリを実行して結果の数を返すには、 count メソッドを呼び出します。

// Create a DataFrame for the "products" table.
val dfProducts = session.table("products")

// Send the query to the server for execution and
// return the count of rows in the table.
val resultCount = dfProducts.count()

クエリを実行して結果をコンソールに出力するには、 show メソッドを呼び出します。

// Create a DataFrame for the "products" table.
val dfProducts = session.table("products")

// Send the query to the server for execution and
// print the results to the console.
// The query limits the number of rows to 10 by default.
dfProducts.show()

// Limit the number of rows to 20, rather than 10.
dfProducts.show(20)

注: DataFrame の列の定義を取得するために schema メソッドを呼び出す場合は、アクションメソッドを呼び出す必要はありません。

テーブル内の行の更新、削除、およびマージ

注釈

この機能は、Snowpark 0.7.0で導入されました。

Session.table を呼び出してテーブルの DataFrame オブジェクトを作成すると、メソッドは Updatable オブジェクトを返します。これは、テーブル内のデータを更新および削除するための追加のメソッドにより、 DataFrame を拡張します。(更新可能 をご参照ください。)

テーブルの行を更新または削除する必要がある場合は、 Updatable クラスの次のメソッドを使用できます。

  • update を呼び出して、テーブル内の既存の行を更新します。 テーブルにある行の更新 をご参照ください。

  • delete を呼び出して、テーブルから行を削除します。 テーブル内にある行の削除 をご参照ください。

  • merge を呼び出して、2番目のテーブルまたはサブクエリのデータに基づき、1つのテーブルにある行を挿入、更新、および削除します。(これは、 SQL の MERGE コマンドと同等です。) 行のテーブルへのマージ をご参照ください。

テーブルにある行の更新

update メソッドの場合は、更新する列とそれらの列に割り当てる対応する値を関連付ける、 Map で渡します。 update は、更新された行数を含む UpdateResult オブジェクトを返します。(UpdateResult をご参照ください。)

注釈

update は、 アクションメソッド です。これは、メソッドを呼び出すと、 SQL ステートメントがサーバーに送信されて実行されることを意味します。

たとえば、 count という名前の列の値を値 1 に置き換えるには、

val updatableDf = session.table("products")
val updateResult = updatableDf.update(Map("count" -> lit(1)))
println(s"Number of rows updated: ${updateResult.rowsUpdated}")

上記の例では、列の名前を使用して列を識別しています。列式を使用することもできます。

val updateResult = updatableDf.update(Map(col("count") -> lit(1)))

条件が満たされたときにのみ更新する必要がある場合は、その条件を引数として指定できます。たとえば、 category_id 列の値が 20 である行の count という名前の列の値を置き換えるには、次のようにします。

val updateResult = updatableDf.update(Map(col("count") -> lit(1)), col("category_id") === 20)

別の DataFrame オブジェクトとの結合に基づいて条件を作成する必要がある場合は、その DataFrame を引数として渡し、その DataFrame を条件で使用できます。たとえば、 category_id 列が DataFrame dfPartcategory_id と一致する行の、 count という名前の列の値を置き換えるには、次のようにします。

val updatableDf = session.table("products")
val dfParts = session.table("parts")
val updateResult = updatableDf.update(Map(col("count") -> lit(1)), updatableDf("category_id") === dfParts("category_id"))

テーブル内にある行の削除

delete メソッドの場合は、削除する行を識別する条件を指定でき、その条件は別の DataFrame との結合に基づくことができます。 delete は、削除された行数を含む DeleteResult オブジェクトを返します。(DeleteResult をご参照ください。)

注釈

delete は、 アクションメソッド です。これは、メソッドを呼び出すと、 SQL ステートメントがサーバーに送信されて実行されることを意味します。

たとえば、 category_id 列が DataFrame dfPartcategory_id と一致する行を削除するには、次のようにします。

val updatableDf = session.table("products")
val deleteResult = updatableDf.delete(updatableDf("category_id") === dfParts("category_id"))
println(s"Number of rows deleted: ${deleteResult.rowsDeleted}")

行のテーブルへのマージ

2番目のテーブルまたはサブクエリの値に基づいて1つのテーブルの行を挿入、更新、および削除するには(SQL の MERGE コマンドに相当)、次の手順を実行します。

  1. データをマージするテーブルの Updatable オブジェクトで、 merge メソッドを呼び出し、他のテーブルの DataFrame オブジェクトと、結合条件の列式を渡します。

    これにより、一致する行と一致しない行に対して実行するアクション(例: 挿入、更新、削除)を指定するために使用できる、 MergeBuilder オブジェクトが返されます。(MergeBuilder をご参照ください。)

  2. MergeBuilder オブジェクトの使用。

    • 一致する行に対して実行する必要がある更新または削除を指定するには、 whenMatched メソッドを呼び出します。

      行の更新または削除が必要なときに追加の条件を指定する必要がある場合は、その条件の列式を渡すことができます。

      このメソッドは、実行するアクションを指定するために使用できる MatchedClauseBuilder オブジェクトを返します。(MatchedClauseBuilder をご参照ください。)

      MatchedClauseBuilder オブジェクトの update または delete メソッドを呼び出して、一致する行に対して実行する必要がある更新または削除アクションを指定します。これらのメソッドは、追加の句を指定するために使用できる MergeBuilder オブジェクトを返します。

    • 行が一致しない場合に実行する必要のある挿入を指定するには、 whenNotMatched メソッドを呼び出します。

      行を挿入する必要があるときに追加の条件を指定する必要がある場合は、その条件の列式を渡すことができます。

      このメソッドは、実行するアクションを指定するために使用できる NotMatchedClauseBuilder オブジェクトを返します。(NotMatchedClauseBuilder をご参照ください。)

      NotMatchedClauseBuilder オブジェクトの insert メソッドを呼び出して、行が一致しない場合に実行する必要のある挿入アクションを指定します。これらのメソッドは、追加の句を指定するために使用できる MergeBuilder オブジェクトを返します。

  3. 実行する必要のある挿入、更新、および削除の指定が完了したら、 MergeBuilder オブジェクトの collect メソッドを呼び出して、指定した挿入、更新、および削除をテーブルで実行します。

    collect は、挿入、更新、および削除された行数を含む MergeResult オブジェクトを返します。(MergeResult をご参照ください。)

次の例では、 target テーブルに一致する ID の行が含まれていない場合に、 source テーブルの id 列と value 列の行を target テーブルに挿入します。

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenNotMatched.insert(Seq(source("id"), source("value")))
                      .collect()

次の例では、同じ ID を持つ source テーブルにある行の value 列の値で、 target テーブルにある行を更新します。

val mergeResult = target.merge(source, target("id") === source("id"))
                      .whenMatched.update(Map("value" -> source("value")))
                      .collect()

テーブルへのデータの保存

DataFrame の内容をテーブルに保存するには、

  1. write メソッドを呼び出し、 DataFrameWriter オブジェクトを取得します。

  2. DataFrameWriter オブジェクトの mode メソッドを呼び出し、テーブルの行を挿入するか更新するかを指定します。このメソッドは、指定されたモードで構成された新しい DataFrameWriter オブジェクトを返します。

  3. DataFrameWriter オブジェクトの saveToTable メソッドを呼び出し、 DataFrame の内容を指定されたテーブルに保存します。

データをテーブルに保存する SQL ステートメントを実行するために、別のメソッド(例: collect)を呼び出す必要はありません。

例:

df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)

DataFrame からのビューの作成

DataFrame からビューを作成するには、 createOrReplaceView メソッドを呼び出します。

df.createOrReplaceView("db.schema.viewName")

createOrReplaceView を呼び出すと、ただちに新しいビューが作成されることに注意してください。さらに重要なことに、 DataFrame が評価されることはありません。(アクションを実行 するまで、 DataFrame 自体は評価されません。)

createOrReplaceView を呼び出して作成したビューは永続的です。そのビューが不要になった場合は、 ビューを手動でドロップ できます。

ステージでのファイルの操作

このセクションでは、Snowflakeステージでファイル内のデータをクエリする方法について説明します。ファイルに対するその他の操作には、 SQL ステートメント を使用します。

Snowflakeステージのファイル内のデータをクエリするには、 DataFrameReader クラスを使用します。

  1. Session クラスの read メソッドを呼び出して、 DataFrameReader オブジェクトにアクセスします。

  2. ファイルが CSV 形式の場合は、ファイルのフィールドを記述します。これを実行するには、

    1. ファイル内のフィールドを記述する一連の StructField オブジェクトで構成される StructType オブジェクトを作成します。

    2. StructField オブジェクトごとに、以下を指定します。

      • フィールドの名前。

      • フィールドのデータ型(com.snowflake.snowpark.types パッケージでオブジェクトとして指定)。

      • フィールドがNULL可能かどうか。

      例:

      import com.snowflake.snowpark.types._
      
      val schemaForDataFile = StructType(
          Seq(
              StructField("id", StringType, true),
              StructField("name", StringType, true)))
      
    3. DataFrameReader オブジェクトで schema メソッドを呼び出し、 StructType オブジェクトを渡します。

      例:

      var dfReader = session.read.schema(schemaForDataFile)
      

      schema メソッドは、指定されたフィールドを含むファイルを読み取るように構成された DataFrameReader オブジェクトを返します。

      他の形式(JSON など)のファイルでは、これを実行する必要はありません。これらのファイルの場合、 DataFrameReader は、データをフィールド名 $1 の VARIANT 型の単一フィールドとして扱います。

  3. データの読み取り方法に関する追加情報を指定する必要がある(例: データが圧縮されている、または CSV ファイルがフィールドを区切るためにコンマではなくセミコロンを使用している)場合は、 DataFrameReader オブジェクトの options メソッドを呼び出します。

    設定するオプションの名前と値を渡します。ファイル形式オプションの名前と値については、 CREATE FILE FORMAT のドキュメント をご参照ください。

    COPY INTO TABLE ドキュメント で説明されているコピーオプションを設定することもできます。コピーオプションを設定すると、 DataFrame にデータを取得 するときに、よりコストのかかる実行戦略の実行される可能性があることに注意してください。

    次の例では、圧縮されておらず、フィールド区切り文字にセミコロンを使用する CSV ファイル内のデータをクエリするように、 DataFrameReader オブジェクトを設定します。

    dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    

    options メソッドは、指定されたオプションで構成された DataFrameReader オブジェクトを返します。

  4. ファイルの形式に対応するメソッド(例: csv メソッド)を呼び出し、ファイルの場所を渡します。

    val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
    

    ファイルの形式に対応するメソッドは、そのファイルにデータを保持するように構成された DataFrame オブジェクトを返します。

  5. DataFrame オブジェクトメソッドを使用して、データセットで必要な 変換を実行 します(例: 特定のフィールドの選択、行のフィルタリングなど)。

    たとえば、 mystage という名前のステージで JSON ファイルから color 要素を抽出するには、

    // Import the sqlExpr function from the functions object.
    import com.snowflake.snowpark.functions._
    
    val df = session.read.json("@mystage").select(sqlExpr("$1:color"))
    

    前に説明したように、 CSV 以外の形式のファイル(例: JSON)の場合、 DataFrameReader は、ファイル内のデータを $1 という名前の単一の VARIANT 列として扱います。

    この例では、 com.snowflake.snowpark.functions オブジェクトの sqlExpr 関数を使用して、 color 要素へのパスを指定します。

    sqlExpr 関数は入力引数を解釈または変更しないことに注意してください。この関数を使用すると、Snowpark API でまだサポートされていない式とスニペットを SQL で作成できます。

  6. アクションメソッドの呼び出し により、ファイル内のデータをクエリします。

    テーブルの DataFrames の場合と同様、データは、アクションメソッドを呼び出すまで DataFrame に取得されません。

SQL ステートメントの実行

指定した SQL ステートメントを実行するには、 Session クラスの sql メソッドを呼び出し、実行するステートメントを渡します。メソッドは DataFrame を返します。

アクションメソッドの呼び出し があるまで、 SQL ステートメントは実行されないことに注意してください。

// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
val stageFilesDf = session.sql("ls @myStage").collect()

// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()

val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()

// Set up a SQL statement to copy data from a stage to a table.
val copyDf = session.sql("copy into myTable from @myStage file_format=(type = csv)").collect()

メソッドを呼び出して DataFrame を変換 する場合(例: フィルター、選択など)、これらのメソッドは、基になる SQL ステートメントが SELECT ステートメントである場合にのみ機能することに注意してください。変換メソッドは、他の種類の SQL ステートメントではサポートされていません。

val df = session.sql("select a, c from table where b < 1")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("c") < 10).select(col("a")).collect()

// In this example, the underlying SQL statement is not a SELECT statement.
val df = session.sql("ls @myStage")
// Calling the filter method results in an error.
df.filter(...)