Snowpark PythonでのDataFramesの使用¶
Snowpark内でデータをクエリして処理する主な方法は、 DataFrame を使用することです。このトピックでは、 DataFrames の操作方法について説明します。
このトピックの内容:
データを取得して操作するには、 DataFrame
クラスを使用します。DataFrame は、遅延評価されるリレーショナルデータセットを表します。これは、特定のアクションがトリガーされたときにのみ実行されます。ある意味で、 DataFrame は、データを取得するために評価する必要があるクエリのようなものです。
データを DataFrame に取得するには、
DataFrame を作成し、データセットのためにデータのソースを指定 します。
例えば、テーブル、外部CSVファイル、ローカルデータ、またはSQLステートメントの実行からのデータを保持するDataFrameを作成できます。
DataFrame のデータセットを変換する方法を指定 します。
たとえば、どの列を選択するか、行をどのようにフィルタリングするか、結果をどのようにソートおよびグループ化するかなどを指定できます。
ステートメントを実行して、データをDataFrame に取得 します。
データを DataFrame に取得するには、アクションを実行するメソッド(例:
collect()
メソッド)を呼び出す必要があります。
次のセクションでは、これらのステップについて詳しく説明します。
このセクションの例の設定¶
このセクションの例のいくつかは、 DataFrame を使用して sample_product_data
という名前のテーブルをクエリします。これらの例を実行する場合は、次の SQL ステートメントを実行することにより、このテーブルを作成し、テーブルにデータを入力できます。
Snowpark Pythonを使用して SQL ステートメントを実行できます。
>>> session.sql('CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT)').collect()
[Row(status='Table SAMPLE_PRODUCT_DATA successfully created.')]
>>> session.sql("""
... INSERT INTO sample_product_data VALUES
... (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
... (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
... (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
... (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
... (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
... (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
... (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
... (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
... (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
... (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
... (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
... (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100)
... """).collect()
[Row(number of rows inserted=12)]
テーブルが作成されたことを確認するには、次のコマンドを実行します。
>>> session.sql("SELECT count(*) FROM sample_product_data").collect()
[Row(COUNT(*)=12)]
Pythonワークシートでの例の設定¶
Pythonワークシート でこれらの例を設定して実行するには、サンプルテーブルを作成し、Pythonワークシートを設定します。
SQL ワークシートを作成し、次を実行します。
CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT); INSERT INTO sample_product_data VALUES (1, 0, 5, 'Product 1', 'prod-1', 1, 10), (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20), (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30), (4, 0, 10, 'Product 2', 'prod-2', 2, 40), (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50), (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60), (7, 0, 20, 'Product 3', 'prod-3', 3, 70), (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80), (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90), (10, 0, 50, 'Product 4', 'prod-4', 4, 100), (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100), (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100); SELECT count(*) FROM sample_product_data;
sample_product_data
テーブルの作成に使用した SQL ワークシートと同じデータベースとスキーマコンテキストを設定して、 Pythonワークシートを作成します。
このトピックの例をPythonワークシートで使用する場合は、ハンドラー関数(例: main
)内で例を使用し、関数に渡される Session
オブジェクトを使用して DataFrames を作成します。
たとえば、 session
オブジェクトの table
メソッドを呼び出して、テーブルの DataFrame を作成します。
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
def main(session: snowpark.Session):
df_table = session.table("sample_product_data")
DataFrame オブジェクトの show
メソッドを呼び出すなど、関数によって生成された出力を確認するには、 Output タブを使用します。
関数によって返された値を調べるには、 Settings » Return type から戻り値のデータ型を選択し、 Results タブを使用します。
関数が DataFrame を返す場合は、 Table のデフォルトの戻り値の型を使用してください。
関数が DataFrame オブジェクトの
collect
メソッドからRow
のlist
を返す場合は、戻り値の型に Variant を使用します。関数が文字列にキャストできるその他の値を返す場合、または関数が値を返さない場合は、戻り値の型として String を使用します。
詳細については、 Pythonワークシートの実行 をご参照ください。
DataFrame の構築¶
DataFrameを作成するには、 Session
クラスのメソッドとプロパティを使用できます。次の各メソッドは、異なるタイプのデータソースから DataFrame を作成します。
これらの例は、ローカルの開発環境で実行するか、 Pythonワークシート で定義された main
関数内で呼び出すことができます。
テーブル、ビュー、またはストリームのデータから DataFrame を作成するには、
table
メソッドを呼び出します。>>> # Create a DataFrame from the data in the "sample_product_data" table. >>> df_table = session.table("sample_product_data") # To print out the first 10 rows, call df_table.show()
指定された値から DataFrame を作成するには、
create_dataframe
メソッドを呼び出します。>>> # Create a DataFrame with one column named a from specified values. >>> df1 = session.create_dataframe([1, 2, 3, 4]).to_df("a") >>> df1.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df1 ------- |"A" | ------- |1 | |2 | |3 | |4 | -------
4列 "a"、"b"、"c"、"d" を持つ DataFrame を作成します。
>>> # Create a DataFrame with 4 columns, "a", "b", "c" and "d". >>> df2 = session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"]) >>> df2.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df2 ------------------------- |"A" |"B" |"C" |"D" | ------------------------- |1 |2 |3 |4 | -------------------------
4列 "a"、"b"、"c"、"d" を持つ別の DataFrame を作成します。
>>> # Create another DataFrame with 4 columns, "a", "b", "c" and "d". >>> from snowflake.snowpark import Row >>> df3 = session.create_dataframe([Row(a=1, b=2, c=3, d=4)]) >>> df3.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df3 ------------------------- |"A" |"B" |"C" |"D" | ------------------------- |1 |2 |3 |4 | -------------------------
DataFrame を作成し、スキーマを指定します。
>>> # Create a DataFrame and specify a schema >>> from snowflake.snowpark.types import IntegerType, StringType, StructType, StructField >>> schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())]) >>> df4 = session.create_dataframe([[1, "snow"], [3, "flake"]], schema) >>> df4.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df4 --------------- |"A" |"B" | --------------- |1 |snow | |3 |flake | ---------------
値の範囲を含む DataFrame を作成するには、
range
メソッドを呼び出します。>>> # Create a DataFrame from a range >>> # The DataFrame contains rows with values 1, 3, 5, 7, and 9 respectively. >>> df_range = session.range(1, 10, 2).to_df("a") >>> df_range.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df_range ------- |"A" | ------- |1 | |3 | |5 | |7 | |9 | -------
ステージ内のファイルからのデータを保持するDataFrameを作成するには、
read
プロパティを使用して、DataFrameReader
オブジェクトを取得します。DataFrameReader
オブジェクトで、ファイル内のデータの形式に対応するメソッドを呼び出します。>>> from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType >>> # Create DataFrames from data in a stage. >>> df_json = session.read.json("@my_stage2/data1.json") >>> df_catalog = session.read.schema(StructType([StructField("name", StringType()), StructField("age", IntegerType())])).csv("@stage/some_dir")
SQL クエリの結果を保持する DataFrame を作成するには、
sql
メソッドを呼び出します。>>> # Create a DataFrame from a SQL query >>> df_sql = session.sql("SELECT name from sample_product_data") >>> df_sql.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df_sql -------------- |"NAME" | -------------- |Product 1 | |Product 1A | |Product 1B | |Product 2 | |Product 2A | |Product 2B | |Product 3 | |Product 3A | |Product 3B | |Product 4 | --------------
sql
メソッドを使用して、テーブルおよびステージングされたファイルからデータを取得する SELECT ステートメントを実行することは可能ですが、table
メソッドとread
プロパティを使用すると、開発ツールでより優れた構文の強調表示、エラーの強調表示、インテリジェントなコード補完が提供されます。
データセットの変換方法の指定¶
選択する列と、結果のフィルタリング、並べ替え、グループ化などの方法を指定するには、データセットを変換する DataFrame メソッドを呼び出します。これらのメソッドで列を識別するには、 col
関数または列に評価される式を使用します。 列と式の指定 をご参照ください。
例:
返される行を指定するには、
filter
メソッドを呼び出します。>>> # Import the col function from the functions module. >>> # Python worksheets import this function by default >>> from snowflake.snowpark.functions import col >>> # Create a DataFrame for the rows with the ID 1 >>> # in the "sample_product_data" table. >>> # This example uses the == operator of the Column object to perform an >>> # equality check. >>> df = session.table("sample_product_data").filter(col("id") == 1) >>> df.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df ------------------------------------------------------------------------------------ |"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" | ------------------------------------------------------------------------------------ |1 |0 |5 |Product 1 |prod-1 |1 |10 | ------------------------------------------------------------------------------------
選択する列を指定するには、
select
メソッドを呼び出します。>>> # Import the col function from the functions module. >>> from snowflake.snowpark.functions import col >>> # Create a DataFrame that contains the id, name, and serial_number >>> # columns in the "sample_product_data" table. >>> df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number")) >>> df.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df --------------------------------------- |"ID" |"NAME" |"SERIAL_NUMBER" | --------------------------------------- |1 |Product 1 |prod-1 | |2 |Product 1A |prod-1-A | |3 |Product 1B |prod-1-B | |4 |Product 2 |prod-2 | |5 |Product 2A |prod-2-A | |6 |Product 2B |prod-2-B | |7 |Product 3 |prod-3 | |8 |Product 3A |prod-3-A | |9 |Product 3B |prod-3-B | |10 |Product 4 |prod-4 | ---------------------------------------
次のような列を参照することもできます。
>>> # Import the col function from the functions module. >>> from snowflake.snowpark.functions import col >>> df_product_info = session.table("sample_product_data") >>> df1 = df_product_info.select(df_product_info["id"], df_product_info["name"], df_product_info["serial_number"]) >>> df2 = df_product_info.select(df_product_info.id, df_product_info.name, df_product_info.serial_number) >>> df3 = df_product_info.select("id", "name", "serial_number")
各メソッドは、変換された新しい DataFrame オブジェクトを返します。このメソッドは元の DataFrame オブジェクトには影響を及ぼしません。複数の変換を適用する場合は、 チェーンメソッド呼び出し を実行して、前のメソッド呼び出しによって返された、新しい DataFrame オブジェクトに対する後続の各変換メソッドを呼び出すことができます。
これらの変換メソッドは、SQL ステートメントの作成方法を指定し、Snowflakeデータベースからデータを取得しません。 DataFrame を評価するアクションの実行 で説明されているアクションメソッドは、データ取得を実行します。
DataFrames の結合¶
DataFrame オブジェクトを結合するには、 join
メソッドを呼び出します。
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> # Create a DataFrame that joins the two DataFrames
>>> # on the column named "key".
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].as_("key"), "value1", "value2").show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].as_("key"), "value1", "value2")
-------------------------------
|"KEY" |"VALUE1" |"VALUE2" |
-------------------------------
|a |1 |3 |
|b |2 |4 |
-------------------------------
両方の DataFrames に結合する同じ列がある場合は、次の構文例を使用できます。
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> # If both dataframes have the same column "key", the following is more convenient.
>>> df_lhs.join(df_rhs, ["key"]).show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_lhs.join(df_rhs, ["key"])
-------------------------------
|"KEY" |"VALUE1" |"VALUE2" |
-------------------------------
|a |1 |3 |
|b |2 |4 |
-------------------------------
& 演算子を使用して結合式を接続することもできます。
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> # Use & operator connect join expression. '|' and ~ are similar.
>>> df_joined_multi_column = df_lhs.join(df_rhs, (df_lhs.col("key") == df_rhs.col("key")) & (df_lhs.col("value1") < df_rhs.col("value2"))).select(df_lhs["key"].as_("key"), "value1", "value2")
>>> df_joined_multi_column.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_joined_multi_column
-------------------------------
|"KEY" |"VALUE1" |"VALUE2" |
-------------------------------
|a |1 |3 |
|b |2 |4 |
-------------------------------
自己結合を実行する場合は、DataFrame をコピーする必要があります。
>>> # copy the DataFrame if you want to do a self-join
>>> from copy import copy
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> df_lhs_copied = copy(df_lhs)
>>> df_self_joined = df_lhs.join(df_lhs_copied, (df_lhs.col("key") == df_lhs_copied.col("key")) & (df_lhs.col("value1") == df_lhs_copied.col("value1")))
DataFrames に重複する列がある場合、Snowparkはランダムに生成されたプレフィックスを結合結果の列に追加します。
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"))
-----------------------------------------------------
|"l_av5t_KEY" |"VALUE1" |"r_1p6k_KEY" |"VALUE2" |
-----------------------------------------------------
|a |1 |a |3 |
|b |2 |b |4 |
-----------------------------------------------------
Column.alias
を使用して、重複する列の名前を変更できます。
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].alias("key1"), df_rhs["key"].alias("key2"), "value1", "value2").show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].alias("key1"), df_rhs["key"].alias("key2"), "value1", "value2")
-----------------------------------------
|"KEY1" |"KEY2" |"VALUE1" |"VALUE2" |
-----------------------------------------
|a |a |1 |3 |
|b |b |2 |4 |
-----------------------------------------
ランダムなプレフィックスを避けるために、重複する列に追加するサフィックスを指定することもできます。
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"), lsuffix="_left", rsuffix="_right").show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"), lsuffix="_left", rsuffix="_right")
--------------------------------------------------
|"KEY_LEFT" |"VALUE1" |"KEY_RIGHT" |"VALUE2" |
--------------------------------------------------
|a |1 |a |3 |
|b |2 |b |4 |
--------------------------------------------------
これらの例では、 DataFrame.col
を使用して、結合で使用する列を指定しています。列を指定するその他の方法については、 列と式の指定 をご参照ください。
異なる列でテーブルをそれ自体と結合する必要がある場合は、単一の DataFrame で自己結合を実行することはできません。次の例は単一の DataFrame を使用して自己結合を実行し、 "id"
の列式が結合の左側と右側に存在するため、失敗します。
>>> from snowflake.snowpark.exceptions import SnowparkJoinException
>>> df = session.table("sample_product_data")
>>> # This fails because columns named "id" and "parent_id"
>>> # are in the left and right DataFrames in the join.
>>> try:
... df_joined = df.join(df, col("id") == col("parent_id")) # fails
... except SnowparkJoinException as e:
... print(e.message)
You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy.
>>> # This fails because columns named "id" and "parent_id"
>>> # are in the left and right DataFrames in the join.
>>> try:
... df_joined = df.join(df, df["id"] == df["parent_id"]) # fails
... except SnowparkJoinException as e:
... print(e.message)
You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy.
代わりに、Pythonのビルトイン copy()
メソッドを使用してDataFrameオブジェクトのクローンを作成し、2つのDataFrameオブジェクトを使用して結合を実行します。
>>> from copy import copy
>>> # Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
>>> df_lhs = session.table("sample_product_data")
>>> # Clone the DataFrame object to use as the right-hand side of the join.
>>> df_rhs = copy(df_lhs)
>>> # Create a DataFrame that joins the two DataFrames
>>> # for the "sample_product_data" table on the
>>> # "id" and "parent_id" columns.
>>> df_joined = df_lhs.join(df_rhs, df_lhs.col("id") == df_rhs.col("parent_id"))
>>> df_joined.count()
8
列と式の指定¶
これらの変換メソッドを呼び出すときは、列または列を使用する式を指定する必要がある場合があります。たとえば、 select
メソッドを呼び出すときは、選択する必要のある列を指定する必要があります。
列を参照するには、 snowflake.snowpark.functions
モジュールで col
関数を呼び出して、 Column
オブジェクトを作成します。
>>> # Import the col function from the functions module.
>>> from snowflake.snowpark.functions import col
>>> df_product_info = session.table("sample_product_data").select(col("id"), col("name"))
>>> df_product_info.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_product_info
---------------------
|"ID" |"NAME" |
---------------------
|1 |Product 1 |
|2 |Product 1A |
|3 |Product 1B |
|4 |Product 2 |
|5 |Product 2A |
|6 |Product 2B |
|7 |Product 3 |
|8 |Product 3A |
|9 |Product 3B |
|10 |Product 4 |
---------------------
注釈
リテラルの Column
オブジェクトを作成するには、 列オブジェクトとしてのリテラルの使用 をご参照ください。
フィルター、プロジェクション、結合条件などを指定する場合は、式で Column
オブジェクトを使用できます。例:
filter
メソッドでColumn
オブジェクトを使用して、フィルター条件を指定できます。>>> # Specify the equivalent of "WHERE id = 20" >>> # in a SQL SELECT statement. >>> df_filtered = df.filter(col("id") == 20)
>>> df = session.create_dataframe([[1, 3], [2, 10]], schema=["a", "b"]) >>> # Specify the equivalent of "WHERE a + b < 10" >>> # in a SQL SELECT statement. >>> df_filtered = df.filter((col("a") + col("b")) < 10) >>> df_filtered.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df_filtered ------------- |"A" |"B" | ------------- |1 |3 | -------------
select
メソッドでColumn
オブジェクトを使用して、エイリアスを定義できます。>>> df = session.create_dataframe([[1, 3], [2, 10]], schema=["a", "b"]) >>> # Specify the equivalent of "SELECT b * 10 AS c" >>> # in a SQL SELECT statement. >>> df_selected = df.select((col("b") * 10).as_("c")) >>> df_selected.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df_selected ------- |"C" | ------- |30 | |100 | -------
join
メソッドでColumn
オブジェクトを使用して、結合条件を定義できます。>>> dfX = session.create_dataframe([[1], [2]], schema=["a_in_X"]) >>> dfY = session.create_dataframe([[1], [3]], schema=["b_in_Y"]) >>> # Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y" >>> # in a SQL SELECT statement. >>> df_joined = dfX.join(dfY, col("a_in_X") == col("b_in_Y")).select(dfX["a_in_X"].alias("the_joined_column")) >>> df_joined.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df_joined ----------------------- |"THE_JOINED_COLUMN" | ----------------------- |1 | -----------------------
同じ名前の2つの異なる DataFrame オブジェクトの列を参照する場合(例: その列の DataFrames を結合する場合)、1つの DataFrame オブジェクトにある DataFrame.col
メソッドを使用して、そのオブジェクトにある列(例: df1.col("name")
と df2.col("name")
)を参照できます。
次の例は、 DataFrame.col
メソッドを使用して特定の DataFrame の列を参照する方法を示しています。この例では、両方とも key
という名前の列を持つ2つの DataFrame オブジェクトを結合します。この例では、 Column.as
メソッドを使用して、新しく作成された DataFrame の列の名前を変更します。
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value"])
>>> # Create a DataFrame that joins two other DataFrames (df_lhs and df_rhs).
>>> # Use the DataFrame.col method to refer to the columns used in the join.
>>> df_joined = df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs.col("key").as_("key"), df_lhs.col("value").as_("L"), df_rhs.col("value").as_("R"))
>>> df_joined.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_joined
---------------------
|"KEY" |"L" |"R" |
---------------------
|a |1 |3 |
|b |2 |4 |
---------------------
オブジェクト識別子(テーブル名、列名など)の前後での二重引用符の使用¶
指定するデータベース、スキーマ、テーブル、およびステージの名前は、 Snowflake識別子の要件 に準拠している必要があります。
大文字と小文字を区別する列を持つテーブルを作成します。
>>> session.sql("""
... create or replace temp table "10tablename"(
... id123 varchar, -- case insensitive because it's not quoted.
... "3rdID" varchar, -- case sensitive.
... "id with space" varchar -- case sensitive.
... )""").collect()
>>> # Add return to the statement to return the collect() results in a Python worksheet
[Row(status='Table 10tablename successfully created.')]
次に、テーブルに値を追加します。
>>> session.sql("""insert into "10tablename" (id123, "3rdID", "id with space") values ('a', 'b', 'c')""").collect()
>>> # Add return to the statement to return the collect() results in a Python worksheet
[Row(number of rows inserted=1)]
次に、テーブルの DataFrame を作成し、テーブルに対してクエリを実行します。
>>> df = session.table('"10tablename"')
>>> df.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df
---------------------------------------
|"ID123" |"3rdID" |"id with space" |
---------------------------------------
|a |b |c |
---------------------------------------
名前を指定すると、Snowflakeはその名前を大文字と見なします。たとえば、次の呼び出しは同等です。
>>> df.select(col("id123")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(ID123='a')]
>>> df.select(col("ID123")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(ID123='a')]
名前が識別子の要件に準拠していない場合は、名前を二重引用符("
)で囲む必要があります。バックスラッシュ(\
)を使用して、文字列リテラル内の二重引用符をエスケープします。たとえば、次のテーブル名は文字やアンダースコアで始まらないため、名前を二重引用符で囲む必要があります。
>>> df = session.table("\"10tablename\"")
または、バックスラッシュの代わりに一重引用符を使用して、文字列リテラル内の二重引用符をエスケープすることもできます。
>>> df = session.table('"10tablename"')
列 の名前を指定するときは、名前を二重引用符で囲む必要はありません。名前が識別子の要件に準拠していない場合、Snowparkライブラリは列名を自動的に二重引用符で囲みます。
>>> df.select(col("3rdID")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(3rdID='b')]
>>> df.select(col("\"3rdID\"")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(3rdID='b')]
別の例として、次の呼び出しは同等です。
>>> df.select(col("id with space")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(id with space='c')]
>>> df.select(col("\"id with space\"")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(id with space='c')]
すでに列名を二重引用符で囲んでいる場合、ライブラリは名前を二重引用符で囲みません。
場合によっては、列名に二重引用符が含まれることがあります。
>>> session.sql('''
... create or replace temp table quoted(
... "name_with_""air""_quotes" varchar,
... """column_name_quoted""" varchar
... )''').collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Table QUOTED successfully created.')]
>>> session.sql('''insert into quoted ("name_with_""air""_quotes", """column_name_quoted""") values ('a', 'b')''').collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(number of rows inserted=1)]
識別子の要件 で説明されているように、二重引用符で囲まれた識別子内の二重引用符文字ごとに、2つの二重引用符文字(例: "name_with_""air""_quotes"
と """column_name_quoted"""
)を使用する必要があります。
>>> df_table = session.table("quoted")
>>> df_table.select("\"name_with_\"\"air\"\"_quotes\"").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(name_with_"air"_quotes='a')]
>>> df_table.select("\"\"\"column_name_quoted\"\"\"").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row("column_name_quoted"='b')]
識別子が二重引用符で囲まれている場合は(明示的に引用符を追加したか、ライブラリが引用符を追加したかに関係なく)、 Snowflakeは識別子を大文字と小文字を区別する ものとして扱います。
>>> # The following calls are NOT equivalent!
>>> # The Snowpark library adds double quotes around the column name,
>>> # which makes Snowflake treat the column name as case-sensitive.
>>> df.select(col("id with space")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(id with space='c')]
この例と比較すると:
>>> from snowflake.snowpark.exceptions import SnowparkSQLException
>>> try:
... df.select(col("ID WITH SPACE")).collect()
... except SnowparkSQLException as e:
... print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 7
invalid identifier '"ID WITH SPACE"'
列オブジェクトとしてのリテラルの使用¶
Column
オブジェクトを引数として取るメソッドでリテラルを使用するには、リテラルを snowflake.snowpark.functions
モジュールの lit
関数に渡して、リテラルの Column
オブジェクトを作成します。例:
>>> # Import for the lit and col functions.
>>> from snowflake.snowpark.functions import col, lit
>>> # Show the first 10 rows in which num_items is greater than 5.
>>> # Use `lit(5)` to create a Column object for the literal 5.
>>> df_filtered = df.filter(col("num_items") > lit(5))
列オブジェクトの特定の型へのキャスト¶
Column
オブジェクトを特定の型にキャストするには、 cast
メソッドを呼び出し、 snowflake.snowpark.types
モジュールから型オブジェクトを渡します。たとえば、リテラルを5の精度と2のスケールで NUMBER としてキャストするには、次のようにします。
>>> # Import for the lit function.
>>> from snowflake.snowpark.functions import lit
>>> # Import for the DecimalType class.
>>> from snowflake.snowpark.types import DecimalType
>>> decimal_value = lit(0.05).cast(DecimalType(5,2))
メソッド呼び出しのチェーン¶
DataFrame オブジェクトを変換するメソッド ごとに、変換が適用された新しい DataFrame オブジェクトが返されるため、 メソッド呼び出しのチェーン により、追加の方法で変換される新しい DataFrame を生成できます。
次の例では、次のように構成された DataFrame を返します。
sample_product_data
テーブルをクエリします。id = 1
で行を返します。name
およびserial_number
列を選択します。>>> df_product_info = session.table("sample_product_data").filter(col("id") == 1).select(col("name"), col("serial_number")) >>> df_product_info.show() >>> # To return the DataFrame as a table in a Python worksheet use return instead of show() >>> # return df_product_info ------------------------------- |"NAME" |"SERIAL_NUMBER" | ------------------------------- |Product 1 |prod-1 | -------------------------------
この例では、
session.table("sample_product_data")
は、sample_product_data
テーブルの DataFrame を返します。DataFrame にはまだテーブルのデータが含まれていませんが、オブジェクトにはテーブルの列の定義が含まれています。
filter(col("id") == 1)
は、id = 1
で行を返すように設定されたsample_product_data
テーブルの DataFrame を返します。DataFrame には、テーブルからの一致する行がまだ含まれていないことに注意してください。 アクションメソッドを呼び出す まで、一致する行は取得されません。
select(col("name"), col("serial_number"))
は、id = 1
を持つsample_product_data
テーブルの行のname
列とserial_number
列を含む DataFrame を返します。
メソッド呼び出しをチェーンするときは、呼び出しの順序が重要です。各メソッド呼び出しは、変換された DataFrame を返します。後続の呼び出しが変換された DataFrame で機能することを確認します。
たとえば、次のコードでは、 select
メソッドは name
と serial_number
の2つの列のみを含む DataFrame を返します。この DataFrame の filter
メソッド呼び出しは、変換された DataFrame にない id
列を使用しているため失敗します。
>>> # This fails with the error "invalid identifier 'ID'."
>>> df_product_info = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") == 1)
>>> try:
... df_product_info.show()
... except SnowparkSQLException as e:
... print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 121
invalid identifier 'ID'
対照的に、次のコードは、 id
列など、 sample_product_data
テーブルのすべての列を含む DataFrame で filter()
メソッドが呼び出されるため、正常に実行されます。
>>> # This succeeds because the DataFrame returned by the table() method
>>> # includes the "id" column.
>>> df_product_info = session.table("sample_product_data").filter(col("id") == 1).select(col("name"), col("serial_number"))
>>> df_product_info.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_product_info
-------------------------------
|"NAME" |"SERIAL_NUMBER" |
-------------------------------
|Product 1 |prod-1 |
-------------------------------
Snowpark Pythonを使用すると、SQL ステートメントで同等のキーワード(SELECT および WHERE)を使用する場合とは異なる順序で、 select
および filter
メソッドの呼び出しが必要になる場合があります。
列定義の取得¶
DataFrameのためにデータセット内の列の定義を取得するには、 schema
プロパティを呼び出します。このメソッドは、 StructField
オブジェクトの list
を含む StructType
オブジェクトを返します。各 StructField
オブジェクトには、列の定義が含まれています。
# Import the StructType
from snowflake.snowpark.types import *
# Get the StructType object that describes the columns in the
# underlying rowset.
table_schema = session.table("sample_product_data").schema
table_schema
StructType([StructField('ID', LongType(), nullable=True), StructField('PARENT_ID', LongType(), nullable=True), StructField('CATEGORY_ID', LongType(), nullable=True), StructField('NAME', StringType(), nullable=True), StructField('SERIAL_NUMBER', StringType(), nullable=True), StructField('KEY', LongType(), nullable=True), StructField('"3rd"', LongType(), nullable=True)])
返された StructType
オブジェクトでは、列名は常に正規化されています。引用符で囲まれていない識別子は大文字で返され、引用符で囲まれた識別子は定義された正確な大文字小文字で返されます。
次の例では、 ID
および 3rd
という名前の列を含む DataFrame を作成します。列名 3rd
の場合は、 名前が識別子の要件に準拠していない ため、Snowparkライブラリは名前を自動的に二重引用符("3rd"
)で囲みます。
この例では、 schema
プロパティを呼び出してから、返された StructType
オブジェクトで names
プロパティを呼び出して、列名の list
を取得します。名前は、 schema
プロパティによって返される StructType
で正規化されます。
>>> # Create a DataFrame containing the "id" and "3rd" columns.
>>> df_selected_columns = session.table("sample_product_data").select(col("id"), col("3rd"))
>>> # Print out the names of the columns in the schema. This prints out:
>>> # This prints List["ID", "\"3rd\""]
>>> df_selected_columns.schema.names
['ID', '"3rd"']
DataFrame を評価するアクションの実行¶
前述のように、 DataFrame は遅延評価されます。つまり、アクションを実行するまで、 SQL ステートメントは実行のためにサーバーに送信されません。アクションにより、 DataFrame が評価され、対応する SQL ステートメントが実行のためにサーバーに送信されます。
このリリースでは、次のメソッドがアクションを実行します。
クラス |
メソッド |
説明 |
---|---|---|
|
|
DataFrame を評価し、結果のデータセットを |
|
|
DataFrame を評価し、行数を返します。 |
|
|
DataFrame を評価し、行をコンソールに出力します。このメソッドでは行数が10に制限されます(デフォルト)。 |
|
|
DataFrame のデータを指定したテーブルに保存します。 テーブルへのデータの保存 をご参照ください。 |
たとえば、テーブルに対してクエリを実行して結果を返すには、 collect
メソッドを呼び出します。
>>> # Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
>>> # This does not execute the query.
>>> df = session.table("sample_product_data").select(col("id"), col("name"))
>>> # Send the query to the server for execution and
>>> # return a list of Rows containing the results.
>>> results = df.collect()
>>> # Use a return statement to return the collect() results in a Python worksheet
>>> # return results
クエリを実行して結果の数を返すには、 count
メソッドを呼び出します。
>>> # Create a DataFrame for the "sample_product_data" table.
>>> df_products = session.table("sample_product_data")
>>> # Send the query to the server for execution and
>>> # print the count of rows in the table.
>>> print(df_products.count())
12
クエリを実行して結果をコンソールに出力するには、 show
メソッドを呼び出します。
>>> # Create a DataFrame for the "sample_product_data" table.
>>> df_products = session.table("sample_product_data")
>>> # Send the query to the server for execution and
>>> # print the results to the console.
>>> # The query limits the number of rows to 10 by default.
>>> df_products.show()
>>> # To return the DataFrame as a table in a Python worksheet use return instead of show()
>>> # return df_products
-------------------------------------------------------------------------------------
|"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" |
-------------------------------------------------------------------------------------
|1 |0 |5 |Product 1 |prod-1 |1 |10 |
|2 |1 |5 |Product 1A |prod-1-A |1 |20 |
|3 |1 |5 |Product 1B |prod-1-B |1 |30 |
|4 |0 |10 |Product 2 |prod-2 |2 |40 |
|5 |4 |10 |Product 2A |prod-2-A |2 |50 |
|6 |4 |10 |Product 2B |prod-2-B |2 |60 |
|7 |0 |20 |Product 3 |prod-3 |3 |70 |
|8 |7 |20 |Product 3A |prod-3-A |3 |80 |
|9 |7 |20 |Product 3B |prod-3-B |3 |90 |
|10 |0 |50 |Product 4 |prod-4 |4 |100 |
-------------------------------------------------------------------------------------
行数を20に制限するには:
>>> # Create a DataFrame for the "sample_product_data" table.
>>> df_products = session.table("sample_product_data")
>>> # Limit the number of rows to 20, rather than 10.
>>> df_products.show(20)
>>> # All rows are returned when you use return in a Python worksheet to return the DataFrame as a table
>>> # return df_products
-------------------------------------------------------------------------------------
|"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" |
-------------------------------------------------------------------------------------
|1 |0 |5 |Product 1 |prod-1 |1 |10 |
|2 |1 |5 |Product 1A |prod-1-A |1 |20 |
|3 |1 |5 |Product 1B |prod-1-B |1 |30 |
|4 |0 |10 |Product 2 |prod-2 |2 |40 |
|5 |4 |10 |Product 2A |prod-2-A |2 |50 |
|6 |4 |10 |Product 2B |prod-2-B |2 |60 |
|7 |0 |20 |Product 3 |prod-3 |3 |70 |
|8 |7 |20 |Product 3A |prod-3-A |3 |80 |
|9 |7 |20 |Product 3B |prod-3-B |3 |90 |
|10 |0 |50 |Product 4 |prod-4 |4 |100 |
|11 |10 |50 |Product 4A |prod-4-A |4 |100 |
|12 |10 |50 |Product 4B |prod-4-B |4 |100 |
-------------------------------------------------------------------------------------
注釈
DataFrame の列の定義を取得するために schema
プロパティを呼び出す場合は、アクションメソッドを呼び出す必要はありません。
DataFrameのコンテンツをPandas DataFrameとして返す¶
DataFrameのコンテンツをPandas DataFrameとして返すには、 to_pandas
メソッドを使用します。
例:
>>> python_df = session.create_dataframe(["a", "b", "c"])
>>> pandas_df = python_df.to_pandas()
テーブルへのデータの保存¶
DataFrame の内容をテーブルに保存するには、
write
プロパティを呼び出して、DataFrameWriter
オブジェクトを取得します。DataFrameWriter
オブジェクトのmode
メソッドを呼び出し、モードを指定します。詳細については、 API のドキュメント をご参照ください。このメソッドは、指定されたモードで構成された新しいDataFrameWriter
オブジェクトを返します。DataFrameWriter
オブジェクトのsave_as_table
メソッドを呼び出し、 DataFrame の内容を指定されたテーブルに保存します。
データをテーブルに保存する SQL ステートメントを実行するために、別のメソッド(例: collect
)を呼び出す必要はありません。
例:
>>> df.write.mode("overwrite").save_as_table("table1")
DataFrame からのビューの作成¶
DataFrameからビューを作成するには、 create_or_replace_view
メソッドを呼び出します。これにより新しいビューがすぐに作成されます。
>>> import os
>>> database = os.environ["snowflake_database"] # use your own database and schema
>>> schema = os.environ["snowflake_schema"]
>>> view_name = "my_view"
>>> df.create_or_replace_view(f"{database}.{schema}.{view_name}")
[Row(status='View MY_VIEW successfully created.')]
Pythonワークシートでは、データベースとスキーマのコンテキストでワークシートを実行するため、次を実行してビューを作成できます。
# Define a DataFrame
df_products = session.table("sample_product_data")
# Define a View name
view_name = "my_view"
# Create the view
df_products.create_or_replace_view(f"{view_name}")
# return the view name
return view_name + " successfully created"
my_view successfully created
create_or_replace_view
を呼び出して作成したビューは永続的です。そのビューが不要になった場合は、 ビューを手動でドロップ できます。
または、仮ビューを作成する create_or_replace_temp_view
メソッドを使用します。仮ビューは、それが作成されたセッションでのみ使用できます。
ステージでのファイルの操作¶
このセクションでは、Snowflakeステージでファイル内のデータをクエリする方法について説明します。ファイルに対するその他の操作には、 SQL ステートメント を使用します。
Snowflakeステージのファイル内のデータをクエリするには、 DataFrameReader
クラスを使用します。
Session
クラスのread
メソッドを呼び出して、DataFrameReader
オブジェクトにアクセスします。ファイルが CSV 形式の場合は、ファイルのフィールドを記述します。これを実行するには、
ファイル内のフィールドを説明する
StructType
オブジェクトのlist
で構成されるStructField
オブジェクトを作成します。StructField
オブジェクトごとに、以下を指定します。フィールドの名前。
フィールドのデータ型(
snowflake.snowpark.types
モジュールでオブジェクトとして指定)。フィールドがNULL可能かどうか。
例:
>>> from snowflake.snowpark.types import * >>> schema_for_data_file = StructType([ ... StructField("id", StringType()), ... StructField("name", StringType()) ... ])
DataFrameReader
オブジェクトでschema
プロパティを呼び出し、StructType
オブジェクトを渡します。例:
>>> df_reader = session.read.schema(schema_for_data_file)
schema
プロパティは、指定されたフィールドを含むファイルを読み取るように構成されたDataFrameReader
オブジェクトを返します。他の形式(JSON など)のファイルでは、これを実行する必要はありません。これらのファイルの場合、
DataFrameReader
は、データをフィールド名$1
の VARIANT 型の単一フィールドとして扱います。
データの読み取り方法に関する追加情報を指定する必要がある(例: データが圧縮されている、またはCSVファイルがフィールドを区切るためにコンマではなくセミコロンを使用している)場合は、
DataFrameReader
オブジェクトのoption
またはoptions
メソッドを呼び出します。option
メソッドは、設定するオプションの名前と値をとり、複数のチェーン呼び出しを組み合わせることができますが、options
メソッドは、オプションの名前とそれに対応する値のディクショナリをとります。ファイル形式オプションの名前と値については、 CREATE FILE FORMAT のドキュメント をご参照ください。
COPY INTO TABLE ドキュメント で説明されているコピーオプションを設定することもできます。コピーオプションを設定すると、 DataFrame にデータを取得 するときに、よりコストのかかる実行戦略の実行される可能性があることに注意してください。
次の例では、圧縮されておらず、フィールド区切り文字にセミコロンを使用する CSV ファイル内のデータをクエリするように、
DataFrameReader
オブジェクトを設定します。>>> df_reader = df_reader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
option
およびoptions
メソッドは、指定されたオプションで構成されたDataFrameReader
オブジェクトを返します。ファイルの形式に対応するメソッド(例:
csv
メソッド)を呼び出し、ファイルの場所を渡します。>>> df = df_reader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
ファイルの形式に対応するメソッドは、そのファイルにデータを保持するように構成された DataFrame オブジェクトを返します。
DataFrame オブジェクトメソッドを使用して、データセットで必要な 変換を実行 します(例: 特定のフィールドの選択、行のフィルタリングなど)。
たとえば、
my_stage
という名前のステージで JSON ファイルからcolor
要素を抽出するには、>>> # Import the sql_expr function from the functions module. >>> from snowflake.snowpark.functions import sql_expr >>> df = session.read.json("@my_stage").select(sql_expr("$1:color"))
前に説明したように、 CSV 以外の形式のファイル(例: JSON)の場合、
DataFrameReader
は、ファイル内のデータを$1
という名前の単一の VARIANT 列として扱います。この例では、
snowflake.snowpark.functions
モジュールのsql_expr
関数を使用して、color
要素へのパスを指定します。sql_expr
関数は入力引数を解釈または変更しないことに注意してください。この関数を使用すると、Snowpark API でまだサポートされていない式とスニペットを SQL で作成できます。アクションメソッドの呼び出し により、ファイル内のデータをクエリします。
テーブルの DataFrames の場合と同様、データは、アクションメソッドを呼び出すまで DataFrame に取得されません。
半構造化データの操作¶
DataFrame を使用すると、 半構造化データ (例: JSON データ)へのクエリとアクセスができます。次のセクションでは、 DataFrame 内の半構造化データの操作方法について説明します。
注釈
これらのセクションの例では、 例で使用されるサンプルデータ のサンプルデータを使用しています。
半構造化データの走査¶
半構造化データの特定のフィールドまたは要素を参照するには、 Column
オブジェクトの次のメソッドを使用します。
col_object["<フィールド名>"]
の属性を取得して、OBJECT (または OBJECT を含む VARIANT)のフィールドにColumn
オブジェクトを返します。col_object[<インデックス>]
を使用して、ARRAY (または ARRAY を含む VARIANT)の要素のColumn
オブジェクトを返します。
注釈
パス内のフィールド名または要素が不規則であり、上で説明したインデックスの使用が困難な場合は、 get
、 get_ignore_case
、または get_path
を代わりに使用できます。
たとえば、次のコードは、 サンプルデータ の src
列にあるオブジェクトの dealership
フィールドを選択します。
>>> from snowflake.snowpark.functions import col
>>> df = session.table("car_sales")
>>> df.select(col("src")["dealership"]).show()
このコードは、次を出力します。
----------------------------
|"""SRC""['DEALERSHIP']" |
----------------------------
|"Valley View Auto Sales" |
|"Tindel Toyota" |
----------------------------
注釈
DataFrame の値は文字列リテラルとして返されるため、二重引用符で囲まれています。これらの値を特定の型にキャストするには、 半構造化データへの明示的な値のキャスト をご参照ください。
メソッド呼び出しのチェーン を使用して、特定のフィールドまたは要素へのパスを走査することもできます。
たとえば、次のコードは salesperson
オブジェクトの name
フィールドを選択します。
>>> df = session.table("car_sales")
>>> df.select(df["src"]["salesperson"]["name"]).show()
このコードは、次を出力します。
------------------------------------
|"""SRC""['SALESPERSON']['NAME']" |
------------------------------------
|"Frank Beasley" |
|"Greg Northrup" |
------------------------------------
別の例として、次のコードは、車両の配列を保持する vehicle
フィールドの最初の要素を選択します。この例では、最初の要素から price
フィールドも選択しています。
>>> df = session.table("car_sales")
>>> df.select(df["src"]["vehicle"][0]).show()
>>> df.select(df["src"]["vehicle"][0]["price"]).show()
このコードは、次を出力します。
---------------------------
|"""SRC""['VEHICLE'][0]" |
---------------------------
|{ |
| "extras": [ |
| "ext warranty", |
| "paint protection" |
| ], |
| "make": "Honda", |
| "model": "Civic", |
| "price": "20275", |
| "year": "2017" |
|} |
|{ |
| "extras": [ |
| "ext warranty", |
| "rust proofing", |
| "fabric protection" |
| ], |
| "make": "Toyota", |
| "model": "Camry", |
| "price": "23500", |
| "year": "2017" |
|} |
---------------------------
------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']" |
------------------------------------
|"20275" |
|"23500" |
------------------------------------
前述の方法でフィールドにアクセスする代わりに、パス内のフィールド名または要素が不規則な場合は、 get
、 get_ignore_case
、または get_path
関数を使用できます。
たとえば、次のコード行は両方とも、オブジェクトの指定されたフィールドの値を出力します。
>>> from snowflake.snowpark.functions import get, get_path, lit
>>> df.select(get(col("src"), lit("dealership"))).show()
>>> df.select(col("src")["dealership"]).show()
同様に、次のコード行は両方とも、オブジェクトの指定されたパスにあるフィールドの値を出力します。
>>> df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
>>> df.select(col("src")["vehicle"][0]["make"]).show()
半構造化データへの明示的な値のキャスト¶
デフォルトでは、上記の例に示すように、フィールドと要素の値は文字列リテラル(二重引用符を含む)として返されます。
予期しない結果を回避するには、 cast メソッドを呼び出して、値を特定の型にキャストします。たとえば、次のコードは、キャストなしとキャストありの値を出力します。
>>> # Import the objects for the data types, including StringType.
>>> from snowflake.snowpark.types import *
>>> df = session.table("car_sales")
>>> df.select(col("src")["salesperson"]["id"]).show()
>>> df.select(col("src")["salesperson"]["id"].cast(StringType())).show()
このコードは、次を出力します。
----------------------------------
|"""SRC""['SALESPERSON']['ID']" |
----------------------------------
|"55" |
|"274" |
----------------------------------
---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)" |
---------------------------------------------------
|55 |
|274 |
---------------------------------------------------
オブジェクト配列の行へのフラット化¶
半構造化データを DataFrame に「フラット化」する必要がある場合(例: 配列内にあるすべてのオブジェクトの行を生成する場合)は、 join_table_function
メソッドを使用して flatten
を呼び出します。このメソッドは、 FLATTEN SQL 関数と同等です。オブジェクトまたは配列へのパスを渡すと、メソッドは、各フィールドの行か、オブジェクトまたは配列の要素の行を含む DataFrame を返します。
たとえば、 サンプルデータ では、 src:customer
は顧客に関する情報を含むオブジェクトの配列です。各オブジェクトには、 name
および address
フィールドが含まれています。
このパスを flatten
関数に渡すと、次のようになります。
>>> df = session.table("car_sales")
>>> df.join_table_function("flatten", col("src")["customer"]).show()
メソッドは DataFrame を返します。
----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC" |"SEQ" |"KEY" |"PATH" |"INDEX" |"VALUE" |"THIS" |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{ |1 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "San Francisco, CA", | { |
| { | | | | | "name": "Joyce Ridgely", | "address": "San Francisco, CA", |
| "address": "San Francisco, CA", | | | | | "phone": "16504378889" | "name": "Joyce Ridgely", |
| "name": "Joyce Ridgely", | | | | |} | "phone": "16504378889" |
| "phone": "16504378889" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Valley View Auto Sales", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "55", | | | | | | |
| "name": "Frank Beasley" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "paint protection" | | | | | | |
| ], | | | | | | |
| "make": "Honda", | | | | | | |
| "model": "Civic", | | | | | | |
| "price": "20275", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
|{ |2 |NULL |[0] |0 |{ |[ |
| "customer": [ | | | | | "address": "New York, NY", | { |
| { | | | | | "name": "Bradley Greenbloom", | "address": "New York, NY", |
| "address": "New York, NY", | | | | | "phone": "12127593751" | "name": "Bradley Greenbloom", |
| "name": "Bradley Greenbloom", | | | | |} | "phone": "12127593751" |
| "phone": "12127593751" | | | | | | } |
| } | | | | | |] |
| ], | | | | | | |
| "date": "2017-04-28", | | | | | | |
| "dealership": "Tindel Toyota", | | | | | | |
| "salesperson": { | | | | | | |
| "id": "274", | | | | | | |
| "name": "Greg Northrup" | | | | | | |
| }, | | | | | | |
| "vehicle": [ | | | | | | |
| { | | | | | | |
| "extras": [ | | | | | | |
| "ext warranty", | | | | | | |
| "rust proofing", | | | | | | |
| "fabric protection" | | | | | | |
| ], | | | | | | |
| "make": "Toyota", | | | | | | |
| "model": "Camry", | | | | | | |
| "price": "23500", | | | | | | |
| "year": "2017" | | | | | | |
| } | | | | | | |
| ] | | | | | | |
|} | | | | | | |
----------------------------------------------------------------------------------------------------------------------------------------------------------
この DataFrame から、 VALUE
フィールドにある各オブジェクトからの name
フィールドと address
フィールドを選択できます。
>>> df.join_table_function("flatten", col("src")["customer"]).select(col("value")["name"], col("value")["address"]).show()
-------------------------------------------------
|"""VALUE""['NAME']" |"""VALUE""['ADDRESS']" |
-------------------------------------------------
|"Joyce Ridgely" |"San Francisco, CA" |
|"Bradley Greenbloom" |"New York, NY" |
-------------------------------------------------
次のコードは、 特定の型に値をキャスト し、列の名前を変更することにより、前の例に追加します。
>>> df.join_table_function("flatten", col("src")["customer"]).select(col("value")["name"].cast(StringType()).as_("Customer Name"), col("value")["address"].cast(StringType()).as_("Customer Address")).show()
-------------------------------------------
|"Customer Name" |"Customer Address" |
-------------------------------------------
|Joyce Ridgely |San Francisco, CA |
|Bradley Greenbloom |New York, NY |
-------------------------------------------
SQL ステートメントの実行¶
指定した SQL ステートメントを実行するには、 Session
クラスの sql
メソッドを呼び出し、実行するステートメントを渡します。メソッドは DataFrame を返します。
アクションメソッドの呼び出し があるまで、 SQL ステートメントは実行されないことに注意してください。
>>> # Get the list of the files in a stage.
>>> # The collect() method causes this SQL statement to be executed.
>>> session.sql("create or replace temp stage my_stage").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Stage area MY_STAGE successfully created.')]
>>> stage_files_df = session.sql("ls @my_stage").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
>>> # Resume the operation of a warehouse.
>>> # Note that you must call the collect method to execute
>>> # the SQL statement.
>>> session.sql("alter warehouse if exists my_warehouse resume if suspended").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Statement executed successfully.')]
>>> # Set up a SQL statement to copy data from a stage to a table.
>>> session.sql("copy into sample_product_data from @my_stage file_format=(type = csv)").collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Copy executed with 0 files processed.')]
メソッドを呼び出してDataFrame を変換する場合(例: filter
、 select
など)、これらのメソッドは、基になるSQLステートメントがSELECTステートメントである場合にのみ機能することに注意してください。変換メソッドは、他の種類の SQL ステートメントではサポートされていません。
>>> df = session.sql("select id, parent_id from sample_product_data where id < 10")
>>> # Because the underlying SQL statement for the DataFrame is a SELECT statement,
>>> # you can call the filter method to transform this DataFrame.
>>> results = df.filter(col("id") < 3).select(col("id")).collect()
>>> # Prepend a return statement to return the collect() results in a Python worksheet
>>> # In this example, the underlying SQL statement is not a SELECT statement.
>>> df = session.sql("ls @my_stage")
>>> # Calling the filter method results in an error.
>>> try:
... df.filter(col("size") > 50).collect()
... except SnowparkSQLException as e:
... print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 104
invalid identifier 'SIZE'