Snowpark PythonでのDataFramesの使用¶
Snowpark内でデータをクエリして処理する主な方法は、 DataFrame を使用することです。このトピックでは、 DataFrames の操作方法について説明します。
このトピックの内容:
データを取得して操作するには、 DataFrame
クラスを使用します。DataFrame は、遅延評価されるリレーショナルデータセットを表します。これは、特定のアクションがトリガーされたときにのみ実行されます。ある意味で、 DataFrame は、データを取得するために評価する必要があるクエリのようなものです。
データを DataFrame に取得するには、
DataFrame を作成し、データセットのためにデータのソースを指定 します。
例えば、テーブル、外部CSVファイル、ローカルデータ、またはSQLステートメントの実行からのデータを保持するDataFrameを作成できます。
DataFrame のデータセットを変換する方法を指定 します。
たとえば、どの列を選択するか、行をどのようにフィルタリングするか、結果をどのようにソートおよびグループ化するかなどを指定できます。
ステートメントを実行して、データをDataFrame に取得 します。
データを DataFrame に取得するには、アクションを実行するメソッド(例:
collect()
メソッド)を呼び出す必要があります。
次のセクションでは、これらのステップについて詳しく説明します。
このセクションの例の設定¶
このセクションの例のいくつかは、 DataFrame を使用して sample_product_data
という名前のテーブルをクエリします。これらの例を実行する場合は、次の SQL ステートメントを実行することにより、このテーブルを作成し、テーブルにデータを入力できます。
>>> session.sql('CREATE OR REPLACE TABLE sample_product_data (id INT, parent_id INT, category_id INT, name VARCHAR, serial_number VARCHAR, key INT, "3rd" INT)').collect()
[Row(status='Table SAMPLE_PRODUCT_DATA successfully created.')]
>>> session.sql("""
... INSERT INTO sample_product_data VALUES
... (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
... (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
... (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
... (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
... (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
... (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
... (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
... (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
... (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
... (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
... (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
... (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100)
... """).collect()
[Row(number of rows inserted=12)]
テーブルが作成されたことを確認するには、次のコマンドを実行します。
>>> session.sql("SELECT count(*) FROM sample_product_data").collect()
[Row(COUNT(*)=12)]
DataFrame の構築¶
DataFrameを作成するには、 Session
クラスのメソッドとプロパティを使用できます。次の各メソッドは、異なるタイプのデータソースから DataFrame を作成します。
テーブル、ビュー、またはストリームのデータから DataFrame を作成するには、
table
メソッドを呼び出します。>>> # Create a DataFrame from the data in the "sample_product_data" table. >>> df_table = session.table("sample_product_data") # To print out the first 10 rows, call df_table.show()
指定された値から DataFrame を作成するには、
create_dataframe
メソッドを呼び出します。>>> # Create a DataFrame from specified values. >>> df1 = session.create_dataframe([1, 2, 3, 4]).to_df("a") # one column, named "a" >>> df1.show() ------- |"A" | ------- |1 | |2 | |3 | |4 | ------- >>> # Create a DataFrame with 4 columns, "a", "b", "c" and "d". >>> df2 = session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"]) >>> df2.show() ------------------------- |"A" |"B" |"C" |"D" | ------------------------- |1 |2 |3 |4 | ------------------------- >>> # Create another DataFrame with 4 columns, "a", "b", "c" and "d". >>> from snowflake.snowpark import Row >>> df3 = session.create_dataframe([Row(a=1, b=2, c=3, d=4)]) >>> df3.show() ------------------------- |"A" |"B" |"C" |"D" | ------------------------- |1 |2 |3 |4 | ------------------------- >>> # Create a DataFrame and specify a schema >>> from snowflake.snowpark.types import IntegerType, StringType, StructType, StructField >>> schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())]) >>> df4 = session.create_dataframe([[1, "snow"], [3, "flake"]], schema) >>> df4.show() --------------- |"A" |"B" | --------------- |1 |snow | |3 |flake | ---------------
値の範囲を含む DataFrame を作成するには、
range
メソッドを呼び出します。>>> # Create a DataFrame from a range >>> # The dataframe will contain rows with values 1, 3, 5, 7, and 9 respectively. >>> df_range = session.range(1, 10, 2).to_df("a") >>> df_range.show() ------- |"A" | ------- |1 | |3 | |5 | |7 | |9 | -------
ステージ内のファイルからのデータを保持するDataFrameを作成するには、
read
プロパティを使用して、DataFrameReader
オブジェクトを取得します。DataFrameReader
オブジェクトで、ファイル内のデータの形式に対応するメソッドを呼び出します。>>> from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType >>> # Create DataFrames from data in a stage. >>> df_json = session.read.json("@my_stage2/data1.json") >>> df_catalog = session.read.schema(StructType([StructField("name", StringType()), StructField("age", IntegerType())])).csv("@stage/some_dir")
SQL クエリの結果を保持する DataFrame を作成するには、
sql
メソッドを呼び出します。>>> # Create a DataFrame from a SQL query >>> df_sql = session.sql("SELECT name from sample_product_data") >>> df_sql.show() -------------- |"NAME" | -------------- |Product 1 | |Product 1A | |Product 1B | |Product 2 | |Product 2A | |Product 2B | |Product 3 | |Product 3A | |Product 3B | |Product 4 | --------------
このメソッドを使用して、テーブルとステージングされたファイルからデータを取得するSELECTステートメントを実行できますが、代わりに
table
メソッドとread
プロパティを使用する必要があります。これは、開発ツールでより優れた構文の強調表示、エラーの強調表示、およびコード補完を提供します。
データセットの変換方法の指定¶
選択する列と、結果のフィルタリング、並べ替え、グループ化などを指定するには、データセットを変換する DataFrame メソッドを呼び出します。これらのメソッドで列を識別するには、 col
関数または列に評価される式を使用します。(列と式の指定 を参照。)
例:
返される行を指定するには、
filter
メソッドを呼び出します。>>> # Import the col function from the functions module. >>> from snowflake.snowpark.functions import col >>> # Create a DataFrame for the rows with the ID 1 >>> # in the "sample_product_data" table. >>> # This example uses the == operator of the Column object to perform an >>> # equality check. >>> df = session.table("sample_product_data").filter(col("id") == 1) >>> df.show() ------------------------------------------------------------------------------------ |"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" | ------------------------------------------------------------------------------------ |1 |0 |5 |Product 1 |prod-1 |1 |10 | ------------------------------------------------------------------------------------
選択する列を指定するには、
select
メソッドを呼び出します。>>> # Import the col function from the functions module. >>> from snowflake.snowpark.functions import col >>> # Create a DataFrame that contains the id, name, and serial_number >>> # columns in the "sample_product_data" table. >>> df = session.table("sample_product_data").select(col("id"), col("name"), col("serial_number")) >>> df.show() --------------------------------------- |"ID" |"NAME" |"SERIAL_NUMBER" | --------------------------------------- |1 |Product 1 |prod-1 | |2 |Product 1A |prod-1-A | |3 |Product 1B |prod-1-B | |4 |Product 2 |prod-2 | |5 |Product 2A |prod-2-A | |6 |Product 2B |prod-2-B | |7 |Product 3 |prod-3 | |8 |Product 3A |prod-3-A | |9 |Product 3B |prod-3-B | |10 |Product 4 |prod-4 | ---------------------------------------
次のような列を参照することもできます。
>>> # Import the col function from the functions module. >>> from snowflake.snowpark.functions import col >>> df_product_info = session.table("sample_product_data") >>> df1 = df_product_info.select(df_product_info["id"], df_product_info["name"], df_product_info["serial_number"]) >>> df2 = df_product_info.select(df_product_info.id, df_product_info.name, df_product_info.serial_number) >>> df3 = df_product_info.select("id", "name", "serial_number")
各メソッドは、変換された新しい DataFrame オブジェクトを返します。(このメソッドは元の DataFrame オブジェクトには影響を及ぼしません。)つまり、複数の変換を適用する場合は、 チェーンメソッド呼び出し を実行して、前のメソッド呼び出しによって返された、新しい DataFrame オブジェクトに対する後続の各変換メソッドを呼び出すことができます。
これらの変換方法は、Snowflakeデータベースからデータを取得しないことに注意してください。(DataFrame を評価するアクションの実行 で説明されているアクションメソッドは、データ取得を実行します。)変換メソッドは、 SQL ステートメントの作成方法を指定するだけです。
DataFrames の結合¶
DataFrame オブジェクトを結合するには、 join
メソッドを呼び出します。
>>> # Create two DataFrames to join >>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"]) >>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"]) >>> # Create a DataFrame that joins the two DataFrames >>> # on the column named "key". >>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].as_("key"), "value1", "value2").show() ------------------------------- |"KEY" |"VALUE1" |"VALUE2" | ------------------------------- |a |1 |3 | |b |2 |4 | ------------------------------- >>> # Both dataframes have the same column "key", the following is more convenient. >>> df_lhs.join(df_rhs, ["key"]).show() ------------------------------- |"KEY" |"VALUE1" |"VALUE2" | ------------------------------- |a |1 |3 | |b |2 |4 | ------------------------------- >>> # Use & operator connect join expression. '|' and ~ are similar. >>> df_joined_multi_column = df_lhs.join(df_rhs, (df_lhs.col("key") == df_rhs.col("key")) & (df_lhs.col("value1") < df_rhs.col("value2"))).select(df_lhs["key"].as_("key"), "value1", "value2") >>> df_joined_multi_column.show() ------------------------------- |"KEY" |"VALUE1" |"VALUE2" | ------------------------------- |a |1 |3 | |b |2 |4 | ------------------------------- >>> # copy the DataFrame if you want to do a self-join >>> from copy import copy >>> df_lhs_copied = copy(df_lhs) >>> df_self_joined = df_lhs.join(df_lhs_copied, (df_lhs.col("key") == df_lhs_copied.col("key")) & (df_lhs.col("value1") == df_lhs_copied.col("value1")))
データフレームに重複する列がある場合、Snowparkはランダムに生成されたプレフィックスを結合結果の列に追加することに注意してください。
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).show() ----------------------------------------------------- |"l_av5t_KEY" |"VALUE1" |"r_1p6k_KEY" |"VALUE2" | ----------------------------------------------------- |a |1 |a |3 | |b |2 |b |4 | -----------------------------------------------------
Column.alias
を使用して重複する列を参照できます。
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].alias("key1"), df_rhs["key"].alias("key2"), "value1", "value2").show() ----------------------------------------- |"KEY1" |"KEY2" |"VALUE1" |"VALUE2" | ----------------------------------------- |a |a |1 |3 | |b |b |2 |4 | -----------------------------------------
ランダムなプレフィックスを避けるために、重複する列に追加するサフィックスを指定できます。
>>> df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key"), lsuffix="_left", rsuffix="_right").show() -------------------------------------------------- |"KEY_LEFT" |"VALUE1" |"KEY_RIGHT" |"VALUE2" | -------------------------------------------------- |a |1 |a |3 | |b |2 |b |4 | --------------------------------------------------
これらの例では、 DataFrame.col
を使用して、結合で使用する列を指定しています。これを実行するその他の方法については、 列と式の指定 をご参照ください。
異なる列でテーブルをそれ自体と結合する必要がある場合は、単一の DataFrame で自己結合を実行することはできません。単一の DataFrame を使用して自己結合を実行する次の例は、 "id"
の列式が結合の左側と右側に存在するため、失敗します。
>>> from snowflake.snowpark.exceptions import SnowparkJoinException >>> df = session.table("sample_product_data") >>> # This fails because columns named "id" and "parent_id" >>> # are in the left and right DataFrames in the join. >>> try: ... df_joined = df.join(df, col("id") == col("parent_id")) # fails ... except SnowparkJoinException as e: ... print(e.message) You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy. >>> # This fails because columns named "id" and "parent_id" >>> # are in the left and right DataFrames in the join. >>> try: ... df_joined = df.join(df, df["id"] == df["parent_id"]) # fails ... except SnowparkJoinException as e: ... print(e.message) You cannot join a DataFrame with itself because the column references cannot be resolved correctly. Instead, create a copy of the DataFrame with copy.copy(), and join the DataFrame with this copy.
代わりに、Pythonのビルトイン copy()
メソッドを使用してDataFrameオブジェクトのクローンを作成し、2つのDataFrameオブジェクトを使用して結合を実行します。
>>> from copy import copy >>> # Create a DataFrame object for the "sample_product_data" table for the left-hand side of the join. >>> df_lhs = session.table("sample_product_data") >>> # Clone the DataFrame object to use as the right-hand side of the join. >>> df_rhs = copy(df_lhs) >>> # Create a DataFrame that joins the two DataFrames >>> # for the "sample_product_data" table on the >>> # "id" and "parent_id" columns. >>> df_joined = df_lhs.join(df_rhs, df_lhs.col("id") == df_rhs.col("parent_id")) >>> df_joined.count() 8
列と式の指定¶
これらの変換メソッドを呼び出すときは、列または列を使用する式を指定する必要がある場合があります。たとえば、 select
メソッドを呼び出すときは、選択する必要のある列を指定する必要があります。
列を参照するには、 snowflake.snowpark.functions
モジュールで col
関数を呼び出して、 Column
オブジェクトを作成します。
>>> # Import the col function from the functions module.
>>> from snowflake.snowpark.functions import col
>>> df_product_info = session.table("sample_product_data").select(col("id"), col("name"))
>>> df_product_info.show()
---------------------
|"ID" |"NAME" |
---------------------
|1 |Product 1 |
|2 |Product 1A |
|3 |Product 1B |
|4 |Product 2 |
|5 |Product 2A |
|6 |Product 2B |
|7 |Product 3 |
|8 |Product 3A |
|9 |Product 3B |
|10 |Product 4 |
---------------------
注釈
リテラルの Column
オブジェクトを作成するには、 列オブジェクトとしてのリテラルの使用 をご参照ください。
フィルター、プロジェクション、結合条件などを指定する場合は、式で Column
オブジェクトを使用できます。例:
filter
メソッドでColumn
オブジェクトを使用して、フィルター条件を指定できます。>>> # Specify the equivalent of "WHERE id = 20" >>> # in a SQL SELECT statement. >>> df_filtered = df.filter(col("id") == 20)
>>> df = session.create_dataframe([[1, 3], [2, 10]], schema=["a", "b"]) >>> # Specify the equivalent of "WHERE a + b < 10" >>> # in a SQL SELECT statement. >>> df_filtered = df.filter((col("a") + col("b")) < 10) >>> df_filtered.show() ------------- |"A" |"B" | ------------- |1 |3 | -------------
select
メソッドでColumn
オブジェクトを使用して、エイリアスを定義できます。>>> # Specify the equivalent of "SELECT b * 10 AS c" >>> # in a SQL SELECT statement. >>> df_selected = df.select((col("b") * 10).as_("c")) >>> df_selected.show() ------- |"C" | ------- |30 | |100 | -------
join
メソッドでColumn
オブジェクトを使用して、結合条件を定義できます。>>> dfX = session.create_dataframe([[1], [2]], schema=["a_in_X"]) >>> dfY = session.create_dataframe([[1], [3]], schema=["b_in_Y"]) >>> # Specify the equivalent of "X JOIN Y on X.a_in_X = Y.b_in_Y" >>> # in a SQL SELECT statement. >>> df_joined = dfX.join(dfY, col("a_in_X") == col("b_in_Y")).select(dfX["a_in_X"].alias("the_joined_column")) >>> df_joined.show() ----------------------- |"THE_JOINED_COLUMN" | ----------------------- |1 | -----------------------
同じ名前の2つの異なる DataFrame オブジェクトの列を参照する場合(例: その列の DataFrames を結合する場合)、1つの DataFrame オブジェクトにある DataFrame.col
メソッドを使用して、そのオブジェクトにある列(例: df1.col("name")
と df2.col("name")
)を参照できます。
次の例は、 DataFrame.col
メソッドを使用して特定の DataFrame の列を参照する方法を示しています。この例では、両方とも key
という名前の列を持つ2つの DataFrame オブジェクトを結合します。この例では、 Column.as
メソッドを使用して、新しく作成された DataFrame の列の名前を変更します。
>>> # Create two DataFrames to join
>>> df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value"])
>>> df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value"])
>>> # Create a DataFrame that joins two other DataFrames (df_lhs and df_rhs).
>>> # Use the DataFrame.col method to refer to the columns used in the join.
>>> df_joined = df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs.col("key").as_("key"), df_lhs.col("value").as_("L"), df_rhs.col("value").as_("R"))
>>> df_joined.show()
---------------------
|"KEY" |"L" |"R" |
---------------------
|a |1 |3 |
|b |2 |4 |
---------------------
オブジェクト識別子(テーブル名、列名など)の前後での二重引用符の使用¶
指定するデータベース、スキーマ、テーブル、およびステージの名前は、 Snowflake識別子の要件 に準拠している必要があります。
大文字と小文字を区別する列を持つテーブルを作成します。
>>> session.sql("""
... create or replace temp table "10tablename"(
... id123 varchar, -- case insensitive because it's not quoted.
... "3rdID" varchar, -- case sensitive.
... "id with space" varchar -- case sensitive.
... )""").collect()
[Row(status='Table 10tablename successfully created.')]
>>> session.sql("""insert into "10tablename" (id123, "3rdID", "id with space") values ('a', 'b', 'c')""").collect()
[Row(number of rows inserted=1)]
>>> df = session.table('"10tablename"')
>>> df.show()
---------------------------------------
|"ID123" |"3rdID" |"id with space" |
---------------------------------------
|a |b |c |
---------------------------------------
名前を指定すると、Snowflakeはその名前を大文字と見なします。たとえば、次の呼び出しは同等です。
>>> # The following calls are equivalent:
>>> df.select(col("id123")).collect()
[Row(ID123='a')]
>>> df.select(col("ID123")).collect()
[Row(ID123='a')]
名前が識別子の要件に準拠していない場合は、名前を二重引用符("
)で囲む必要があります。バックスラッシュ(\
)を使用して、文字列リテラル内の二重引用符をエスケープします。たとえば、次のテーブル名は文字やアンダースコアで始まらないため、名前を二重引用符で囲む必要があります。
>>> df = session.table("\"10tablename\"")
または、バックスラッシュの代わりに一重引用符を使用して、文字列リテラル内の二重引用符をエスケープすることもできます。
>>> df = session.table('"10tablename"')
列 の名前を指定するときは、名前を二重引用符で囲む必要はありません。名前が識別子の要件に準拠していない場合、Snowparkライブラリは、列名を自動的に二重引用符で囲みます。
>>> # The following calls are equivalent:
>>> df.select(col("3rdID")).collect()
[Row(3rdID='b')]
>>> df.select(col("\"3rdID\"")).collect()
[Row(3rdID='b')]
>>> # The following calls are equivalent:
>>> df.select(col("id with space")).collect()
[Row(id with space='c')]
>>> df.select(col("\"id with space\"")).collect()
[Row(id with space='c')]
すでに列名を二重引用符で囲んでいる場合、ライブラリは名前を二重引用符で囲みません。
場合によっては、列名に二重引用符が含まれることがあります。
>>> session.sql('''
... create or replace temp table quoted(
... "name_with_""air""_quotes" varchar,
... """column_name_quoted""" varchar
... )''').collect()
[Row(status='Table QUOTED successfully created.')]
>>> session.sql('''insert into quoted ("name_with_""air""_quotes", """column_name_quoted""") values ('a', 'b')''').collect()
[Row(number of rows inserted=1)]
識別子の要件 で説明されているように、二重引用符で囲まれた識別子内の二重引用符文字ごとに、2つの二重引用符文字(例: "name_with_""air""_quotes"
と """column_name_quoted"""
)を使用する必要があります。
>>> df_table = session.table("quoted")
>>> df_table.select("\"name_with_\"\"air\"\"_quotes\"").collect()
[Row(name_with_"air"_quotes='a')]
>>> df_table.select("\"\"\"column_name_quoted\"\"\"").collect()
[Row("column_name_quoted"='b')]
識別子が二重引用符で囲まれている場合は(明示的に引用符を追加したか、ライブラリが引用符を追加したかに関係なく)、 Snowflakeは識別子を大文字と小文字を区別する ものとして扱います。
>>> # The following calls are NOT equivalent!
>>> # The Snowpark library adds double quotes around the column name,
>>> # which makes Snowflake treat the column name as case-sensitive.
>>> df.select(col("id with space")).collect()
[Row(id with space='c')]
>>> from snowflake.snowpark.exceptions import SnowparkSQLException
>>> try:
... df.select(col("ID WITH SPACE")).collect()
... except SnowparkSQLException as e:
... print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 7
invalid identifier '"ID WITH SPACE"'
列オブジェクトとしてのリテラルの使用¶
Column
オブジェクトを引数として取るメソッドでリテラルを使用するには、リテラルを snowflake.snowpark.functions
モジュールの lit
関数に渡して、リテラルの Column
オブジェクトを作成します。例:
>>> # Import for the lit and col functions.
>>> from snowflake.snowpark.functions import col, lit
>>> # Show the first 10 rows in which num_items is greater than 5.
>>> # Use `lit(5)` to create a Column object for the literal 5.
>>> df_filtered = df.filter(col("num_items") > lit(5))
列オブジェクトの特定の型へのキャスト¶
Column
オブジェクトを特定の型にキャストするには、 cast
メソッドを呼び出し、 snowflake.snowpark.types
モジュールから型オブジェクトを渡します。たとえば、リテラルを5の精度と2のスケールで NUMBER としてキャストするには、次のようにします。
>>> # Import for the lit function.
>>> from snowflake.snowpark.functions import lit
>>> # Import for the DecimalType class.
>>> from snowflake.snowpark.types import DecimalType
>>> decimal_value = lit(0.05).cast(DecimalType(5,2))
メソッド呼び出しのチェーン¶
DataFrame オブジェクトを変換するメソッド ごとに、変換が適用された新しい DataFrame オブジェクトが返されるため、 メソッド呼び出しのチェーン により、追加の方法で変換される新しい DataFrame を生成できます。
次の例では、次のように構成された DataFrame を返します。
sample_product_data
テーブルをクエリします。id = 1
で行を返します。name
およびserial_number
列を選択します。>>> df_product_info = session.table("sample_product_data").filter(col("id") == 1).select(col("name"), col("serial_number")) >>> df_product_info.show() ------------------------------- |"NAME" |"SERIAL_NUMBER" | ------------------------------- |Product 1 |prod-1 | -------------------------------
この例では、
session.table("sample_product_data")
は、sample_product_data
テーブルの DataFrame を返します。DataFrame にはまだテーブルのデータが含まれていませんが、オブジェクトにはテーブルの列の定義が含まれています。
filter(col("id") == 1)
は、id = 1
で行を返すように設定されたsample_product_data
テーブルの DataFrame を返します。DataFrameには、テーブルからの一致する行がまだ含まれていないことに改めて注意してください。 アクションメソッドを呼び出す まで、一致する行は取得されません。
select(col("name"), col("serial_number"))
は、id = 1
を持つsample_product_data
テーブルの行のname
列とserial_number
列を含む DataFrame を返します。
メソッド呼び出しをチェーンするときは、呼び出しの順序が重要であることに注意してください。各メソッド呼び出しは、変換された DataFrame を返します。後続の呼び出しが変換された DataFrame で機能することを確認します。
たとえば、次のコードでは、 select
メソッドは name
と serial_number
の2つの列のみを含む DataFrame を返します。この DataFrame の filter
メソッド呼び出しは、変換された DataFrame にない id
列を使用しているため失敗します。
>>> # This fails with the error "invalid identifier 'ID'."
>>> df_product_info = session.table("sample_product_data").select(col("name"), col("serial_number")).filter(col("id") == 1)
>>> try:
... df_product_info.show()
... except SnowparkSQLException as e:
... print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 121
invalid identifier 'ID'
対照的に、次のコードは、 sample_product_data
テーブルのすべての列(id
列を含む)を含む DataFrame で filter()
メソッドが呼び出されるため、正常に実行されます。
>>> # This succeeds because the DataFrame returned by the table() method
>>> # includes the "id" column.
>>> df_product_info = session.table("sample_product_data").filter(col("id") == 1).select(col("name"), col("serial_number"))
>>> df_product_info.show()
-------------------------------
|"NAME" |"SERIAL_NUMBER" |
-------------------------------
|Product 1 |prod-1 |
-------------------------------
SQL ステートメントで同等のキーワード(SELECT および WHERE)を使用する場合とは異なる順序で、 select
および filter
メソッドの呼び出しが必要になる場合があることに注意してください。
列定義の取得¶
DataFrameのためにデータセット内の列の定義を取得するには、 schema
プロパティを呼び出します。このメソッドは、 StructField
オブジェクトの list
を含む StructType
オブジェクトを返します。各 StructField
オブジェクトには、列の定義が含まれています。
>>> # Get the StructType object that describes the columns in the
>>> # underlying rowset.
>>> table_schema = session.table("sample_product_data").schema
>>> table_schema
StructType([StructField('ID', LongType(), nullable=True), StructField('PARENT_ID', LongType(), nullable=True), StructField('CATEGORY_ID', LongType(), nullable=True), StructField('NAME', StringType(), nullable=True), StructField('SERIAL_NUMBER', StringType(), nullable=True), StructField('KEY', LongType(), nullable=True), StructField('"3rd"', LongType(), nullable=True)])
返された StructType
オブジェクトでは、列名は常に正規化されています。引用符で囲まれていない識別子は大文字で返され、引用符で囲まれた識別子は定義された正確な大文字小文字で返されます。
次の例では、 ID
および 3rd
という名前の列を含む DataFrame を作成します。列名 3rd
の場合は、 名前が識別子の要件に準拠していない ため、Snowparkライブラリは名前を自動的に二重引用符("3rd"
)で囲みます。
この例では、 schema
プロパティを呼び出してから、返された StructType
オブジェクトで names
プロパティを呼び出して、列名の list
を取得します。名前は、 schema
プロパティによって返される StructType
で正規化されます。
>>> # Create a DataFrame containing the "id" and "3rd" columns.
>>> df_selected_columns = session.table("sample_product_data").select(col("id"), col("3rd"))
>>> # Print out the names of the columns in the schema. This prints out:
>>> # This prints List["ID", "\"3rd\""]
>>> df_selected_columns.schema.names
['ID', '"3rd"']
DataFrame を評価するアクションの実行¶
前述のように、 DataFrame は遅延評価されます。つまり、アクションを実行するまで、 SQL ステートメントは実行のためにサーバーに送信されません。アクションにより、 DataFrame が評価され、対応する SQL ステートメントが実行のためにサーバーに送信されます。
このリリースでは、次のメソッドがアクションを実行します。
クラス |
メソッド |
説明 |
---|---|---|
|
|
DataFrame を評価し、結果のデータセットを |
|
|
DataFrame を評価し、行数を返します。 |
|
|
DataFrame を評価し、行をコンソールに出力します。この方法では、行数が10に制限されることに注意してください(デフォルト)。 |
|
|
DataFrame のデータを指定したテーブルに保存します。 テーブルへのデータの保存 をご参照ください。 |
たとえば、テーブルに対してクエリを実行して結果を返すには、 collect
メソッドを呼び出します。
>>> # Create a DataFrame with the "id" and "name" columns from the "sample_product_data" table.
>>> # This does not execute the query.
>>> df = session.table("sample_product_data").select(col("id"), col("name"))
>>> # Send the query to the server for execution and
>>> # return a list of Rows containing the results.
>>> results = df.collect()
クエリを実行して結果の数を返すには、 count
メソッドを呼び出します。
>>> # Create a DataFrame for the "sample_product_data" table.
>>> df_products = session.table("sample_product_data")
>>> # Send the query to the server for execution and
>>> # print the count of rows in the table.
>>> print(df_products.count())
12
クエリを実行して結果をコンソールに出力するには、 show
メソッドを呼び出します。
>>> # Create a DataFrame for the "sample_product_data" table.
>>> df_products = session.table("sample_product_data")
>>> # Send the query to the server for execution and
>>> # print the results to the console.
>>> # The query limits the number of rows to 10 by default.
>>> df_products.show()
-------------------------------------------------------------------------------------
|"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" |
-------------------------------------------------------------------------------------
|1 |0 |5 |Product 1 |prod-1 |1 |10 |
|2 |1 |5 |Product 1A |prod-1-A |1 |20 |
|3 |1 |5 |Product 1B |prod-1-B |1 |30 |
|4 |0 |10 |Product 2 |prod-2 |2 |40 |
|5 |4 |10 |Product 2A |prod-2-A |2 |50 |
|6 |4 |10 |Product 2B |prod-2-B |2 |60 |
|7 |0 |20 |Product 3 |prod-3 |3 |70 |
|8 |7 |20 |Product 3A |prod-3-A |3 |80 |
|9 |7 |20 |Product 3B |prod-3-B |3 |90 |
|10 |0 |50 |Product 4 |prod-4 |4 |100 |
-------------------------------------------------------------------------------------
>>> # Limit the number of rows to 20, rather than 10.
>>> df_products.show(20)
-------------------------------------------------------------------------------------
|"ID" |"PARENT_ID" |"CATEGORY_ID" |"NAME" |"SERIAL_NUMBER" |"KEY" |"3rd" |
-------------------------------------------------------------------------------------
|1 |0 |5 |Product 1 |prod-1 |1 |10 |
|2 |1 |5 |Product 1A |prod-1-A |1 |20 |
|3 |1 |5 |Product 1B |prod-1-B |1 |30 |
|4 |0 |10 |Product 2 |prod-2 |2 |40 |
|5 |4 |10 |Product 2A |prod-2-A |2 |50 |
|6 |4 |10 |Product 2B |prod-2-B |2 |60 |
|7 |0 |20 |Product 3 |prod-3 |3 |70 |
|8 |7 |20 |Product 3A |prod-3-A |3 |80 |
|9 |7 |20 |Product 3B |prod-3-B |3 |90 |
|10 |0 |50 |Product 4 |prod-4 |4 |100 |
|11 |10 |50 |Product 4A |prod-4-A |4 |100 |
|12 |10 |50 |Product 4B |prod-4-B |4 |100 |
-------------------------------------------------------------------------------------
注: DataFrameの列の定義を取得するために schema
プロパティを呼び出す場合は、アクションメソッドを呼び出す必要はありません。
DataFrameのコンテンツをPandas DataFrameとして返す¶
DataFrameのコンテンツをPandas DataFrameとして返すには、 to_pandas
メソッドを使用します。
例:
>>> python_df = session.create_dataframe(["a", "b", "c"])
>>> pandas_df = python_df.to_pandas()
テーブルへのデータの保存¶
DataFrame の内容をテーブルに保存するには、
write
プロパティを呼び出して、DataFrameWriter
オブジェクトを取得します。DataFrameWriter
オブジェクトのmode
メソッドを呼び出し、テーブルの行を挿入するか更新するかを指定します。このメソッドは、指定されたモードで構成された新しいDataFrameWriter
オブジェクトを返します。DataFrameWriter
オブジェクトのsave_as_table
メソッドを呼び出し、 DataFrame の内容を指定されたテーブルに保存します。
データをテーブルに保存する SQL ステートメントを実行するために、別のメソッド(例: collect
)を呼び出す必要はありません。
例:
>>> df.write.mode("overwrite").save_as_table("table1")
DataFrame からのビューの作成¶
DataFrameからビューを作成するには、 create_or_replace_view
メソッドを呼び出します。これにより新しいビューがすぐに作成されます。
>>> import os
>>> database = os.environ["snowflake_database"] # use your own database and schema
>>> schema = os.environ["snowflake_schema"]
>>> view_name = "my_view"
>>> df.create_or_replace_view(f"{database}.{schema}.{view_name}")
[Row(status='View MY_VIEW successfully created.')]
create_or_replace_view
を呼び出して作成したビューは永続的です。そのビューが不要になった場合は、 ビューを手動でドロップ できます。
または、仮ビューを作成する create_or_replace_temp_view
メソッドを使用します。仮ビューは、それが作成されたセッションでのみ使用できます。
ステージでのファイルの操作¶
このセクションでは、Snowflakeステージでファイル内のデータをクエリする方法について説明します。ファイルに対するその他の操作には、 SQL ステートメント を使用します。
Snowflakeステージのファイル内のデータをクエリするには、 DataFrameReader
クラスを使用します。
Session
クラスのread
メソッドを呼び出して、DataFrameReader
オブジェクトにアクセスします。ファイルが CSV 形式の場合は、ファイルのフィールドを記述します。これを実行するには、
ファイル内のフィールドを説明する
StructType
オブジェクトのlist
で構成されるStructField
オブジェクトを作成します。StructField
オブジェクトごとに、以下を指定します。フィールドの名前。
フィールドのデータ型(
snowflake.snowpark.types
モジュールでオブジェクトとして指定)。フィールドがNULL可能かどうか。
例:
>>> from snowflake.snowpark.types import * >>> schema_for_data_file = StructType([ ... StructField("id", StringType()), ... StructField("name", StringType()) ... ])
DataFrameReader
オブジェクトでschema
プロパティを呼び出し、StructType
オブジェクトを渡します。例:
>>> df_reader = session.read.schema(schema_for_data_file)
schema
プロパティは、指定されたフィールドを含むファイルを読み取るように構成されたDataFrameReader
オブジェクトを返します。他の形式(JSON など)のファイルでは、これを実行する必要はありません。これらのファイルの場合、
DataFrameReader
は、データをフィールド名$1
の VARIANT 型の単一フィールドとして扱います。
データの読み取り方法に関する追加情報を指定する必要がある(例: データが圧縮されている、またはCSVファイルがフィールドを区切るためにコンマではなくセミコロンを使用している)場合は、
DataFrameReader
オブジェクトのoption
またはoptions
メソッドを呼び出します。option
メソッドは、設定するオプションの名前と値をとり、複数のチェーン呼び出しを組み合わせることができますが、options
メソッドは、オプションの名前とそれに対応する値のディクショナリをとります。ファイル形式オプションの名前と値については、 CREATE FILE FORMAT のドキュメント をご参照ください。
COPY INTO TABLE ドキュメント で説明されているコピーオプションを設定することもできます。コピーオプションを設定すると、 DataFrame にデータを取得 するときに、よりコストのかかる実行戦略の実行される可能性があることに注意してください。
次の例では、圧縮されておらず、フィールド区切り文字にセミコロンを使用する CSV ファイル内のデータをクエリするように、
DataFrameReader
オブジェクトを設定します。>>> df_reader = df_reader.option("field_delimiter", ";").option("COMPRESSION", "NONE")
option
およびoptions
メソッドは、指定されたオプションで構成されたDataFrameReader
オブジェクトを返します。ファイルの形式に対応するメソッド(例:
csv
メソッド)を呼び出し、ファイルの場所を渡します。>>> df = df_reader.csv("@s3_ts_stage/emails/data_0_0_0.csv")
ファイルの形式に対応するメソッドは、そのファイルにデータを保持するように構成された DataFrame オブジェクトを返します。
DataFrame オブジェクトメソッドを使用して、データセットで必要な 変換を実行 します(例: 特定のフィールドの選択、行のフィルタリングなど)。
たとえば、
my_stage
という名前のステージで JSON ファイルからcolor
要素を抽出するには、>>> # Import the sql_expr function from the functions module. >>> from snowflake.snowpark.functions import sql_expr >>> df = session.read.json("@my_stage").select(sql_expr("$1:color"))
前に説明したように、 CSV 以外の形式のファイル(例: JSON)の場合、
DataFrameReader
は、ファイル内のデータを$1
という名前の単一の VARIANT 列として扱います。この例では、
snowflake.snowpark.functions
モジュールのsql_expr
関数を使用して、color
要素へのパスを指定します。sql_expr
関数は入力引数を解釈または変更しないことに注意してください。この関数を使用すると、Snowpark API でまだサポートされていない式とスニペットを SQL で作成できます。アクションメソッドの呼び出し により、ファイル内のデータをクエリします。
テーブルの DataFrames の場合と同様、データは、アクションメソッドを呼び出すまで DataFrame に取得されません。
SQL ステートメントの実行¶
指定した SQL ステートメントを実行するには、 Session
クラスの sql
メソッドを呼び出し、実行するステートメントを渡します。メソッドは DataFrame を返します。
アクションメソッドの呼び出し があるまで、 SQL ステートメントは実行されないことに注意してください。
>>> # Get the list of the files in a stage.
>>> # The collect() method causes this SQL statement to be executed.
>>> session.sql("create or replace temp stage my_stage").collect()
[Row(status='Stage area MY_STAGE successfully created.')]
>>> stage_files_df = session.sql("ls @my_stage").collect()
>>> # Resume the operation of a warehouse.
>>> # Note that you must call the collect method in order to execute
>>> # the SQL statement.
>>> session.sql("alter warehouse if exists my_warehouse resume if suspended").collect()
[Row(status='Statement executed successfully.')]
>>> # Set up a SQL statement to copy data from a stage to a table.
>>> session.sql("copy into sample_product_data from @my_stage file_format=(type = csv)").collect()
[Row(status='Copy executed with 0 files processed.')]
メソッドを呼び出してDataFrame を変換する場合(例: filter
、 select
など)、これらのメソッドは、基になるSQLステートメントがSELECTステートメントである場合にのみ機能することに注意してください。変換メソッドは、他の種類の SQL ステートメントではサポートされていません。
>>> df = session.sql("select id, parent_id from sample_product_data where id < 10")
>>> # Because the underlying SQL statement for the DataFrame is a SELECT statement,
>>> # you can call the filter method to transform this DataFrame.
>>> results = df.filter(col("id") < 3).select(col("id")).collect()
>>> # In this example, the underlying SQL statement is not a SELECT statement.
>>> df = session.sql("ls @my_stage")
>>> # Calling the filter method results in an error.
>>> try:
... df.filter(col("size") > 50).collect()
... except SnowparkSQLException as e:
... print(e.message)
000904 (42000): SQL compilation error: error line 1 at position 104
invalid identifier 'SIZE'