Snowparkでの DataFrames の操作

Snowpark内でデータをクエリして処理する主な方法は、 DataFrame を使用することです。このトピックでは、 DataFrames の操作方法について説明します。

このトピックの内容:

データを取得して操作するには、 DataFrame クラスを使用します。DataFrame は、遅延評価されるリレーショナルデータセットを表します。これは、特定のアクションがトリガーされたときにのみ実行されます。ある意味で、 DataFrame は、データを取得するために評価する必要があるクエリのようなものです。

データを DataFrame に取得するには、

  1. DataFrame を作成し、データセットのためにデータのソースを指定 します。

    たとえば、テーブル、外部 CSV ファイル、または SQL ステートメントの実行からのデータを保持する DataFrame を作成できます。

  2. DataFrame のデータセットを変換する方法を指定 します。

    たとえば、どの列を選択するか、行をどのようにフィルタリングするか、結果をどのようにソートおよびグループ化するかなどを指定できます。

  3. ステートメントを実行して、データをDataFrame に取得 します。

    データを DataFrame に取得するには、アクションを実行するメソッド(例: collect() メソッド)を呼び出す必要があります。

次のセクションでは、これらのステップについて詳しく説明します。

DataFrame の構築

DataFrame を構築するには、 Session クラスのメソッドを使用できます。次の各メソッドは、異なるタイプのデータソースから DataFrame を作成します。

  • テーブル、ビュー、またはストリームのデータから DataFrame を作成するには、 table メソッドを呼び出します。

    // Create a DataFrame from the data in the "products" table.
    val dfTable = session.table("products")
    
  • 一連の値から DataFrame を作成するには、 createDataFrame メソッドを呼び出します。

    // Create a DataFrame containing a sequence of values.
    // In the DataFrame, name the columns "i" and "s".
    val dfSeq = session.createDataFrame(Seq((1, "one"), (2, "two"))).toDF("i", "s")
    
  • 値の範囲を含む DataFrame を作成するには、 range メソッドを呼び出します。

    // Create a DataFrame from a range
    val dfRange = session.range(1, 10, 2)
    
  • ステージ内のファイルからのデータを保持する DataFrame を作成するには、 read を呼び出して DataFrameReader オブジェクトを取得します。 DataFrameReader オブジェクトで、ファイル内のデータの形式に対応するメソッドを呼び出します。

    // Create a DataFrame from data in a stage.
    val dfJson = session.read.json("@mystage2/data1.json")
    
  • SQL クエリの結果を保持する DataFrame を作成するには、 sql メソッドを呼び出します。

    // Create a DataFrame from a SQL query
    val dfSql = session.sql("SELECT name from products")
    

    注: このメソッドを使用すると、テーブルおよびステージングされたファイルからデータを取得する SELECT ステートメントを実行できますが、そうではなく、 table および read メソッドを使用するようにします。 tableread のようなメソッドは、開発ツールでより優れた構文の強調表示、エラーの強調表示、およびインテリジェントなコード補完を提供できます。

データセットの変換方法の指定

選択する列と、結果のフィルタリング、並べ替え、グループ化などを指定するには、データセットを変換する DataFrame メソッドを呼び出します。これらのメソッドで列を識別するには、 col 関数または列に評価される式を使用します。(列と式の指定 を参照。)

例:

  • 返される行を指定するには、 filter メソッドを呼び出します。

    // Create a DataFrame for the rows with the ID 1
    // in the "products" table.
    //
    // This example uses the === operator of the Column object to perform an
    // equality check.
    val dfProductIdOne = dfProductInfo.filter(col("id") === 1)
    
  • 選択する列を指定するには、 select メソッドを呼び出します。

    // Import the col function from the functions object.
    import com.snowflake.snowpark.functions._
    
    // Create a DataFrame object for the "products" table.
    val dfProductInfo = session.table("products")
    
    // Create a DataFrame that contains the id, name, and serial_number
    // columns.
    val dfProductSerialNo =
        dfProductInfo.select(col("id"), col("name"), col("serial_number"))
    

各メソッドは、変換された新しい DataFrame オブジェクトを返します。(このメソッドは元の DataFrame オブジェクトには影響を及ぼしません。)つまり、複数の変換を適用する場合は、 チェーンメソッド呼び出し を実行して、前のメソッド呼び出しによって返された、新しい DataFrame オブジェクトに対する後続の各変換メソッドを呼び出すことができます。

これらの変換方法は、Snowflakeデータベースからデータを取得しないことに注意してください。(DataFrame を評価するアクションの実行 で説明されているアクションメソッドは、データ取得を実行します。)変換メソッドは、 SQL ステートメントの作成方法を指定するだけです。

DataFrames の結合

DataFrame オブジェクトを結合するには、 join メソッドを呼び出します。

// Create a DataFrame that joins two other DataFrames
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key"))

この例では、 DataFrame.col メソッドを使用して、結合で使用する列を指定しています。この方法の詳細については、 列と式の指定 をご参照ください。

異なる列でテーブルをそれ自体と結合する必要がある場合は、単一の DataFrame で自己結合を実行することはできません。単一の DataFrame を使用して自己結合を実行する次の例は、 "id" の列式が結合の左側と右側に存在するため、失敗します。

val dfJoined = df.join(df, col("id") === col("parent_id"))

val dfJoined = df.join(df, df("id") === df("parent_id"))

これらの例は両方とも、次の例外を除いて失敗します。

Exception in thread "main" com.snowflake.snowpark.SnowparkClientException:
  Joining a DataFrame to itself can lead to incorrect results due to ambiguity of column references.
  Instead, join this DataFrame to a clone() of itself.

代わりに、 DataFrame.clone() メソッドを使用して DataFrame オブジェクトのクローンを作成し、2つの DataFrame オブジェクトを使用して結合を実行します。

// Create a DataFrame object for the "products" table for the left-hand side of the join.
val dfLhs = session.table("products")
// Clone the DataFrame object to use as the right-hand side of the join.
val dfRhs = dfLhs.clone()

// Create a DataFrame that joins the two DataFrames
// for the "products" table on the "key" column.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("id") === dfRhs.col("parent_id"))

同じ列で自己結合を実行する場合は、 USING 句の列式の Seq を渡す join メソッドを呼び出します。

// Create a DataFrame that performs a self-join on
// the DataFrame for the "products" table using the "key" column.
val dfJoined = df.join(df, Seq("key"))

列と式の指定

これらの変換メソッドを呼び出すときは、列または列を使用する式を指定する必要がある場合があります。たとえば、 select メソッドを呼び出すときは、選択する必要のある列を指定する必要があります。

列を参照するには、 com.snowflake.snowpark.functions オブジェクトで col 関数を呼び出して、 Column オブジェクトを作成します。

// Import the col function from the functions object.
import com.snowflake.snowpark.functions._

val dfProductInfo = session.table("products").select(col("id"), col("name"))

フィルター、射影、結合条件などを指定する場合、式で Column オブジェクトを使用できます。次の例では、式で Column オブジェクトを使用して次を実行します。

  • id 列の値が 20 であり、 a 列と b 列の値の合計が 10 未満である行を取得します。

  • c という名前の列に b10 を掛けた値を返します。 c は、 DataFrame を結合する次のステートメントで使用される列エイリアスです。

  • DataFrame df を計算された DataFrame dfCompute と結合します。

val dfCompute = session.table("T").filter(col("id") === 20).filter((col("a") + col("b")) < 10).select((col("b") * 10) as "c")
val df2 = df.join(dfCompute, col("a") === col("c") && col("a") === col("d"))

同じ名前の2つの異なる DataFrame オブジェクトの列を参照する場合(例: その列の DataFrames を結合する場合)、1つの DataFrame オブジェクトにある DataFrame.col メソッドを使用して、そのオブジェクトにある列(例: df1.col("name")df2.col("name"))を参照できます。

次の例は、 DataFrame.col メソッドを使用して特定の DataFrame の列を参照する方法を示しています。この例では、両方とも key という名前の列を持つ2つの DataFrame オブジェクトを結合します。この例では、 Column.as メソッドを使用して、新しく作成された DataFrame の列の名前を変更します。

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.col method to refer to the columns used in the join.
val dfJoined = dfLhs.join(dfRhs, dfLhs.col("key") === dfRhs.col("key")).select(dfLhs.col("value").as("L"), dfRhs.col("value").as("R"))

DataFrame.col メソッドの代わりに、 DataFrame.apply メソッドを使用して特定の DataFrame の列を参照できます。 DataFrame.col メソッドと同様に、 DataFrame.apply メソッドは入力として列名を受け入れ、 Column オブジェクトを返します。

オブジェクトでScalaに apply メソッドがある場合は、関数であるかのようにオブジェクトを呼び出すことで apply メソッドを呼び出すことができます。たとえば、 df.apply("column_name") を呼び出すには、単に df("column_name") と記述できます。次の呼び出しは同等です。

  • df.col("<列名>")

  • df.apply("<列名>")

  • df("<列名>")

次の例は前の例と同じですが、結合操作で列を参照するために DataFrame.apply メソッドを使用します。

// Create a DataFrame that joins two other DataFrames (dfLhs and dfRhs).
// Use the DataFrame.apply method to refer to the columns used in the join.
// Note that dfLhs("key") is shorthand for dfLhs.apply("key").
val dfJoined = dfLhs.join(dfRhs, dfLhs("key") === dfRhs("key")).select(dfLhs("value").as("L"), dfRhs("value").as("R"))

列オブジェクトの短縮形の使用

col 関数を使用する代わりに、次のいずれかの方法で列を参照できます。

  • 引用符で囲まれた列名($"column_name")の前にドル記号を使用する。

  • 引用符で囲まれていない列名('column_name)の前にアポストロフィ(一重引用符)を使用する。

これを実行するには、 Session オブジェクトを作成した後、 implicits オブジェクトから名前をインポートします。

val session = Session.builder.configFile("/path/to/properties").create

// Import this after you create the session.
import session.implicits._

// Use the $ (dollar sign) shorthand.
val df = session.table("T").filter($"id" === 10).filter(($"a" + $"b") < 10)

// Use ' (apostrophe) shorthand.
val df = session.table("T").filter('id === 10).filter(('a + 'b) < 10).select('b * 10)

オブジェクト識別子(テーブル名、列名など)の前後での二重引用符の使用

指定するデータベース、スキーマ、テーブル、およびステージの名前は、 Snowflake識別子の要件 に準拠している必要があります。名前を指定すると、Snowflakeはその名前を大文字と見なします。たとえば、次の呼び出しは同等です。

// The following calls are equivalent:
df.select(col("id123"))
df.select(col("ID123"))

名前が識別子の要件に準拠していない場合は、名前を二重引用符(")で囲む必要があります。バックスラッシュ(\)を使用して、Scala文字列リテラル内の二重引用符をエスケープします。たとえば、次のテーブル名は文字やアンダースコアで始まらないため、名前を二重引用符で囲む必要があります。

val df = session.table("\"10tablename\"")

の名前を指定するときは、名前を二重引用符で囲む必要はありません。名前が識別子の要件に準拠していない場合、Snowparkライブラリは、列名を自動的に二重引用符で囲みます。

// The following calls are equivalent:
df.select(col("3rdID"))
df.select(col("\"3rdID\"))

// The following calls are equivalent:
df.select(col("id with space"))
df.select(col("\"id with space\""))

すでに列名を二重引用符で囲んでいる場合、ライブラリは名前を二重引用符で囲みません。

場合によっては、列名に二重引用符が含まれることがあります。

describe table quoted;
+------------------------+ ...
| name                   | ...
|------------------------+ ...
| name_with_"air"_quotes | ...
| "column_name_quoted"   | ...
+------------------------+ ...

識別子の要件 で説明されているように、二重引用符で囲まれた識別子内の二重引用符文字ごとに、2つの二重引用符文字(例: "name_with_""air""_quotes""""column_name_quoted""")を使用する必要があります。

val dfTable = session.table("quoted")
dfTable.select("\"name_with_\"\"air\"\"_quotes\"").show()
dfTable.select("\"\"\"column_name_quoted\"\"\"").show()

識別子が二重引用符で囲まれている場合(明示的に引用符を追加したか、ライブラリが引用符を追加したかに関係なく)、 Snowflakeは識別子を大文字と小文字を区別する ものとして扱います。

// The following calls are NOT equivalent!
// The Snowpark library adds double quotes around the column name,
// which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space"))
df.select(col("ID WITH SPACE"))

メソッド呼び出しのチェーン

DataFrame オブジェクトを変換するメソッド ごとに、変換が適用された新しい DataFrame オブジェクトが返されるため、 メソッド呼び出しのチェーン により、追加の方法で変換される新しい DataFrame を生成できます。

次の例では、次のように構成された DataFrame を返します。

  • products テーブルをクエリします。

  • id = 1 で行を返します。

  • name および serial_number 列を選択します。

val dfProductInfo = session.table("products").filter(col("id") === 1).select(col("name"), col("serial_number"))

この例では、

  • session.table("products") は、 products テーブルの DataFrame を返します。

    DataFrame にはまだテーブルのデータが含まれていませんが、オブジェクトにはテーブルの列の定義が含まれています。

  • filter(col("id") === 1) は、 id = 1 で行を返すように設定された products テーブルの DataFrame を返します。

    DataFrame には、テーブルからの一致する行がまだ含まれていないことに再度注意してください。 アクションメソッドを呼び出す まで、一致する行は取得されません。

  • select(col("name"), col("serial_number")) は、 id = 1 を持つ products テーブルの行の name 列と serial_number 列を含む DataFrame を返します。

メソッド呼び出しをチェーンするときは、呼び出しの順序が重要であることに注意してください。各メソッド呼び出しは、変換された DataFrame を返します。後続の呼び出しが変換された DataFrame で機能することを確認します。

たとえば、次のコードでは、 select メソッドは nameserial_number の2つの列のみを含む DataFrame を返します。この DataFrame の filter メソッド呼び出しは、変換された DataFrame にない id 列を使用しているため失敗します。

// This fails with the error "invalid identifier 'ID'."
val dfProductInfo = session.table("products").select(col("name"), col("serial_number")).filter(col("id") === 1)

対照的に、次のコードは、 products テーブルのすべての列(id 列を含む)を含む DataFrame で filter() メソッドが呼び出されるため、正常に実行されます。

// This succeeds because the DataFrame returned by the table() method
// includes the id column.
val dfProductInfo = session.table("products").filter(col("id") === 1).select(col("name"), col("serial_number"))

SQL ステートメントで同等のキーワード(SELECT および WHERE)を使用する場合とは異なる順序で、 select および filter メソッドの呼び出しが必要になる場合があることに注意してください。

列定義の取得

DataFrame のためにデータセット内の列の定義を取得するには、 schema メソッドを呼び出します。このメソッドは、 StructField オブジェクトの Array を含む StructType オブジェクトを返します。各 StructField オブジェクトには、列の定義が含まれています。

// Get the StructType object that describes the columns in the
// underlying rowset.
val dfDefinition = session.table("products").schema

返された StructType オブジェクトでは、列名は常に正規化されています。引用符で囲まれていない識別子は大文字で返され、引用符で囲まれた識別子は定義された正確な大文字小文字で返されます。

次の例では、 ID および 3rd という名前の列を含む DataFrame を返します。列名 3rd の場合は、 名前が識別子の要件に準拠していない ため、Snowparkライブラリは名前を自動的に二重引用符("3rd")で囲みます。

この例では、 schema メソッドを呼び出してから、返された StructType オブジェクトで names メソッドを呼び出して、列名の Seq を取得します。名前は、 schema メソッドによって返される StructType で正規化されます。

// This returns Seq("ID", "\"3rd\"")
df.select(col("id"), col("3rd")).schema.names.toSeq

DataFrame を評価するアクションの実行

前述のように、 DataFrame は遅延評価されます。つまり、アクションを実行するまで、 SQL ステートメントは実行のためにサーバーに送信されません。アクションにより、 DataFrame が評価され、対応する SQL ステートメントが実行のためにサーバーに送信されます。

このリリースでは、次の DataFrame メソッドがアクションを実行します。

メソッド

説明

collect

DataFrame を評価し、結果のデータセットを Row オブジェクトの Array として返します。

count

DataFrame を評価し、行数を返します。

show

DataFrame を評価し、行をコンソールに出力します。この方法では、行数が10に制限されることに注意してください(デフォルト)。

write.saveAsTable .DataFrameWriter 方法)

DataFrame のデータを指定したテーブルに保存します。

たとえば、テーブルに対してクエリを実行して結果を返すには、 collect メソッドを呼び出します。

// Create a DataFrame for the row in the "products" table with the id 1.
// This does not execute the query.
val dfProductIdOne = session.table("products").filter(col("id") === 1)

// Send the query to the server for execution and
// return an Array of Rows containing the results.
val results = dfProductIdOne.collect()

クエリを実行して結果の数を返すには、 count メソッドを呼び出します。

// Create a DataFrame for the "products" table.
val dfProducts = session.table("products")

// Send the query to the server for execution and
// return the count of rows in the table.
val resultCount = dfProducts.count()

クエリを実行して結果をコンソールに出力するには、 show メソッドを呼び出します。

// Create a DataFrame for the "products" table.
val dfProducts = session.table("products")

// Send the query to the server for execution and
// print the results to the console.
// The query limits the number of rows to 10 by default.
dfProducts.show()

// Limit the number of rows to 20, rather than 10.
dfProducts.show(20)

注: DataFrame の列の定義を取得するために schema メソッドを呼び出す場合は、アクションメソッドを呼び出す必要はありません。

テーブルへのデータの保存

DataFrame の内容をテーブルに保存するには、

  1. write メソッドを呼び出し、 DataFrameWriter オブジェクトを取得します。

  2. DataFrameWriter オブジェクトの mode メソッドを呼び出し、テーブルの行を挿入するか更新するかを指定します。このメソッドは、指定されたモードで構成された新しい DataFrameWriter オブジェクトを返します。

  3. DataFrameWriter オブジェクトの saveToTable メソッドを呼び出し、 DataFrame の内容を指定されたテーブルに保存します。

データをテーブルに保存する SQL ステートメントを実行するために、別のメソッド(例: collect)を呼び出す必要はありません。

例:

df.write.mode(SaveMode.Overwrite).saveAsTable(tableName)

DataFrame からのビューの作成

DataFrame からビューを作成するには、 createOrReplaceView メソッドを呼び出します。

df.createOrReplaceView("db.schema.viewName")

createOrReplaceView を呼び出すと、ただちに新しいビューが作成されることに注意してください。さらに重要なことに、 DataFrame が評価されることはありません。(アクションを実行 するまで、 DataFrame 自体は評価されません。)

createOrReplaceView を呼び出して作成したビューは永続的です。そのビューが不要になった場合は、 ビューを手動でドロップ できます。

ステージでのファイルの操作

このセクションでは、Snowflakeステージでファイル内のデータをクエリする方法について説明します。ファイルに対するその他の操作には、 SQL ステートメント を使用します。

Snowflakeステージのファイル内のデータをクエリするには、 DataFrameReader クラスを使用します。

  1. Session クラスの read メソッドを呼び出して、 DataFrameReader オブジェクトにアクセスします。

  2. ファイルが CSV 形式の場合は、ファイルのフィールドを記述します。これを実行するには、

    1. ファイル内のフィールドを記述する一連の StructField オブジェクトで構成される StructType オブジェクトを作成します。

    2. StructField オブジェクトごとに、以下を指定します。

      • フィールドの名前。

      • フィールドのデータ型(com.snowflake.snowpark.types パッケージでオブジェクトとして指定)。

      • フィールドがNULL可能かどうか。

      例:

      import com.snowflake.snowpark.types._
      
      val schemaForDataFile = StructType(
          Seq(
              StructField("id", StringType, true),
              StructField("name", StringType, true)))
      
    3. DataFrameReader オブジェクトで schema メソッドを呼び出し、 StructType オブジェクトを渡します。

      例:

      var dfReader = session.read.schema(schemaForDataFile)
      

      schema メソッドは、指定されたフィールドを含むファイルを読み取るように構成された DataFrameReader オブジェクトを返します。

      他の形式(JSON など)のファイルでは、これを実行する必要はありません。これらのファイルの場合、 DataFrameReader は、データをフィールド名 $1 の VARIANT 型の単一フィールドとして扱います。

  3. データの読み取り方法に関する追加情報を指定する必要がある(例: データが圧縮されている、または CSV ファイルがフィールドを区切るためにコンマではなくセミコロンを使用している)場合は、 DataFrameReader オブジェクトの options メソッドを呼び出します。

    設定するオプションの名前と値を渡します。ファイル形式オプションの名前と値については、 CREATE FILE FORMAT のドキュメント をご参照ください。

    COPY INTO TABLE ドキュメント で説明されているコピーオプションを設定することもできます。コピーオプションを設定すると、 DataFrame にデータを取得 するときに、よりコストのかかる実行戦略の実行される可能性があることに注意してください。

    次の例では、圧縮されておらず、フィールド区切り文字にセミコロンを使用する CSV ファイル内のデータをクエリするように、 DataFrameReader オブジェクトを設定します。

    dfReader = dfReader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
    

    options メソッドは、指定されたオプションで構成された DataFrameReader オブジェクトを返します。

  4. ファイルの形式に対応するメソッド(例: csv メソッド)を呼び出し、ファイルの場所を渡します。

    val df = dfReader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
    

    ファイルの形式に対応するメソッドは、そのファイルにデータを保持するように構成された DataFrame オブジェクトを返します。

  5. DataFrame オブジェクトメソッドを使用して、データセットで必要な 変換を実行 します(例: 特定のフィールドの選択、行のフィルタリングなど)。

    たとえば、 mystage という名前のステージで JSON ファイルから color 要素を抽出するには、

    // Import the sqlExpr function from the functions object.
    import com.snowflake.snowpark.functions._
    
    val df = session.read.json("@mystage").select(sqlExpr("$1:color"))
    

    前に説明したように、 CSV 以外の形式のファイル(例: JSON)の場合、 DataFrameReader は、ファイル内のデータを $1 という名前の単一の VARIANT 列として扱います。

    この例では、 com.snowflake.snowpark.functions オブジェクトの sqlExpr 関数を使用して、 color 要素へのパスを指定します。

    sqlExpr 関数は入力引数を解釈または変更しないことに注意してください。この関数を使用すると、Snowpark API でまだサポートされていない式とスニペットを SQL で作成できます。

  6. アクションメソッドの呼び出し により、ファイル内のデータをクエリします。

    テーブルの DataFrames の場合と同様、データは、アクションメソッドを呼び出すまで DataFrame に取得されません。

SQL ステートメントの実行

指定した SQL ステートメントを実行するには、 Session クラスの sql メソッドを呼び出し、実行するステートメントを渡します。メソッドは DataFrame を返します。

アクションメソッドの呼び出し があるまで、 SQL ステートメントは実行されないことに注意してください。

// Get the list of the files in a stage.
// The collect() method causes this SQL statement to be executed.
val stageFilesDf = session.sql("ls @myStage").collect()

// Resume the operation of a warehouse.
// Note that you must call the collect method in order to execute
// the SQL statement.
session.sql("alter warehouse if exists myWarehouse resume if suspended").collect()

val tableDf = session.table("table").select(col("a"), col("b"))
// Get the count of rows from the table.
val numRows = tableDf.count()

// Set up a SQL statement to copy data from a stage to a table.
val copyDf = session.sql("copy into myTable from @myStage file_format=(type = csv)").collect()

メソッドを呼び出して DataFrame を変換 する場合(例: フィルター、選択など)、これらのメソッドは、基になる SQL ステートメントが SELECT ステートメントである場合にのみ機能することに注意してください。変換メソッドは、他の種類の SQL ステートメントではサポートされていません。

val df = session.sql("select a, c from table where b < 1")
// Because the underlying SQL statement for the DataFrame is a SELECT statement,
// you can call the filter method to transform this DataFrame.
val results = df.filter(col("c") < 10).select(col("a")).collect()

// In this example, the underlying SQL statement is not a SELECT statement.
val df = session.sql("ls @myStage")
// Calling the filter method results in an error.
df.filter(...)