Snowpark PythonでのDataFramesの使用¶
Snowpark内でデータをクエリして処理する主な方法は、 DataFrame を使用することです。このトピックでは、 DataFrames の操作方法について説明します。
このトピックの内容:
データを取得して操作するには、 DataFrame クラスを使用します。DataFrame は、遅延評価されるリレーショナルデータセットを表します。これは、特定のアクションがトリガーされたときにのみ実行されます。ある意味で、 DataFrame は、データを取得するために評価する必要があるクエリのようなものです。
データを DataFrame に取得するには、
- DataFrame を作成し、データセットのためにデータのソースを指定 します。 - 例えば、テーブル、外部CSVファイル、ローカルデータ、またはSQLステートメントの実行からのデータを保持するDataFrameを作成できます。 
- DataFrame のデータセットを変換する方法を指定 します。 - たとえば、どの列を選択するか、行をどのようにフィルタリングするか、結果をどのようにソートおよびグループ化するかなどを指定できます。 
- ステートメントを実行して、データをDataFrame に取得 します。 - データを DataFrame に取得するには、アクションを実行するメソッド(例: - collect()メソッド)を呼び出す必要があります。
次のセクションでは、これらのステップについて詳しく説明します。
このセクションの例の設定¶
このセクションの例のいくつかは、 DataFrame を使用して sample_product_data という名前のテーブルをクエリします。これらの例を実行する場合は、次の SQL ステートメントを実行することにより、このテーブルを作成し、テーブルにデータを入力できます。
Snowpark Pythonを使用して SQL ステートメントを実行できます。
session.sql('CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT)').collect()
[Row(status='Table SAMPLE_PRODUCT_DATA successfully created.')]
session.sql("""
INSERT INTO sample_product_data VALUES
(1, 0, 5, 'Product 1', 'prod-1', 1, 10),
(2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
(3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
(4, 0, 10, 'Product 2', 'prod-2', 2, 40),
(5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
(6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
(7, 0, 20, 'Product 3', 'prod-3', 3, 70),
(8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
(9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
(10, 0, 50, 'Product 4', 'prod-4', 4, 100),
(11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
(12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100)
""").collect()
[Row(number of rows inserted=12)]
テーブルが作成されたことを確認するには、次のコマンドを実行します。
session.sql("SELECT count(*) FROM sample_product_data").collect()
[Row(COUNT(*)=12)]
Pythonワークシートでの例の設定¶
Pythonワークシート でこれらの例を設定して実行するには、サンプルテーブルを作成し、Pythonワークシートを設定します。
- SQL ワークシートを作成し、次を実行します。 
CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT); INSERT INTO sample_product_data VALUES (1, 0, 5, 'Product 1', 'prod-1', 1, 10), (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20), (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30), (4, 0, 10, 'Product 2', 'prod-2', 2, 40), (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50), (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60), (7, 0, 20, 'Product 3', 'prod-3', 3, 70), (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80), (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90), (10, 0, 50, 'Product 4', 'prod-4', 4, 100), (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100), (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100); SELECT count(*) FROM sample_product_data;
- sample_product_dataテーブルの作成に使用した SQL ワークシートと同じデータベースとスキーマコンテキストを設定して、 Pythonワークシートを作成します。
このトピックの例をPythonワークシートで使用する場合は、ハンドラー関数(例: main)内で例を使用し、関数に渡される Session オブジェクトを使用して DataFrames を作成します。
たとえば、 session オブジェクトの table メソッドを呼び出して、テーブルの DataFrame を作成します。
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
def main(session: snowpark.Session):
  df_table = session.table("sample_product_data")
DataFrame オブジェクトの show メソッドを呼び出すなど、関数によって生成された出力を確認するには、 Output タブを使用します。
関数によって返された値を調べるには、 Settings » Return type から戻り値のデータ型を選択し、 Results タブを使用します。
- 関数が DataFrame を返す場合は、 Table のデフォルトの戻り値の型を使用してください。 
- 関数が DataFrame オブジェクトの - collectメソッドから- Rowの- listを返す場合は、戻り値の型に Variant を使用します。
- 関数が文字列にキャストできるその他の値を返す場合、または関数が値を返さない場合は、戻り値の型として String を使用します。 
詳細については、 Pythonワークシートの実行 をご参照ください。
DataFrame の構築¶
DataFrameを作成するには、 Session クラスのメソッドとプロパティを使用できます。次の各メソッドは、異なるタイプのデータソースから DataFrame を作成します。
これらの例は、ローカルの開発環境で実行するか、 Pythonワークシート で定義された main 関数内で呼び出すことができます。
- テーブル、ビュー、またはストリームのデータから DataFrame を作成するには、 - tableメソッドを呼び出します。- # Create a DataFrame from the data in the "sample_product_data" table. df_table = session.table("sample_product_data") # To print out the first 10 rows, call df_table.show() 
- 指定された値から DataFrame を作成するには、 - create_dataframeメソッドを呼び出します。- # Create a DataFrame with one column named a from specified values. df1 = session.create_dataframe([1, 2, 3, 4]).to_df("a") df1.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() # return df1 - ------- |"A" | ------- |1 | |2 | |3 | |4 | ------- - 4列 "a"、"b"、"c"、"d" を持つ DataFrame を作成します。 - # Create a DataFrame with 4 columns, "a", "b", "c" and "d". df2 = session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"]) df2.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() # return df2 - ------------------------- |"A" |"B" |"C" |"D" | ------------------------- |1 |2 |3 |4 | ------------------------- - 4列 "a"、"b"、"c"、"d" を持つ別の DataFrame を作成します。 - # Create another DataFrame with 4 columns, "a", "b", "c" and "d". from snowflake.snowpark import Row df3 = session.create_dataframe([Row(a=1, b=2, c=3, d=4)]) df3.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() # return df3 - ------------------------- |"A" |"B" |"C" |"D" | ------------------------- |1 |2 |3 |4 | ------------------------- - DataFrame を作成し、スキーマを指定します。 - # Create a DataFrame and specify a schema from snowflake.snowpark.types import IntegerType, StringType, StructType, StructField schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())]) df4 = session.create_dataframe([[1, "snow"], [3, "flake"]], schema) df4.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() # return df4 - --------------- |"A" |"B" | --------------- |1 |snow | |3 |flake | --------------- 
- 値の範囲を含む DataFrame を作成するには、 - rangeメソッドを呼び出します。- # Create a DataFrame from a range # The DataFrame contains rows with values 1, 3, 5, 7, and 9 respectively. df_range = session.range(1, 10, 2).to_df("a") df_range.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() # return df_range - ------- |"A" | ------- |1 | |3 | |5 | |7 | |9 | ------- 
- ステージ内のファイルからのデータを保持するDataFrameを作成するには、 - readプロパティを使用して、- DataFrameReaderオブジェクトを取得します。- DataFrameReaderオブジェクトで、ファイル内のデータの形式に対応するメソッドを呼び出します。- from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType # Create DataFrames from data in a stage. df_json = session.read.json("@my_stage2/data1.json") df_catalog = session.read.schema(StructType([StructField("name", StringType()), StructField("age", IntegerType())])).csv("@stage/some_dir") 
- SQL クエリの結果を保持する DataFrame を作成するには、 - sqlメソッドを呼び出します。- # Create a DataFrame from a SQL query df_sql = session.sql("SELECT name from sample_product_data") df_sql.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() # return df_sql - -------------- |"NAME" | -------------- |Product 1 | |Product 1A | |Product 1B | |Product 2 | |Product 2A | |Product 2B | |Product 3 | |Product 3A | |Product 3B | |Product 4 | -------------- 
sql メソッドを使用して、テーブルおよびステージングされたファイルからデータを取得する SELECT ステートメントを実行することは可能ですが、 table メソッドと read プロパティを使用すると、開発ツールでより優れた構文の強調表示、エラーの強調表示、インテリジェントなコード補完が提供されます。
データセットの変換方法の指定¶
選択する列と、結果のフィルタリング、並べ替え、グループ化などの方法を指定するには、データセットを変換する DataFrame メソッドを呼び出します。これらのメソッドで列を識別するには、 col 関数または列に評価される式を使用します。 列と式の指定 をご参照ください。
例:
- 返される行を指定するには、 - filterメソッドを呼び出します。- # Import the col function from the functions module. # Python worksheets import this function by default from snowflake.snowpark.functions import col # Create a DataFrame for the rows with the ID 1 # in the "sample_product_data" table. # This example uses the == operator of the Column object to perform an # equality check. df = session.table("sample_product_data").filter(col("id") == 1) df.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() return df - ------------------------------------------------------------------------------------ |"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" | ------------------------------------------------------------------------------------ |1 |0 |5 |Product 1 |prod-1 |1 |10 | ------------------------------------------------------------------------------------ 
- 選択する列を指定するには、 - selectメソッドを呼び出します。- # Import the col function from the functions module. from snowflake.snowpark.functions import col # Create a DataFrame that contains the id, name, and serial_number # columns in the "sample_product_data" table. df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number")) df.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() return df - --------------------------------------- |"ID" |"NAME" |"SERIAL_NUMBER" | --------------------------------------- |1 |Product 1 |prod-1 | |2 |Product 1A |prod-1-A | |3 |Product 1B |prod-1-B | |4 |Product 2 |prod-2 | |5 |Product 2A |prod-2-A | |6 |Product 2B |prod-2-B | |7 |Product 3 |prod-3 | |8 |Product 3A |prod-3-A | |9 |Product 3B |prod-3-B | |10 |Product 4 |prod-4 | --------------------------------------- 
- 次のような列を参照することもできます。 - # Import the col function from the functions module. from snowflake.snowpark.functions import col df_product_info = session.table("sample_product_data") df1 = df_product_info.select(df_product_info["id"], df_product_info["name"], df_product_info["serial_number"]) df2 = df_product_info.select(df_product_info.id, df_product_info.name, df_product_info.serial_number) df3 = df_product_info.select("id", "name", "serial_number") 
各メソッドは、変換された新しい DataFrame オブジェクトを返します。このメソッドは元の DataFrame オブジェクトには影響を及ぼしません。複数の変換を適用する場合は、 チェーンメソッド呼び出し を実行して、前のメソッド呼び出しによって返された、新しい DataFrame オブジェクトに対する後続の各変換メソッドを呼び出すことができます。
これらの変換メソッドは、SQL ステートメントの作成方法を指定し、Snowflakeデータベースからデータを取得しません。 DataFrame を評価するアクションの実行 で説明されているアクションメソッドは、データ取得を実行します。
DataFrames の結合¶
DataFrame オブジェクトを結合するには、 join メソッドを呼び出します。
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
# Create a DataFrame that joins the two DataFrames
# on the column named "key".
df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].as_("key"), "value1", "value2").show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].as_("key"), "value1", "value2")
-------------------------------
|"KEY"  |"VALUE1"  |"VALUE2"  |
-------------------------------
|a      |1         |3         |
|b      |2         |4         |
-------------------------------
両方の DataFrames に結合する同じ列がある場合は、次の構文例を使用できます。
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
# If both dataframes have the same column "key", the following is more convenient.
df_lhs.join(df_rhs, ["key"]).show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_lhs.join(df_rhs, ["key"])
-------------------------------
|"KEY"  |"VALUE1"  |"VALUE2"  |
-------------------------------
|a      |1         |3         |
|b      |2         |4         |
-------------------------------
& 演算子を使用して結合式を接続することもできます。
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
# Use & operator connect join expression. '|' and ~ are similar.
df_joined_multi_column = df_lhs.join(df_rhs, (df_lhs.col("key") == df_rhs.col("key")) & (df_lhs.col("value1") < df_rhs.col("value2"))).select(df_lhs["key"].as_("key"), "value1", "value2")
df_joined_multi_column.show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_joined_multi_column
-------------------------------
|"KEY"  |"VALUE1"  |"VALUE2"  |
-------------------------------
|a      |1         |3         |
|b      |2         |4         |
-------------------------------
自己結合を実行する場合は、DataFrame をコピーする必要があります。
# copy the DataFrame if you want to do a self-join
from copy import copy
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
df_lhs_copied = copy(df_lhs)
df_self_joined = df_lhs.join(df_lhs_copied, (df_lhs.col("key") == df_lhs_copied.col("key")) & (df_lhs.col("value1") == df_lhs_copied.col("value1")))
DataFrames に重複する列がある場合、Snowparkはランダムに生成されたプレフィックスを結合結果の列に追加します。
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"))
-----------------------------------------------------
|"l_av5t_KEY"  |"VALUE1"  |"r_1p6k_KEY"  |"VALUE2"  |
-----------------------------------------------------
|a             |1         |a             |3         |
|b             |2         |b             |4         |
-----------------------------------------------------
Column.alias を使用して、重複する列の名前を変更できます。
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].alias("key1"), df_rhs["key"].alias("key2"), "value1", "value2").show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].alias("key1"), df_rhs["key"].alias("key2"), "value1", "value2")
-----------------------------------------
|"KEY1"  |"KEY2"  |"VALUE1"  |"VALUE2"  |
-----------------------------------------
|a       |a       |1         |3         |
|b       |b       |2         |4         |
-----------------------------------------
ランダムなプレフィックスを避けるために、重複する列に追加するサフィックスを指定することもできます。
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"), lsuffix="_left", rsuffix="_right").show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"), lsuffix="_left", rsuffix="_right")
--------------------------------------------------
|"KEY_LEFT"  |"VALUE1"  |"KEY_RIGHT"  |"VALUE2"  |
--------------------------------------------------
|a           |1         |a            |3         |
|b           |2         |b            |4         |
--------------------------------------------------
これらの例では、 DataFrame.col を使用して、結合で使用する列を指定しています。列を指定するその他の方法については、 列と式の指定 をご参照ください。
異なる列でテーブルをそれ自体と結合する必要がある場合は、単一の DataFrame で自己結合を実行することはできません。次の例は単一の DataFrame を使用して自己結合を実行し、 "id" の列式が結合の左側と右側に存在するため、失敗します。
from snowflake.snowpark.exceptions import SnowparkJoinException
df = session.table("sample_product_data")
# This fails because columns named "id" and "parent_id"
# are in the left and right DataFrames in the join.
try:
  df_joined = df.join(df, col("id") == col("parent_id")) # fails
except SnowparkJoinException as e:
  print(e.message)
You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy.
# This fails because columns named "id" and "parent_id"
# are in the left and right DataFrames in the join.
try:
  df_joined = df.join(df, df["id"] == df["parent_id"])   # fails
except SnowparkJoinException as e:
  print(e.message)
You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy.
代わりに、Pythonのビルトイン copy() メソッドを使用してDataFrameオブジェクトのクローンを作成し、2つのDataFrameオブジェクトを使用して結合を実行します。
from copy import copy
# Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join.
df_lhs = session.table("sample_product_data")
# Clone the DataFrame object to use as the right-hand side of the join.
df_rhs = copy(df_lhs)
# Create a DataFrame that joins the two DataFrames
# for the "sample_product_data" table on the
# "id" and "parent_id" columns.
df_joined = df_lhs.join(df_rhs, df_lhs.col("id") == df_rhs.col("parent_id"))
df_joined.count()
列と式の指定¶
これらの変換メソッドを呼び出すときは、列または列を使用する式を指定する必要がある場合があります。たとえば、 select メソッドを呼び出すときは、選択する必要のある列を指定する必要があります。
列を参照するには、 snowflake.snowpark.functions モジュールで col 関数を呼び出して、 Column オブジェクトを作成します。
# Import the col function from the functions module.
from snowflake.snowpark.functions import col
df_product_info = session.table("sample_product_data").select(col("id"), col("name"))
df_product_info.show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_product_info
---------------------
|"ID"  |"NAME"      |
---------------------
|1     |Product 1   |
|2     |Product 1A  |
|3     |Product 1B  |
|4     |Product 2   |
|5     |Product 2A  |
|6     |Product 2B  |
|7     |Product 3   |
|8     |Product 3A  |
|9     |Product 3B  |
|10    |Product 4   |
---------------------
注釈
リテラルの Column オブジェクトを作成するには、 列オブジェクトとしてのリテラルの使用 をご参照ください。
フィルター、プロジェクション、結合条件などを指定する場合は、式で Column オブジェクトを使用できます。例:
- filterメソッドで- Columnオブジェクトを使用して、フィルター条件を指定できます。- # Specify the equivalent of "WHERE id = 20" # in a SQL SELECT statement. df_filtered = df.filter(col("id") == 20) - df = session.create_dataframe([[1, 3], [2, 10]], schema=["a", "b"]) # Specify the equivalent of "WHERE a + b < 10" # in a SQL SELECT statement. df_filtered = df.filter((col("a") + col("b")) < 10) df_filtered.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() return df_filtered - ------------- |"A" |"B" | ------------- |1 |3 | ------------- 
- selectメソッドで- Columnオブジェクトを使用して、エイリアスを定義できます。- df = session.create_dataframe([[1, 3], [2, 10]], schema=["a", "b"]) # Specify the equivalent of "SELECT b * 10 AS c" # in a SQL SELECT statement. df_selected = df.select((col("b") * 10).as_("c")) df_selected.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() return df_selected - ------- |"C" | ------- |30 | |100 | ------- 
- joinメソッドで- Columnオブジェクトを使用して、結合条件を定義できます。- dfX = session.create_dataframe([[1], [2]], schema=["a_in_X"]) dfY = session.create_dataframe([[1], [3]], schema=["b_in_Y"]) # Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y" # in a SQL SELECT statement. df_joined = dfX.join(dfY, col("a_in_X") == col("b_in_Y")).select(dfX["a_in_X"].alias("the_joined_column")) df_joined.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() return df_joined - ----------------------- |"THE_JOINED_COLUMN" | ----------------------- |1 | ----------------------- 
同じ名前の2つの異なる DataFrame オブジェクトの列を参照する場合(例: その列の DataFrames を結合する場合)、1つの DataFrame オブジェクトにある DataFrame.col メソッドを使用して、そのオブジェクトにある列(例: df1.col("name") と df2.col("name"))を参照できます。
次の例は、 DataFrame.col メソッドを使用して特定の DataFrame の列を参照する方法を示しています。この例では、両方とも key という名前の列を持つ2つの DataFrame オブジェクトを結合します。この例では、 Column.as メソッドを使用して、新しく作成された DataFrame の列の名前を変更します。
# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value"])
# Create a DataFrame that joins two other DataFrames (df_lhs and df_rhs).
# Use the DataFrame.col method to refer to the columns used in the join.
df_joined = df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs.col("key").as_("key"), df_lhs.col("value").as_("L"), df_rhs.col("value").as_("R"))
df_joined.show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_joined
---------------------
|"KEY"  |"L"  |"R"  |
---------------------
|a      |1    |3    |
|b      |2    |4    |
---------------------
オブジェクト識別子(テーブル名、列名など)の前後での二重引用符の使用¶
指定するデータベース、スキーマ、テーブル、およびステージの名前は、 Snowflake識別子の要件 に準拠している必要があります。
大文字と小文字を区別する列を持つテーブルを作成します。
session.sql("""
  create or replace temp table "10tablename"(
  id123 varchar, -- case insensitive because it's not quoted.
  "3rdID" varchar, -- case sensitive.
  "id with space" varchar -- case sensitive.
)""").collect()
# Add return to the statement to return the collect() results in a Python worksheet
[Row(status='Table 10tablename successfully created.')]
次に、テーブルに値を追加します。
session.sql("""insert into "10tablename" (id123, "3rdID", "id with space") values ('a', 'b', 'c')""").collect()
# Add return to the statement to return the collect() results in a Python worksheet
[Row(number of rows inserted=1)]
次に、テーブルの DataFrame を作成し、テーブルに対してクエリを実行します。
df = session.table('"10tablename"')
df.show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df
---------------------------------------
|"ID123"  |"3rdID"  |"id with space"  |
---------------------------------------
|a        |b        |c                |
---------------------------------------
名前を指定すると、Snowflakeはその名前を大文字と見なします。たとえば、次の呼び出しは同等です。
df.select(col("id123")).collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(ID123='a')]
名前が識別子の要件に準拠していない場合は、名前を二重引用符(")で囲む必要があります。バックスラッシュ(\)を使用して、文字列リテラル内の二重引用符をエスケープします。たとえば、次のテーブル名は文字やアンダースコアで始まらないため、名前を二重引用符で囲む必要があります。
df = session.table("\"10tablename\"")
または、バックスラッシュの代わりに一重引用符を使用して、文字列リテラル内の二重引用符をエスケープすることもできます。
df = session.table('"10tablename"')
列 の名前を指定するときは、名前を二重引用符で囲む必要はありません。名前が識別子の要件に準拠していない場合、Snowparkライブラリは列名を自動的に二重引用符で囲みます。
df.select(col("3rdID")).collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(3rdID='b')]
別の例として、次の呼び出しは同等です。
df.select(col("id with space")).collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(id with space='c')]
df.select(col("\"id with space\"")).collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(id with space='c')]
すでに列名を二重引用符で囲んでいる場合、ライブラリは名前を二重引用符で囲みません。
場合によっては、列名に二重引用符が含まれることがあります。
session.sql('''
  create or replace temp table quoted(
  "name_with_""air""_quotes" varchar,
  """column_name_quoted""" varchar
)''').collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Table QUOTED successfully created.')]
session.sql('''insert into quoted ("name_with_""air""_quotes", """column_name_quoted""") values ('a', 'b')''').collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(number of rows inserted=1)]
識別子の要件 で説明されているように、二重引用符で囲まれた識別子内の二重引用符文字ごとに、2つの二重引用符文字(例: "name_with_""air""_quotes" と """column_name_quoted""")を使用する必要があります。
df_table = session.table("quoted")
df_table.select("\"name_with_\"\"air\"\"_quotes\"").collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(name_with_"air"_quotes='a')]
df_table.select("\"\"\"column_name_quoted\"\"\"").collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row("column_name_quoted"='b')]
識別子が二重引用符で囲まれている場合は(明示的に引用符を追加したか、ライブラリが引用符を追加したかに関係なく)、 Snowflakeは識別子を大文字と小文字を区別する ものとして扱います。
# The following calls are NOT equivalent!
# The Snowpark library adds double quotes around the column name,
# which makes Snowflake treat the column name as case-sensitive.
df.select(col("id with space")).collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(id with space='c')]
この例と比較すると:
from snowflake.snowpark.exceptions import SnowparkSQLException
try:
  df.select(col("ID WITH SPACE")).collect()
except SnowparkSQLException as e:
  print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 7
invalid identifier '"ID WITH SPACE"'
列オブジェクトとしてのリテラルの使用¶
Column オブジェクトを引数として取るメソッドでリテラルを使用するには、リテラルを snowflake.snowpark.functions モジュールの lit 関数に渡して、リテラルの Column オブジェクトを作成します。例:
# Import for the lit and col functions.
from snowflake.snowpark.functions import col, lit
# Show the first 10 rows in which num_items is greater than 5.
# Use `lit(5)` to create a Column object for the literal 5.
df_filtered = df.filter(col("num_items") > lit(5))
列オブジェクトの特定の型へのキャスト¶
Column オブジェクトを特定の型にキャストするには、 cast メソッドを呼び出し、 snowflake.snowpark.types モジュールから型オブジェクトを渡します。たとえば、リテラルを5の精度と2のスケールで NUMBER としてキャストするには、次のようにします。
# Import for the lit function.
from snowflake.snowpark.functions import lit
 # Import for the DecimalType class.
from snowflake.snowpark.types import DecimalType
decimal_value = lit(0.05).cast(DecimalType(5,2))
メソッド呼び出しのチェーン¶
DataFrame オブジェクトを変換するメソッド ごとに、変換が適用された新しい DataFrame オブジェクトが返されるため、 メソッド呼び出しのチェーン により、追加の方法で変換される新しい DataFrame を生成できます。
次の例では、次のように構成された DataFrame を返します。
- sample_product_dataテーブルをクエリします。
- id = 1で行を返します。
- nameおよび- serial_number列を選択します。- df_product_info = session.table("sample_product_data").filter(col("id") == 1).select(col("name"), col("serial_number")) df_product_info.show() # To return the DataFrame as a table in a Python worksheet use return instead of show() return df_product_info - ------------------------------- |"NAME" |"SERIAL_NUMBER" | ------------------------------- |Product 1 |prod-1 | ------------------------------- 
この例では:
- session.table("sample_product_data")は、- sample_product_dataテーブルの DataFrame を返します。- DataFrame にはまだテーブルのデータが含まれていませんが、オブジェクトにはテーブルの列の定義が含まれています。 
- filter(col("id") == 1)は、- id = 1で行を返すように設定された- sample_product_dataテーブルの DataFrame を返します。- DataFrame には、テーブルからの一致する行がまだ含まれていないことに注意してください。 アクションメソッドを呼び出す まで、一致する行は取得されません。 
- select(col("name"), col("serial_number"))は、- id = 1を持つ- sample_product_dataテーブルの行の- name列と- serial_number列を含む DataFrame を返します。
メソッド呼び出しをチェーンするときは、呼び出しの順序が重要です。各メソッド呼び出しは、変換された DataFrame を返します。後続の呼び出しが変換された DataFrame で機能することを確認します。
Snowpark Pythonを使用すると、SQL ステートメントで同等のキーワード(SELECT および WHERE)を使用する場合とは異なる順序で、 select および filter メソッドの呼び出しが必要になる場合があります。
列定義の取得¶
DataFrameのためにデータセット内の列の定義を取得するには、 schema プロパティを呼び出します。このメソッドは、 StructField オブジェクトの list を含む StructType オブジェクトを返します。各 StructField オブジェクトには、列の定義が含まれています。
# Import the StructType
from snowflake.snowpark.types import *
# Get the StructType object that describes the columns in the
# underlying rowset.
table_schema = session.table("sample_product_data").schema
table_schema
StructType([StructField('ID', LongType(), nullable=True), StructField('PARENT_ID', LongType(), nullable=True), StructField('CATEGORY_ID', LongType(), nullable=True), StructField('NAME', StringType(), nullable=True), StructField('SERIAL_NUMBER', StringType(), nullable=True), StructField('KEY', LongType(), nullable=True), StructField('"3rd"', LongType(), nullable=True)])
返された StructType オブジェクトでは、列名は常に正規化されています。引用符で囲まれていない識別子は大文字で返され、引用符で囲まれた識別子は定義された正確な大文字小文字で返されます。
次の例では、 ID および 3rd という名前の列を含む DataFrame を作成します。列名 3rd の場合は、 名前が識別子の要件に準拠していない ため、Snowparkライブラリは名前を自動的に二重引用符("3rd")で囲みます。
この例では、 schema プロパティを呼び出してから、返された StructType オブジェクトで names プロパティを呼び出して、列名の list を取得します。名前は、 schema プロパティによって返される StructType で正規化されます。
# Create a DataFrame containing the "id" and "3rd" columns.
df_selected_columns = session.table("sample_product_data").select(col("id"), col("3rd"))
# Print out the names of the columns in the schema.
# This prints List["ID", "\"3rd\""]
df_selected_columns.schema.names
['ID', '"3rd"']
DataFrame を評価するアクションの実行¶
前述のように、 DataFrame は遅延評価されます。つまり、アクションを実行するまで、 SQL ステートメントは実行のためにサーバーに送信されません。アクションにより、 DataFrame が評価され、対応する SQL ステートメントが実行のためにサーバーに送信されます。
このリリースでは、次のメソッドがアクションを実行します。
| クラス | メソッド | 説明 | 
|---|---|---|
| 
 | 
 | DataFrame を評価し、結果のデータセットを  | 
| 
 | 
 | DataFrame を評価し、行数を返します。 | 
| 
 | 
 | DataFrame を評価し、行をコンソールに出力します。このメソッドでは行数が10に制限されます(デフォルト)。 | 
| 
 | 
 | DataFrame のデータを指定したテーブルに保存します。 テーブルへのデータの保存 をご参照ください。 | 
たとえば、テーブルに対してクエリを実行して結果を返すには、 collect メソッドを呼び出します。
# Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
# This does not execute the query.
df = session.table("sample_product_data").select(col("id"), col("name"))
# Send the query to the server for execution and
# return a list of Rows containing the results.
results = df.collect()
# Use a return statement to return the collect() results in a Python worksheet
# return results
クエリを実行して結果の数を返すには、 count メソッドを呼び出します。
# Create a DataFrame for the "sample_product_data" table.
df_products = session.table("sample_product_data")
# Send the query to the server for execution and
# print the count of rows in the table.
print(df_products.count())
12
クエリを実行して結果をコンソールに出力するには、 show メソッドを呼び出します。
# Create a DataFrame for the "sample_product_data" table.
df_products = session.table("sample_product_data")
# Send the query to the server for execution and
# print the results to the console.
# The query limits the number of rows to 10 by default.
df_products.show()
# To return the DataFrame as a table in a Python worksheet use return instead of show()
return df_products
-------------------------------------------------------------------------------------
|"ID"  |"PARENT_ID"  |"CATEGORY_ID"  |"NAME"      |"SERIAL_NUMBER"  |"KEY"  |"3rd"  |
-------------------------------------------------------------------------------------
|1     |0            |5              |Product 1   |prod-1           |1      |10     |
|2     |1            |5              |Product 1A  |prod-1-A         |1      |20     |
|3     |1            |5              |Product 1B  |prod-1-B         |1      |30     |
|4     |0            |10             |Product 2   |prod-2           |2      |40     |
|5     |4            |10             |Product 2A  |prod-2-A         |2      |50     |
|6     |4            |10             |Product 2B  |prod-2-B         |2      |60     |
|7     |0            |20             |Product 3   |prod-3           |3      |70     |
|8     |7            |20             |Product 3A  |prod-3-A         |3      |80     |
|9     |7            |20             |Product 3B  |prod-3-B         |3      |90     |
|10    |0            |50             |Product 4   |prod-4           |4      |100    |
-------------------------------------------------------------------------------------
行数を20に制限するには:
# Create a DataFrame for the "sample_product_data" table.
df_products = session.table("sample_product_data")
# Limit the number of rows to 20, rather than 10.
df_products.show(20)
# All rows are returned when you use return in a Python worksheet to return the DataFrame as a table
return df_products
-------------------------------------------------------------------------------------
|"ID"  |"PARENT_ID"  |"CATEGORY_ID"  |"NAME"      |"SERIAL_NUMBER"  |"KEY"  |"3rd"  |
-------------------------------------------------------------------------------------
|1     |0            |5              |Product 1   |prod-1           |1      |10     |
|2     |1            |5              |Product 1A  |prod-1-A         |1      |20     |
|3     |1            |5              |Product 1B  |prod-1-B         |1      |30     |
|4     |0            |10             |Product 2   |prod-2           |2      |40     |
|5     |4            |10             |Product 2A  |prod-2-A         |2      |50     |
|6     |4            |10             |Product 2B  |prod-2-B         |2      |60     |
|7     |0            |20             |Product 3   |prod-3           |3      |70     |
|8     |7            |20             |Product 3A  |prod-3-A         |3      |80     |
|9     |7            |20             |Product 3B  |prod-3-B         |3      |90     |
|10    |0            |50             |Product 4   |prod-4           |4      |100    |
|11    |10           |50             |Product 4A  |prod-4-A         |4      |100    |
|12    |10           |50             |Product 4B  |prod-4-B         |4      |100    |
-------------------------------------------------------------------------------------
注釈
DataFrame の列の定義を取得するために schema プロパティを呼び出す場合は、アクションメソッドを呼び出す必要はありません。
テーブルへのデータの保存¶
DataFrame の内容をテーブルに保存するには、
- writeプロパティを呼び出して、- DataFrameWriterオブジェクトを取得します。
- DataFrameWriterオブジェクトの- modeメソッドを呼び出し、モードを指定します。詳細については、 API のドキュメント をご参照ください。このメソッドは、指定されたモードで構成された新しい- DataFrameWriterオブジェクトを返します。
- DataFrameWriterオブジェクトの- save_as_tableメソッドを呼び出し、 DataFrame の内容を指定されたテーブルに保存します。
データをテーブルに保存する SQL ステートメントを実行するために、別のメソッド(例: collect)を呼び出す必要はありません。
例:
df.write.mode("overwrite").save_as_table("table1")
DataFrame からのビューの作成¶
DataFrameからビューを作成するには、 create_or_replace_view メソッドを呼び出します。これにより新しいビューがすぐに作成されます。
import os
database = os.environ["snowflake_database"]  # use your own database and schema
schema = os.environ["snowflake_schema"]
view_name = "my_view"
df.create_or_replace_view(f"{database}.{schema}.{view_name}")
[Row(status='View MY_VIEW successfully created.')]
Pythonワークシートでは、データベースとスキーマのコンテキストでワークシートを実行するため、次を実行してビューを作成できます。
# Define a DataFrame
df_products = session.table("sample_product_data")
# Define a View name
view_name = "my_view"
# Create the view
df_products.create_or_replace_view(f"{view_name}")
# return the view name
return view_name + " successfully created"
my_view successfully created
create_or_replace_view を呼び出して作成したビューは永続的です。そのビューが不要になった場合は、 ビューを手動でドロップ できます。
または、仮ビューを作成する create_or_replace_temp_view メソッドを使用します。仮ビューは、それが作成されたセッションでのみ使用できます。
ステージでのファイルの操作¶
このセクションでは、Snowflakeステージでファイル内のデータをクエリする方法について説明します。ファイルに対するその他の操作には、 SQL ステートメント を使用します。
Snowflakeステージのファイル内のデータをクエリするには、 DataFrameReader クラスを使用します。
- Sessionクラスの- readメソッドを呼び出して、- DataFrameReaderオブジェクトにアクセスします。
- ファイルが CSV 形式の場合は、ファイルのフィールドを記述します。これを実行するには、 - ファイル内のフィールドを説明する - StructTypeオブジェクトの- listで構成される- StructFieldオブジェクトを作成します。
- StructFieldオブジェクトごとに、以下を指定します。- フィールドの名前。 
- フィールドのデータ型( - snowflake.snowpark.typesモジュールでオブジェクトとして指定)。
- フィールドがNULL可能かどうか。 
 - 例: - from snowflake.snowpark.types import * schema_for_data_file = StructType([ StructField("id", StringType()), StructField("name", StringType()) ]) 
- DataFrameReaderオブジェクトで- schemaプロパティを呼び出し、- StructTypeオブジェクトを渡します。- 例: - df_reader = session.read.schema(schema_for_data_file) - schemaプロパティは、指定されたフィールドを含むファイルを読み取るように構成された- DataFrameReaderオブジェクトを返します。- 他の形式(JSON など)のファイルでは、これを実行する必要はありません。これらのファイルの場合、 - DataFrameReaderは、データをフィールド名- $1の VARIANT 型の単一フィールドとして扱います。
 
- データの読み取り方法に関する追加情報を指定する必要がある(例: データが圧縮されている、またはCSVファイルがフィールドを区切るためにコンマではなくセミコロンを使用している)場合は、 - DataFrameReaderオブジェクトの- optionまたは- optionsメソッドを呼び出します。- optionメソッドは、設定するオプションの名前と値をとり、複数のチェーン呼び出しを組み合わせることができますが、- optionsメソッドは、オプションの名前とそれに対応する値のディクショナリをとります。- ファイル形式オプションの名前と値については、 CREATE FILE FORMAT のドキュメント をご参照ください。 - COPY INTO TABLE ドキュメント で説明されているコピーオプションを設定することもできます。コピーオプションを設定すると、 DataFrame にデータを取得 するときに、よりコストのかかる実行戦略の実行される可能性があることに注意してください。 - 次の例では、圧縮されておらず、フィールド区切り文字にセミコロンを使用する CSV ファイル内のデータをクエリするように、 - DataFrameReaderオブジェクトを設定します。- df_reader = df_reader.option("field_delimiter", ";").option("COMPRESSION", "NONE") - optionおよび- optionsメソッドは、指定されたオプションで構成された- DataFrameReaderオブジェクトを返します。
- ファイルの形式に対応するメソッド(例: - csvメソッド)を呼び出し、ファイルの場所を渡します。- df = df_reader.csv("@s3_ts_stage/emails/data_0_0_0.csv") - ファイルの形式に対応するメソッドは、そのファイルにデータを保持するように構成された DataFrame オブジェクトを返します。 
- DataFrame オブジェクトメソッドを使用して、データセットで必要な 変換を実行 します(例: 特定のフィールドの選択、行のフィルタリングなど)。 - たとえば、 - my_stageという名前のステージで JSON ファイルから- color要素を抽出するには、- # Import the sql_expr function from the functions module. from snowflake.snowpark.functions import sql_expr df = session.read.json("@my_stage").select(sql_expr("$1:color")) - 前に説明したように、 CSV 以外の形式のファイル(例: JSON)の場合、 - DataFrameReaderは、ファイル内のデータを- $1という名前の単一の VARIANT 列として扱います。- この例では、 - snowflake.snowpark.functionsモジュールの- sql_expr関数を使用して、- color要素へのパスを指定します。- sql_expr関数は入力引数を解釈または変更しないことに注意してください。この関数を使用すると、Snowpark API でまだサポートされていない式とスニペットを SQL で作成できます。
- アクションメソッドの呼び出し により、ファイル内のデータをクエリします。 - テーブルの DataFrames の場合と同様、データは、アクションメソッドを呼び出すまで DataFrame に取得されません。 
半構造化データの操作¶
DataFrame を使用すると、 半構造化データ (例: JSON データ)へのクエリとアクセスができます。次のセクションでは、 DataFrame 内の半構造化データの操作方法について説明します。
注釈
これらのセクションの例では、 例で使用されるサンプルデータ のサンプルデータを使用しています。
半構造化データの走査¶
半構造化データの特定のフィールドまたは要素を参照するには、 Column オブジェクトの次のメソッドを使用します。
- col_object["<フィールド名>"]の属性を取得して、OBJECT (または OBJECT を含む VARIANT)のフィールドに- Columnオブジェクトを返します。
- col_object[<インデックス>]を使用して、ARRAY (または ARRAY を含む VARIANT)の要素の- Columnオブジェクトを返します。
注釈
パス内のフィールド名または要素が不規則であり、上で説明したインデックスの使用が困難な場合は、 get、 get_ignore_case、または get_path を代わりに使用できます。
たとえば、次のコードは、 サンプルデータ の src 列にあるオブジェクトの dealership フィールドを選択します。
from snowflake.snowpark.functions import col
df = session.table("car_sales")
df.select(col("src")["dealership"]).show()
このコードは、次を出力します。
----------------------------
|"""SRC""['DEALERSHIP']"   |
----------------------------
|"Valley View Auto Sales"  |
|"Tindel Toyota"           |
----------------------------
注釈
DataFrame の値は文字列リテラルとして返されるため、二重引用符で囲まれています。これらの値を特定の型にキャストするには、 半構造化データへの明示的な値のキャスト をご参照ください。
メソッド呼び出しのチェーン を使用して、特定のフィールドまたは要素へのパスを走査することもできます。
たとえば、次のコードは salesperson オブジェクトの name フィールドを選択します。
df = session.table("car_sales")
df.select(df["src"]["salesperson"]["name"]).show()
このコードは、次を出力します。
------------------------------------
|"""SRC""['SALESPERSON']['NAME']"  |
------------------------------------
|"Frank Beasley"                   |
|"Greg Northrup"                   |
------------------------------------
別の例として、次のコードは、車両の配列を保持する vehicle フィールドの最初の要素を選択します。この例では、最初の要素から price フィールドも選択しています。
df = session.table("car_sales")
df.select(df["src"]["vehicle"][0]).show()
df.select(df["src"]["vehicle"][0]["price"]).show()
このコードは、次を出力します。
---------------------------
|"""SRC""['VEHICLE'][0]"  |
---------------------------
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "paint protection"   |
|  ],                     |
|  "make": "Honda",       |
|  "model": "Civic",      |
|  "price": "20275",      |
|  "year": "2017"         |
|}                        |
|{                        |
|  "extras": [            |
|    "ext warranty",      |
|    "rust proofing",     |
|    "fabric protection"  |
|  ],                     |
|  "make": "Toyota",      |
|  "model": "Camry",      |
|  "price": "23500",      |
|  "year": "2017"         |
|}                        |
---------------------------
------------------------------------
|"""SRC""['VEHICLE'][0]['PRICE']"  |
------------------------------------
|"20275"                           |
|"23500"                           |
------------------------------------
前述の方法でフィールドにアクセスする代わりに、パス内のフィールド名または要素が不規則な場合は、 get、 get_ignore_case、または get_path 関数を使用できます。
たとえば、次のコード行は両方とも、オブジェクトの指定されたフィールドの値を出力します。
from snowflake.snowpark.functions import get, get_path, lit
df.select(get(col("src"), lit("dealership"))).show()
df.select(col("src")["dealership"]).show()
同様に、次のコード行は両方とも、オブジェクトの指定されたパスにあるフィールドの値を出力します。
df.select(get_path(col("src"), lit("vehicle[0].make"))).show()
df.select(col("src")["vehicle"][0]["make"]).show()
半構造化データへの明示的な値のキャスト¶
デフォルトでは、上記の例に示すように、フィールドと要素の値は文字列リテラル(二重引用符を含む)として返されます。
予期しない結果を回避するには、 cast メソッドを呼び出して、値を特定の型にキャストします。たとえば、次のコードは、キャストなしとキャストありの値を出力します。
# Import the objects for the data types, including StringType.
from snowflake.snowpark.types import *
df = session.table("car_sales")
df.select(col("src")["salesperson"]["id"]).show()
df.select(col("src")["salesperson"]["id"].cast(StringType())).show()
このコードは、次を出力します。
----------------------------------
|"""SRC""['SALESPERSON']['ID']"  |
----------------------------------
|"55"                            |
|"274"                           |
----------------------------------
---------------------------------------------------
|"CAST (""SRC""['SALESPERSON']['ID'] AS STRING)"  |
---------------------------------------------------
|55                                               |
|274                                              |
---------------------------------------------------
オブジェクト配列の行へのフラット化¶
半構造化データを DataFrame に「フラット化」する必要がある場合(例: 配列内にあるすべてのオブジェクトの行を生成する場合)は、 join_table_function メソッドを使用して flatten を呼び出します。このメソッドは、 FLATTEN SQL 関数と同等です。オブジェクトまたは配列へのパスを渡すと、メソッドは、各フィールドの行か、オブジェクトまたは配列の要素の行を含む DataFrame を返します。
たとえば、 サンプルデータ では、 src:customer は顧客に関する情報を含むオブジェクトの配列です。各オブジェクトには、 name および address フィールドが含まれています。
このパスを flatten 関数に渡すと、次のようになります。
df = session.table("car_sales")
df.join_table_function("flatten", col("src")["customer"]).show()
メソッドは DataFrame を返します。
----------------------------------------------------------------------------------------------------------------------------------------------------------
|"SRC"                                      |"SEQ"  |"KEY"  |"PATH"  |"INDEX"  |"VALUE"                            |"THIS"                               |
----------------------------------------------------------------------------------------------------------------------------------------------------------
|{                                          |1      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "San Francisco, CA",  |  {                                  |
|    {                                      |       |       |        |         |  "name": "Joyce Ridgely",         |    "address": "San Francisco, CA",  |
|      "address": "San Francisco, CA",      |       |       |        |         |  "phone": "16504378889"           |    "name": "Joyce Ridgely",         |
|      "name": "Joyce Ridgely",             |       |       |        |         |}                                  |    "phone": "16504378889"           |
|      "phone": "16504378889"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Valley View Auto Sales",  |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "55",                            |       |       |        |         |                                   |                                     |
|    "name": "Frank Beasley"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "paint protection"                 |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Honda",                     |       |       |        |         |                                   |                                     |
|      "model": "Civic",                    |       |       |        |         |                                   |                                     |
|      "price": "20275",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
|{                                          |2      |NULL   |[0]     |0        |{                                  |[                                    |
|  "customer": [                            |       |       |        |         |  "address": "New York, NY",       |  {                                  |
|    {                                      |       |       |        |         |  "name": "Bradley Greenbloom",    |    "address": "New York, NY",       |
|      "address": "New York, NY",           |       |       |        |         |  "phone": "12127593751"           |    "name": "Bradley Greenbloom",    |
|      "name": "Bradley Greenbloom",        |       |       |        |         |}                                  |    "phone": "12127593751"           |
|      "phone": "12127593751"               |       |       |        |         |                                   |  }                                  |
|    }                                      |       |       |        |         |                                   |]                                    |
|  ],                                       |       |       |        |         |                                   |                                     |
|  "date": "2017-04-28",                    |       |       |        |         |                                   |                                     |
|  "dealership": "Tindel Toyota",           |       |       |        |         |                                   |                                     |
|  "salesperson": {                         |       |       |        |         |                                   |                                     |
|    "id": "274",                           |       |       |        |         |                                   |                                     |
|    "name": "Greg Northrup"                |       |       |        |         |                                   |                                     |
|  },                                       |       |       |        |         |                                   |                                     |
|  "vehicle": [                             |       |       |        |         |                                   |                                     |
|    {                                      |       |       |        |         |                                   |                                     |
|      "extras": [                          |       |       |        |         |                                   |                                     |
|        "ext warranty",                    |       |       |        |         |                                   |                                     |
|        "rust proofing",                   |       |       |        |         |                                   |                                     |
|        "fabric protection"                |       |       |        |         |                                   |                                     |
|      ],                                   |       |       |        |         |                                   |                                     |
|      "make": "Toyota",                    |       |       |        |         |                                   |                                     |
|      "model": "Camry",                    |       |       |        |         |                                   |                                     |
|      "price": "23500",                    |       |       |        |         |                                   |                                     |
|      "year": "2017"                       |       |       |        |         |                                   |                                     |
|    }                                      |       |       |        |         |                                   |                                     |
|  ]                                        |       |       |        |         |                                   |                                     |
|}                                          |       |       |        |         |                                   |                                     |
----------------------------------------------------------------------------------------------------------------------------------------------------------
この DataFrame から、 VALUE フィールドにある各オブジェクトからの name フィールドと address フィールドを選択できます。
df.join_table_function("flatten", col("src")["customer"]).select(col("value")["name"], col("value")["address"]).show()
-------------------------------------------------
|"""VALUE""['NAME']"   |"""VALUE""['ADDRESS']"  |
-------------------------------------------------
|"Joyce Ridgely"       |"San Francisco, CA"     |
|"Bradley Greenbloom"  |"New York, NY"          |
-------------------------------------------------
次のコードは、 特定の型に値をキャスト し、列の名前を変更することにより、前の例に追加します。
df.join_table_function("flatten", col("src")["customer"]).select(col("value")["name"].cast(StringType()).as_("Customer Name"), col("value")["address"].cast(StringType()).as_("Customer Address")).show()
-------------------------------------------
|"Customer Name"     |"Customer Address"  |
-------------------------------------------
|Joyce Ridgely       |San Francisco, CA   |
|Bradley Greenbloom  |New York, NY        |
-------------------------------------------
SQL ステートメントの実行¶
指定した SQL ステートメントを実行するには、 Session クラスの sql メソッドを呼び出し、実行するステートメントを渡します。メソッドは DataFrame を返します。
アクションメソッドの呼び出し があるまで、 SQL ステートメントは実行されないことに注意してください。
# Get the list of the files in a stage.
# The collect() method causes this SQL statement to be executed.
session.sql("create or replace temp stage my_stage").collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Stage area MY_STAGE successfully created.')]
stage_files_df = session.sql("ls @my_stage").collect()
# Prepend a return statement to return the collect() results in a Python worksheet
# Resume the operation of a warehouse.
# Note that you must call the collect method to execute
# the SQL statement.
session.sql("alter warehouse if exists my_warehouse resume if suspended").collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Statement executed successfully.')]
# Set up a SQL statement to copy data from a stage to a table.
session.sql("copy into sample_product_data from @my_stage file_format=(type = csv)").collect()
# Prepend a return statement to return the collect() results in a Python worksheet
[Row(status='Copy executed with 0 files processed.')]
メソッドを呼び出してDataFrame を変換する場合(例: filter、 select など)、これらのメソッドは、基になるSQLステートメントがSELECTステートメントである場合にのみ機能することに注意してください。変換メソッドは、他の種類の SQL ステートメントではサポートされていません。
df = session.sql("select id, parent_id from sample_product_data where id < 10")
# Because the underlying SQL statement for the DataFrame is a SELECT statement,
# you can call the filter method to transform this DataFrame.
results = df.filter(col("id") < 3).select(col("id")).collect()
# Prepend a return statement to return the collect() results in a Python worksheet
# In this example, the underlying SQL statement is not a SELECT statement.
df = session.sql("ls @my_stage")
# Calling the filter method results in an error.
try:
  df.filter(col("size") > 50).collect()
except SnowparkSQLException as e:
  print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 104
invalid identifier 'SIZE'
Snowparkクエリの同時送信¶
注釈
この機能を使用するには、Python用Snowparkライブラリのバージョンが1.24以上、サーバーのバージョンが8.46以上である必要があります。
スレッドセーフなセッションオブジェクトにより、Snowpark Pythonコードの異なる部分を、同じセッションを使用しながら同時に実行することができます。これにより、複数の演算子(複数の DataFrames に対する変換など)を同時に実行することができます。これは、Snowflakeサーバー上で独立して処理できるクエリを扱う場合に特に便利で、より伝統的なマルチスレッドアプローチに沿ったものです。
Python の Global Interpreter Lock (GIL) は Python オブジェクトへのアクセスを保護するミューテックスで、複数のネイティブスレッドが同時に Python バイトコードを実行するのを防ぎます。I/Oバウンドの演算は、 GIL がI/O演算中にリリースされるため、Pythonのスレッドモデルの恩恵をまだ受けることができますが、 CPU-バウンドのスレッドは、一度に1つのスレッドしか実行できないため、真の並列性を達成することはできません。
さらに、Snowflake内部で使用される場合(ストアドプロシージャ内など)、Snowpark PythonサーバーはSnowflakeにクエリを送信する前にグローバルインタープリタロック(GIL)をリリースして管理します。これにより、別々のスレッドから複数のクエリをキューイングする際に、真の同時実行が実現されます。この管理により、Snowparkでは複数のスレッドが同時にクエリを実行できるようになり、最適な並列実行が保証されます。
Snowparkでスレッドセーフなセッションオブジェクトを使用するメリット¶
複数の DataFrame オペレーションを同時に実行できることで、Snowparkユーザーには次のようなメリットがあります。
- パフォーマンスの向上:スレッドセーフなセッションオブジェクトにより、複数のSnowpark Pythonクエリを同時に実行でき、全体的なランタイムを短縮できます。例えば、複数のテーブルを個別に処理する必要がある場合、この機能により、各テーブルの処理が終了するのを待ってから次のテーブルの処理を開始する必要がなくなり、作業完了までの時間が大幅に短縮されます。 
- 効率的なコンピュート利用:クエリを同時に送信することで、Snowflakeのコンピュートリソースを効率的に使用し、アイドル時間を短縮します。 
- ユーザビリティ:スレッドセーフなセッションオブジェクトは、Pythonのネイティブマルチスレッディング APIs とシームレスに統合され、開発者はPythonの組み込みツールを活用してスレッドの動作を制御し、並列実行を最適化することができます。 
スレッドセーフセッションオブジェクトと非同期ジョブは、ユースケースによってお互いを補完することができます。非同期ジョブはジョブの終了を待つ必要がない場合に便利で、スレッドプールを管理することなくノンブロッキングで実行できます。一方、スレッドセーフなセッションオブジェクトは、クライアント側から複数のクエリを同時に送信したい場合に便利です。場合によっては、コードブロックに非同期ジョブを含めることもでき、両方のメソッドを効果的に併用することができます。
以下は、スレッドセーフなセッションオブジェクトがデータパイプラインを強化する例です。
例 1: 複数テーブルの同時ロード¶
この例では、 COPY INTO コマンドを3つのスレッドで同時に実行し、3つの異なる CSV ファイルから3つの異なるテーブルにデータの読み込みを行っています。
import threading
from snowflake.snowpark import Session
# Define the list of tables
tables = ["customers", "orders", "products"]
# Function to copy data from stage to tables
def execute_copy(table_name):
    try:
        # Read data from the stage using DataFrameReader
        df = (
            session.read.option("SKIP_HEADER", 1)
            .option("PATTERN", f"{table_name}[.]csv")
            .option("FORCE", True)
            .csv(f"@my_stage")
        )
        # Copy data into the target table
        df.copy_into_table(
            table_name=table_name, target_columns=session.table(table_name).columns
        )
    except Exception as e:
        print(f"Failed to copy data into {table_name}, Error: {e}")
# Create an empty list of threads
threads = []
# Loop through and start a thread for each table
for table in tables:
    thread = threading.Thread(target=execute_copy, args=(table,))
    threads.append(thread)
    thread.start()
# Wait for all threads to finish
for thread in threads:
    thread.join()
例 2: 複数テーブルの同時処理¶
この例では、複数のスレッドを使用して、各顧客トランザクション・テーブル(transaction_customer1、transaction_customer2、transaction_customer3)から結果テーブルにデータを同時にフィルター、集約、挿入する方法を示します。
from concurrent.futures import ThreadPoolExecutor
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, month, sum, lit
# List of customers
customers = ["customer1", "customer2", "customer3"]
# Define a function to process each customer transaction table
def process_customer_table(customer_name):
    table_name = f"transaction_{customer_name}"
    try:
        # Load the customer transaction table
        df = session.table(table_name)
        print(f"Processing {table_name}...")
        # Filter data by positive values and non null categories
        df_filtered = df.filter((col("value") > 0) & col("category").is_not_null())
        # Perform aggregation: Sum of value by category and month
        df_aggregated = df_filtered.with_column("month", month(col("date"))).with_column("customer_name", lit(customer_name)).group_by(col("category"), col("month"), col("customer_name")).agg(sum("value").alias("total_value"))
        # Save the processed data into a new result table
        df_aggregated.show()
        df_aggregated.write.save_as_table("aggregate_customers", mode="append")
        print(f"Data from {table_name} processed and saved")
    except Exception as e:
        print(f"Error processing {table_name}: {e}")
# Using ThreadPoolExecutor to handle concurrency
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks for each customer table
    executor.map(process_customer_table, customers)
# Display the results from the aggregate table
session.table("aggregate_customers").show()
スレッドセーフ・セッション・オブジェクトを使うことの限界¶
- 複数のトランザクションを同時に管理する必要がある場合は、複数のセッションオブジェクトを使用することが重要です。なぜなら、1つのセッションの複数のスレッドは、同時実行トランザクションをサポートしていないからです。 
- 他のスレッドがアクティブな状態でセッションのランタイム構成(データベース、スキーマ、ウェアハウスなどの Snowflake セッション変数、cte_optimization_enabled、sql_simplifier_enabled などのクライアント側の構成を含む)を変更すると、予期しない動作が発生する可能性があります。競合を避けるために、異なるスレッドが異なる構成を必要とする場合は、別々のセッションオブジェクトを使うのが最善です。例えば、異なるデータベースに対して並列にオペレーションを実行する必要がある場合、同じセッションを共有するのではなく、各スレッドが独自のセッションオブジェクトを持つようにしてください。 
DataFrameのコンテンツをPandas DataFrameとして返す¶
DataFrameのコンテンツをPandas DataFrameとして返すには、 to_pandas メソッドを使用します。
例:
python_df = session.create_dataframe(["a", "b", "c"])
pandas_df = python_df.to_pandas()
Snowpark DataFrames vs Snowpark pandas DataFrame: どちらを選ぶべきでしょうか?¶
Snowpark Pythonライブラリをインストールすることで、 DataFrames API または pandas on Snowflake を使用するオプションがあります。
Snowpark DataFrames は PySpark をモデルにしている一方で、Snowpark pandas は Snowpark DataFrame の機能を拡張し、pandasユーザーに対して慣れ親しんだインターフェイスを提供して、移行や採用を容易にすることを目的としています。用途や好みに応じて、 APIs を使い分けることをお勧めします:
| 次の場合はSnowpark pandas をご使用ください: | 次の場合はSnowpark DataFrames をご使用ください: | 
|---|---|
| pandasで書かれたコードを使用しているか、既存のコードがあることを好む。 | Sparkで書かれたコードを使用しているか、既存のコードがあることを好む。 | 
| インタラクティブな分析と反復的な探索を含むワークフローを持つ。 | バッチ処理と限定的な反復開発を含むワークフローを持つ。 | 
| 即座に実行される DataFrame の操作に慣れている | DataFrame の遅延評価される操作に慣れている | 
| オペレーション中、データが一貫性を持ち、秩序立っていることを好む。 | データが注文されなくても構わない | 
| Snowpark DataFrames に比べて、 API の使いやすさを優先するため、多少性能が遅くても構わない | 使いやすさよりも性能を重視する。 | 
実装の観点からは、Snowpark DataFrames とpandas DataFrames は意味的に異なります。Snowpark DataFrames は、 PySpark をモデルにしているので、オリジナルのデータソースを操作し、最新の更新データを取得し、操作の順序は維持しません。Snowpark pandas はpandasをモデルにしています。データのスナップショットを操作し、操作中の順序を維持し、順序ベースの位置インデックスを作成できます。順序の維持は、インタラクティブなデータ分析でデータを視覚的に検査するのに便利である。
詳細については、 Using pandas on Snowflake with Snowpark DataFrames をご参照ください。