pandas on Snowflake¶
pandas on Snowflakeを使用すると、Snowflakeのデータ上でpandasコードを直接分散実行することができます。インポートステートメントと数行のコードを変更するだけで、Snowflakeのスケーラビリティとセキュリティの利点を備えたpandasネイティブのエクスペリエンスを得ることができます。pandas on Snowflakeにより、より大規模なデータセットを扱うことができ、pandasパイプラインを他のビッグデータフレームワークに移植したり、より大規模で高価なマシンを使用したりする時間やコストを避けることができます。SQL へのトランスパイルを通じてワークロードを Snowflake でネイティブに実行し、Snowflake の並列化、データガバナンス、セキュリティの利点を活用できるようにします。pandas on Snowflakeは、 Snowpark Python library の一部として、Snowpark pandas API を通じて提供され、Snowflake プラットフォーム内で Python コードのスケーラブルなデータ処理を可能にします。
pandas on Snowflakeを使うメリット¶
Python開発者の必要に合致 -- pandas on Snowflakeは、Snowflakeでネイティブに動作するpandas互換のレイヤーにより、Python開発者に親しみやすいインターフェイスを提供します。
スケーラブルな分散pandas -- pandas on Snowflakeは、Snowflakeの既存のクエリ最適化技術を活用することで、pandasの利便性とSnowflakeのスケーラビリティを橋渡しします。コードの書き換えは最小限に抑えられ、移行作業が簡素化されるため、プロトタイプから本番稼動までシームレスに移行できます。
セキュリティとガバナンス -- データはSnowflakeのセキュアなプラットフォームから出ることはありません。pandas on Snowflakeは、データ組織内でデータへのアクセス方法を統一し、監査とガバナンスを容易にします。
管理とチューニングに追加のコンピューティングインフラストラクチャは不要 -- pandas on SnowflakeはSnowflakeのパワフルなコンピューティングエンジンを活用するため、追加のコンピューティングインフラストラクチャを設定したり管理したりする必要はありません。
pandas on Snowflakeを使うべき時¶
以下のいずれかに当てはまる場合は、pandas on Snowflakeを使用する必要があります。
あなたはpandas API と、より広い PyData エコシステムに精通しています。
pandasに精通し、同じコードベースで共同作業をしたい人とチームで仕事をします。
pandasで書かれた既存のコードがあります
あなたのワークフローには、pandas DataFrames でサポートされているように、注文に関連するニーズがあります。例えば、ワークフロー全体でデータセットが同じ順序で並んでいる必要があります
AI-ベースのコパイロット・ツールによる、より正確なコード補完を好みます。
pandas on Snowflake入門¶
pandas on Snowflakeをインストールするには、condaまたはpipを使用してパッケージをインストールします。詳細な手順については、 Installation をご参照ください。
pip install "snowflake-snowpark-python[modin]"
pandas on Snowflakeがインストールされたら、pandasを import pandas as pd
としてインポートする代わりに、以下の2行を使用します。
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
ここでは、pandas on Snowpark Python ライブラリを通じて、pandas on Snowflake を使用する方法の一例を紹介します。
import modin.pandas as pd
# Import the Snowpark plugin for modin.
import snowflake.snowpark.modin.plugin
# Create a Snowpark session with a default connection.
from snowflake.snowpark.session import Session
session = Session.builder.create()
# Create a Snowpark pandas DataFrame from existing Snowflake table
df = pd.read_snowflake('SNOWFALL')
# Alternatively, create a Snowpark pandas DataFrame with sample data.
df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
[1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
[1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
columns=["DAY", "LOCATION", "SNOWFALL"])
# Inspect the DataFrame
df
DAY LOCATION SNOWFALL
0 1 Big Bear 8.0
1 2 Big Bear 10.0
2 3 Big Bear NaN
3 1 Tahoe 3.0
4 2 Tahoe NaN
5 3 Tahoe 13.0
6 1 Whistler NaN
7 Friday Whistler 40.0
8 3 Whistler 25.0
# In-place point update to fix data error.
df.loc[df["DAY"]=="Friday","DAY"]=2
# Inspect the columns after update.
# Note how the data type is updated automatically after transformation.
df["DAY"]
0 1
1 2
2 3
3 1
4 2
5 3
6 1
7 2
8 3
Name: DAY, dtype: int64
# Drop rows with null values.
df.dropna()
DAY LOCATION SNOWFALL
0 1 Big Bear 8.0
1 2 Big Bear 10.0
3 1 Tahoe 3.0
5 3 Tahoe 13.0
7 2 Whistler 40.0
8 3 Whistler 25.0
# Compute the average daily snowfall across locations.
df.groupby("LOCATION").mean()["SNOWFALL"]
LOCATION
Big Bear 9.0
Tahoe 8.0
Whistler 32.5
Name: SNOWFALL, dtype: float64
Snowparkで pandas on Snowflakeを使う DataFrames¶
pandas on Snowflakeと DataFrame API は相互運用性が高いので、 APIs の両方を活用したパイプラインを構築することができます。
Snowpark DataFrames とSnowpark pandas DataFrames 間の変換を行うには、以下の操作を使用できます。
操作 |
入力 |
出力 |
メモ |
---|---|---|---|
Snowpark DataFrame |
Snowpark pandas DataFrame |
この操作は、各行に暗黙の順序を割り当て、 DataFrame の有効期間中、この行の順序を維持します。この変換にはI/Oコストが発生します。 |
|
Snowpark pandas DataFrame またはSnowpark pandas Series |
Snowpark DataFrame |
この操作では行の順序は維持されず、結果のSnowpark DataFrame は、元のSnowpark pandas DataFrame のデータスナップショットを操作します。テーブルから直接作成されるSnowpark DataFrames とは異なり、この動作は、基になるテーブルの変更がSnowpark操作の評価中に反映されないことを意味します。DDL 操作と制限された DML 操作は DataFrame に適用することができません。この変換にI/Oコストは発生しません。 |
可能な限り、不要な変換コストを避けるために、Snowpark DataFrame との間で変換する代わりに、 read_snowflake を使用して、Snowflake から直接テーブルを読み込むことをお勧めします。
詳細については、 Snowpark DataFrames vs Snowpark pandas DataFrame: どちらを選ぶべきでしょうか? をご参照ください。
pandas on Snowflakeがネイティブpandasを比較する方法¶
pandas on Snowflakeとネイティブpandasは、一致した APIs シグネチャと類似したセマンティクスを持つ類似した DataFrame を持っています。pandas on Snowflakeは、ネイティブpandas(pandas 2.2.1)と同じ API シグネチャを提供し、Snowflakeによるスケーラブルな計算を提供します。pandas on Snowflakeは、ネイティブpandasのドキュメントに記述されているセマンティクスを可能な限り尊重しますが、Snowflakeの計算と型システムを使用します。しかし、ネイティブpandasがクライアントマシン上で実行される場合は、Pythonの計算と型システムを使用します。pandas on SnowflakeとSnowflake間の型マッピングについては、 Data types をご参照ください。
ネイティブpandasと同様に、pandas on Snowflakeもインデックスの概念を持ち、行の順序を保持します。しかし、それぞれの実行環境が異なるため、動作に微妙な違いが生じます。このセクションでは、注意すべき主な違いについて説明します。
pandas on SnowflakeはすでにSnowflakeにあるデータで使用するのが最適ですが、以下の操作を使ってネイティブpandasとpandas on Snowflakeを変換することができます。
操作 |
入力 |
出力 |
メモ |
---|---|---|---|
Snowpark pandas DataFrame |
ネイティブpandas DataFrame |
すべてのデータをローカル環境に具現化します。データセットが大きい場合は、メモリ不足エラーになることがあります。 |
|
ネイティブpandas DataFrame、生データ、Snowpark pandasオブジェクト |
Snowpark pandas DataFrame |
これは小型 DataFrames のために確保されています。大量のローカルデータを含む DataFrame を作成すると、データのアップロードに起因するパフォーマンスの問題が発生する可能性があります。 |
|
ネイティブpamdas DataFrame、Snowpark pandasオブジェクト |
Snowflakeテーブル |
結果は、 |
実行環境¶
pandas
: 単一のマシンで動作し、メモリ内でデータを処理します。pandas on Snowflake
: Snowflakeとの統合により、クラスタリングされたマシンで分散コンピューティングが可能です。この統合により、単一マシンのメモリ容量をはるかに超える大規模なデータセットを取り扱えるようになります。Snowpark pandas API を使用するには、Snowflakeへの接続が必要であることに注意してください。
遅延評価と先行評価¶
pandas
: 即座に操作を実行し、操作後ごとに結果をメモリ内で完全に具現化します。このような操作の先行評価は、データをマシン内で広範囲に移動させる必要があるため、メモリ負荷の増加につながる可能性があります。pandas on Snowflake
: pandasと同じ API エクスペリエンスを提供します。pandasの先行評価モデルを模倣していますが、内部的には遅延評価されたクエリグラフを構築し、処理全体の最適化を実現しています。クエリグラフを通じて操作を融合およびトランスパイルすることで、基になる分散Snowflakeコンピューティングエンジンにさらなる最適化の機会がもたらされ、Snowflake内で直接pandasを実行する場合と比較して、コストとエンドツーエンドのパイプライン実行時間の両方が削減されます。
注釈
I/O関連 APIs と、Snowpark pandasオブジェクトではない戻り値(つまり、
DataFrame
、Series
またはIndex
)の APIs は、常に先行評価されます。例:read_snowflake
to_snowflake
to_pandas
to_dict
to_list
__repr__
scikit-learnのようなサードパーティライブラリによって自動的に呼び出されるダンダーメソッド
__array__
。このメソッドを呼び出すと、ローカルマシンに結果が具現化されます。
データソースおよびストレージ¶
pandas
: IO ツール(text、 CSV、 HDF5 など) のpandasドキュメントにリストされている様々なリーダーとライターをサポートします。pandas on Snowflake
: Snowflakeテーブルからの読み取りと書き込み、ローカルまたはステージングされた CSV、 JSON、parquetファイルの読み込みが可能です。詳細については、 IO (読み取りおよび書き込み) をご参照ください。
データ型¶
pandas
: 整数型、浮動小数点数型、文字列型、datetime
型、カテゴリ型など、豊富なデータ型を備えています。また、ユーザー定義のデータ型もサポートしています。pandasのデータ型は通常、基になるデータから派生し、厳格に適用されます。pandas on Snowflake
: Snowflakeでpandasデータ型を SQL に変換してpandasオブジェクトを SQL にマッピングする、Snowflakeの型システムにより制約されます。pandas型の大部分は、Snowflakeで自然に等価なものを持っていますが、マッピングは必ずしも1対1であるとは限りません。場合によっては、複数のpandas型が同じ SQL 型にマッピングされます。
次のテーブルは、pandasとSnowflake SQL 間の型マッピングのリストです。
pandas型 |
Snowflake型 |
---|---|
pandasの拡張整数型を含む、すべての符号付き/符号なし整数型 |
NUMBER(38, 0) |
pandasの拡張浮動小数点数データ型を含む、すべての浮動小数点数型 |
FLOAT |
|
BOOLEAN |
|
STRING |
|
TIME |
|
DATE |
タイムゾーンなしの全 |
TIMESTAMP_NTZ |
すべてのタイムゾーン対応 |
TIMESTAMP_TZ |
|
ARRAY |
|
MAP |
データ型が混在するオブジェクト列 |
VARIANT |
Timedelta64[ns] |
NUMBER(38, 0) |
注釈
カテゴリ型、周期型、区間型、スパース型、およびユーザー定義データ型はサポートされていません。Timedeltaは現在、Snowparkクライアントでのみサポートされています。TimedeltaをSnowflakeに書き戻す場合、Number型として保存されます。
次のテーブルでは、 df.dtypes
を使用した、Snowflake SQL 型からpandas on Snowflake型へのマッピングを提供します。
Snowflake型 |
pandas on Snowflake型( |
---|---|
NUMBER ( |
|
NUMBER ( |
|
BOOLEAN |
|
STRING, TEXT |
|
VARIANT, BINARY, GEOMETRY, GEOGRAPHY |
|
ARRAY |
|
OBJECT |
|
TIME |
|
TIMESTAMP, TIMESTAMP_NTZ, TIMESTAMP_LTZ, TIMESTAMP_TZ |
|
DATE |
|
to_pandas()
を使用してSnowpark pandas DataFrame からネイティブpandas DataFrame に変換する場合、ネイティブpandas DataFrame はpandas on Snowflakeに比べて洗練されたデータ型になりますが、関数やプロシージャについては SQL-Pythonデータ型マッピング と互換性があります。
キャストおよび型推論¶
NULL値の取り扱い¶
pandas
: pandasバージョン1.xでは、 欠損データの処理 が柔軟であったため、PythonNone
、np.nan
、pd.NaN
、pd.NA
、pd.NaT
のすべてを欠損値として扱っていました。それ以降のpandasバージョン(2.2.x)では、これらの値は異なる値として扱われます。pandas on Snowflake
: 以前のpandasバージョンと同様のアプローチを採用し、前述の先行値をすべて欠損値として扱います。Snowparkは、pandasからのNaN
、NA
、NaT
を再利用します。しかし、これらの欠損値はすべてが同じように扱われ、Snowflakeテーブル内で SQL NULL として格納されます。
オフセット/頻度エイリアス¶
pandas
: pandasの日付オフセットがバージョン2.2.1で変更されました。単一文字のエイリアス'M'
、'Q'
、'Y'
などは、2文字のオフセットに取って代わられました。pandas on Snowflake
: pandas時系列ドキュメント で説明されている新しいオフセットを排他的に使用します。
インストール¶
前提条件: Python 3.9、3.10または3.11、modinバージョン0.28.1、およびpandasバージョン2.2.1が必要です。
Tip
Snowflake Notebooks で pandas on Snowflakeを使用するには、 ノートブックのpandas on Snowflake のセットアップ手順をご参照ください。
開発環境のpandas on Snowflakeをインストールするには、以下の手順に従ってください。
プロジェクトディレクトリに移動し、Python仮想環境を有効にします。
注釈
API は現在開発中のため、システム全体ではなくPythonの仮想環境にインストールすることをお勧めします。この方法によって、作成するプロジェクトごとに特定のバージョンを使用することができ、将来のバージョンの変更から隔離することができます。
Anaconda、 Miniconda、 virtualenv などのツールを使用して、特定のPythonバージョン用のPython仮想環境を作成できます。
たとえば、condaを使用してPython 3.9の仮想環境を作成するには、次のように入力します。
conda create --name snowpark_pandas python=3.9 conda activate snowpark_pandas
注釈
以前にPython 3.8とpandas 1.5.3を使用して古いバージョンのpandas on Snowflakeをインストールした場合は、上記のようにPythonとpandasのバージョンをアップグレードする必要があります。Python 3.9、3.10、3.11 で新しい環境を作成する手順に従います。
ModinでSnowpark Pythonライブラリをインストールします。
pip install "snowflake-snowpark-python[modin]"
または
conda install snowflake-snowpark-python modin==0.28.1
注釈
snowflake-snowpark-python
バージョン1.17.0以降がインストールされていることを確認してください。
Snowflakeに対する認証¶
pandas on Snowflake を使用する前に、Snowflake データベースとのセッションを確立する必要があります。構成ファイルを使用してセッションの接続パラメーターを選択することも、コード内でパラメーターを列挙することもできます。詳細については、 Snowpark Pythonのセッションの作成 をご参照ください。アクティブなSnowpark Pythonセッションが存在する場合、pandas on Snowflake は自動的にそれを使用します。例:
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session
CONNECTION_PARAMETERS = {
'account': '<myaccount>',
'user': '<myuser>',
'password': '<mypassword>',
'role': '<myrole>',
'database': '<mydatabase>',
'schema': '<myschema>',
'warehouse': '<mywarehouse>',
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()
# pandas on Snowflake will automatically pick up the Snowpark session created above.
# It will use that session to create new DataFrames.
df = pd.DataFrame([1, 2])
df2 = pd.read_snowflake('CUSTOMER')
pd.session
はSnowparkセッションであるため、他のSnowparkセッションと同じように操作できます。たとえば、これを使用して任意の SQL クエリを実行することができます。その結果、 セッション API に従ってSnowpark DataFrame が生成されますが、この結果はSnowpark pandas DataFrame ではなくSnowpark DataFrame になることに注意してください。
# pd.session is the session that pandas on Snowflake is using for new DataFrames.
# In this case it is the same as the Snowpark session that we've created.
assert pd.session is session
# Run SQL query with returned result as Snowpark DataFrame
snowpark_df = pd.session.sql('select * from customer')
snowpark_df.show()
または、 構成ファイル でSnowpark接続パラメーターを設定することもできます。これにより、コード内で接続パラメーターを列挙する必要がなくなり、pandas on Snowflakeコードを通常のpandasコードとほぼ同じように記述することができるようになります。これを実現するには、 ~/.snowflake/connections.toml
に次のような構成ファイルを作成します。
default_connection_name = "default"
[default]
account = "<myaccount>"
user = "<myuser>"
password = "<mypassword>"
role="<myrole>"
database = "<mydatabase>"
schema = "<myschema>"
warehouse = "<mywarehouse>"
そしてコード内で snowflake.snowpark.Session.builder.create()
を使用し、これらの認証情報を使用してセッションを作成するだけです。
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session
# Session.builder.create() will create a default Snowflake connection.
Session.builder.create()
# create a DataFrame.
df = pd.DataFrame([[1, 2], [3, 4]])
また、複数の Snowpark セッションを作成し、そのうちの 1 つを pandas on Snowflake に割り当てることもできます。pandas on Snowflake は 1 つのセッションしか使用しないため、 pd.session = pandas_session
で明示的にセッションの 1 つを pandas on Snowflake に割り当てる必要があります。
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session
pandas_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account1>").create()
other_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account2>").create()
pd.session = pandas_session
df = pd.DataFrame([1, 2, 3])
次の例では、アクティブなSnowparkセッションがないときにpandas on Snowflakeを使用しようとすると、「pandas on Snowflake requires an active snowpark session, but there is none.」のようなエラーで SnowparkSessionException
が発生します。セッションを作成すると、pandas on Snowflakeを使用できるようになります。例:
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
df = pd.DataFrame([1, 2, 3])
次の例では、複数のアクティブなSnowparkセッションがあるときにpandas on Snowflakeを使用しようとすると、「There are multiple active snowpark sessions, but you need to choose one for pandas on Snowflake.」のようなメッセージで SnowparkSessionException
が発生します。
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session
pandas_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account1>"}).create()
other_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account2>"}).create()
df = pd.DataFrame([1, 2, 3])
注釈
新しい Snowpark pandas DataFrame または Series で使用するセッションは、 modin.pandas.session
で設定する必要があります。ただし、異なるセッションで作成された DataFrames を結合またはマージすることはサポートされていないため、繰り返し異なるセッションを設定したり、ワークフローで異なるセッションを使用したりして DataFrames を作成することは避けてください。
API 参照情報¶
現在実装されている APIs と利用可能なメソッドの全リストは the pandas on Snowflake API 参照 をご参照ください。
サポートされている操作の完全なリストについては、pandas on Snowflakeリファレンスの以下のテーブルをご参照ください。
Snowflakeノートブックでpandas on Snowflakeを使用する¶
Snowflakeノートブックで pandas on Snowflakeを使用するには、 notebooksの pandas on Snowflake をご参照ください。
ストアドプロシージャでpandas on Snowflakeを使用する¶
ストアドプロシージャ でpandas on Snowflakeを使用してデータパイプラインを構築し、 タスク でストアドプロシージャの実行をスケジュールすることができます。
from snowflake.snowpark.context import get_active_session
session = get_active_session()
from snowflake.snowpark import Session
def data_transformation_pipeline(session: Session) -> str:
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from datetime import datetime
# Create a Snowpark pandas DataFrame with sample data.
df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
[1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
[1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
columns=["DAY", "LOCATION", "SNOWFALL"])
# Drop rows with null values.
df = df.dropna()
# In-place point update to fix data error.
df.loc[df["DAY"]=="Friday","DAY"]=2
# Save Results as a Snowflake Table
timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M")
save_path = f"OUTPUT_{timestamp}"
df.to_snowflake(name=save_path, if_exists="replace", index=False)
return f'Transformed DataFrame saved to {save_path}.'
dt_pipeline_sproc = session.sproc.register(name="run_data_transformation_pipeline_sp",
func=data_transformation_pipeline,
replace=True,
packages=['modin', 'snowflake-snowpark-python'])
ストアドプロシージャを呼び出すには、Python で dt_pipeline_sproc()
を実行するか、 SQL で CALL run_data_transformation_pipeline_sp()
を実行します。
ストアドプロシージャをタスクとしてスケジュールするには、 Snowflake Python API を使用してタスクを作成します。
サードパーティライブラリでpandas on Snowflakeを使用する¶
サードパーティライブラリ APIs を Snowpark pandas DataFrame で呼び出す場合、サードパーティライブラリの呼び出しに DataFrame を渡す前に to_pandas() を呼び出して、Snowpark pandas DataFrame を pandas DataFrame に変換することを推奨します。
注釈
to_pandas()
を呼び出すと、データがSnowflakeからメモリに取り出されるため、大規模なデータセットや機密性の高いユースケースの場合はその点に注意してください。
pandas on Snowflakeは、現在のところ、一部の NumPy および Matplotlib APIs、 np.where
用の分散実装 と df.plot
との相互運用性については限定された互換性を持ちます。これらのサードパーティライブラリで作業する場合、Snowpark pandas DataFrames を to_pandas()
経由で変換することで、複数の I/O 呼び出しを避けることができます。
以下は、視覚化用に Altair を、機械学習用に scikit-learn を使用した例です。
# Create a Snowpark session with a default connection.
session = Session.builder.create()
train = pd.read_snowflake('TITANIC')
train[['Pclass', 'Parch', 'Sex', 'Survived']].head()
Pclass Parch Sex Survived
0 3 0 male 0
1 1 0 female 1
2 3 0 female 1
3 1 0 female 1
4 3 0 male 0
import altair as alt
# Convert to pandas DataFrame
train_df_pandas = train.to_pandas()
survived_per_age_plot = alt.Chart(train_df_pandas).mark_bar(
).encode(
x=alt.X('Age', bin=alt.Bin(maxbins=25)),
y='count()',
column='Survived:N',
color='Survived:N',
).properties(
width=300,
height=300
).configure_axis(
grid=False
)
survived_per_age_plot

pandasに変換した後、scikit-learnを使って簡単なモデルを学習することができます。
feature_cols = ['Pclass', 'Parch']
# Convert features DataFrame to pandas DataFrames
X_pandas = train_snowpark_pandas.loc[:, feature_cols].to_pandas()
# Convert labels Series to pandas Series
y_pandas = train_snowpark_pandas["Survived"].to_pandas()
from sklearn.linear_model import LogisticRegression
logreg = LogisticRegression()
logreg.fit(X_pandas, y_pandas)
y_pred_pandas = logreg.predict(X_pandas)
acc_eval = accuracy_score(y_pandas, y_pred_pandas)

制限事項¶
pandas on Snowflakeには以下の制限があります。
pandas on Snowflakeは、 OSS サードパーティライブラリとの互換性を保証するものではありません。しかし、バージョン1.14.0a1以降、Snowpark pandasは NumPy の互換性、特に
np.where
の使用を限定的に導入しています。詳細については、 NumPy の相互運用性 をご参照ください。Snowpark pandas DataFrame を使用してサードパーティライブラリ APIs を呼び出す場合は、サードパーティライブラリの呼び出しに DataFrame を渡す前に
to_pandas()
を呼び出して、Snowpark pandas DataFrame をpandas DataFrame に変換することをSnowflakeはお勧めします。詳細については、 サードパーティライブラリでpandas on Snowflakeを使用する をご参照ください。pandas on Snowflakeは Snowpark ML と統合されていません。Snowpark ML を使用する場合は、Snowpark ML を呼び出す前に to_snowpark() を使用して、Snowpark pandas DataFrame をSnowpark DataFrame に変換することをお勧めします。
遅延
MultiIndex
オブジェクトはサポートされていません。MultiIndex
が使用されると、ネイティブpandasMultiIndex
オブジェクトが返され、すべてのデータをクライアント側にプルする必要があります。pandas on Snowflakeでは、まだすべてのpandas APIs に分散実装があるわけではありません。サポートされていない APIs の場合は、
NotImplementedError
がスローされます。分散実装を持たない操作は、ストアドプロシージャにフォールバックします。サポートされる APIs については、 API のリファレンスドキュメントをご参照ください。pandas on Snowflakeは特定のpandasバージョンを必要とします。pandas on Snowflakeはpandas 2.2.1を必要とし、pandas 2.2.1との互換性のみを提供します。
pandas on Snowflake
apply()
関数内で pandas on Snowflake を参照することはできません。apply()
内では、ネイティブpandasしか使用できません。
トラブルシューティング¶
このセクションでは、pandas on Snowflakeを使用する際のトラブルシューティングのヒントについて説明します。
トラブルシューティングの場合には、ネイティブpandas DataFrame (またはサンプル)で同じ操作を実行してみて、pandasで同じエラーが続くかどうかを確認してください。この方法により、クエリを修正するヒントが得られる場合があります。例:
df = pd.DataFrame({"a": [1,2,3], "b": ["x", "y", "z"]}) # Running this in Snowpark pandas throws an error df["A"].sum() # Convert a small sample of 10 rows to pandas DataFrame for testing pandas_df = df.head(10).to_pandas() # Run the same operation. KeyError indicates that the column reference is incorrect pandas_df["A"].sum() # Fix the column reference to get the Snowpark pandas query working df["a"].sum()
長時間実行のノートブックを開いている場合、デフォルトでは、セッションが240分(4時間)アイドル状態になると、Snowflakeセッションがタイムアウトすることに注意してください。セッションの有効期限が切れると、pandas on Snowflakeのクエリを追加で実行した場合に以下のエラーが発生します。「Authentication token has expired.The user must authenticate again.」この時点で、Snowflakeへの接続を再度確立する必要があります。この結果、未接続のセッション変数が失われる可能性があります。セッションアイドルタイムアウトパラメーターの構成方法の詳細については、 セッションポリシー をご参照ください。
ベストプラクティス¶
このセクションでは、pandas on Snowflakeを使用する際のベストプラクティスについて説明します。
for
ループ、iterrows
、iteritems
などの反復コードパターンの使用は避けてください。反復コードパターンにより、生成されるクエリの複雑さが急速に増大します。pandas on Snowflakeに、クライアントコードではなくデータ分散と計算の並列化を実行させます。反復コードパターンに関しては、 DataFrame 全体に対して実行できる操作を探し、それに対応する操作を代わりに使用するようにします。
for i in np.arange(0, 50):
if i % 2 == 0:
data = pd.concat([data, pd.DataFrame({'A': i, 'B': i + 1}, index=[0])], ignore_index=True)
else:
data = pd.concat([data, pd.DataFrame({'A': i}, index=[0])], ignore_index=True)
# Instead of creating one DataFrame per row and concatenating them,
# try to directly create the DataFrame out of the data, like this:
data = pd.DataFrame(
{
"A": range(0, 50),
"B": [i + 1 if i % 2 == 0 else None for i in range(50)],
},
)
apply
、applymap
、transform
の呼び出しは避けます。これらは最終的に UDFs や UDTFs で実装されますが、通常の SQL クエリほどパフォーマンスが高くない可能性があります。適用された関数に同等の DataFrame または系列操作がある場合は、代わりにその操作を使用します。たとえば、df.groupby('col1').apply('sum')
の代わりに、df.groupby('col1').sum()
を直接呼び出します。サードパーティライブラリの呼び出しに DataFrame または series を渡す前に、
to_pandas()
を呼び出します。pandas on Snowflake は、サードパーティライブラリとの互換性を保証していません。余計なI/Oオーバーヘッドを避けるために、具現化された通常のSnowflakeテーブルを使用します。pandas on Snowflakeは、通常のテーブルに対してのみ機能するデータスナップショットの上で動作します。外部テーブル、ビュー、 Apache Iceberg™ テーブルを含む他の型では、スナップショットを取得する前に仮テーブルが作成されます。そのため、追加の具現化オーバーヘッドが必要になります。
pandas on Snowflakeは、Snowflakeテーブルから
read_snowflake
を使って DataFrames を作成する際に、高速なゼロコピークローン機能を提供します。ただし、スナップショット機能は、通常のデータベースでは通常のSnowflakeテーブルに対してのみ提供されます。ハイブリッド、icebergなどの型を持つテーブル、または共有データベース下のテーブルをロードする場合、通常のSnowflakeテーブルに対する追加のマテリアライゼーションが導入されます。スナップショットはデータの一貫性と順序を保証するために必要であり、現在のところ、余分な実体化を回避する他の方法はありません。pandas on Snowflakeを使用する場合は、可能なかぎり通常のSnowflakeのテーブルを使用するようにしてください。他の操作に進む前に結果の型を再度確認し、必要に応じて
astype
で明示的に型をキャストします。型推論機能が限られているため、型ヒントが与えられていない場合、
df.apply
は、結果すべてに整数値が含まれていても、オブジェクト(バリアント)型の結果を返します。他の操作でdtype
をint
にする必要がある場合は、astype
メソッドを呼び出して明示的に型をキャストし、列の型を修正してから処理を続行します。評価や具現化が必要でない場合は、 APIs の呼び出しを避けます。
Series
やDataframe
を返さない APIs には、正しい型で結果を得るために評価と具現化を実行する必要がありますプロットの方法も同じです。不要な評価や具現化を最小限に抑えるために、これらの APIs への呼び出しを減らします。大規模データセットでは、
np.where(<cond>, <scalar>, n)
の呼び出しを避けます。<scalar>
は、時間がかかる可能性のある<cond>
のサイズの DataFrame にブロードキャストされます。反復的に構築されるクエリを扱う場合、
df.cache_result
は、中間結果を具現化することで、繰り返される評価を減らし、クエリ全体の待ち時間を改善し、複雑さを軽減することができます。例:df = pd.read_snowflake('pandas_test') df2 = pd.pivot_table(df, index='index_col', columns='pivot_col') # expensive operation df3 = df.merge(df2) df4 = df3.where(df2 == True)
上記の例では、
df2
を生成するクエリは計算コストが高く、df3
とdf4
の両方の生成に再利用されます。df2
を仮テーブルに具現化する(df2
を含む後続操作をピボットではなくテーブルスキャンにする)と、コードブロック全体の遅延時間を短縮できます。df = pd.read_snowflake('pandas_test') df2 = pd.pivot_table(df, index='index_col', columns='pivot_col') # expensive operation df2.cache_result(inplace=True) df3 = df.merge(df2) df4 = df3.where(df2 == True)
例¶
以下はpandas操作を使ったコード例です。まず、 pandas_test
という名前のSnowpark pandas DataFrame から始めます。これには、 COL_STR
、 COL_FLOAT
、 そして COL_INT
の3つの列があります。これらの例に関連するノートブックを表示するには、 Snowflake-Labsリポジトリ内の pandas on Snowflakeの例 をご参照ください。
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session
CONNECTION_PARAMETERS = {
'account': '<myaccount>',
'user': '<myuser>',
'password': '<mypassword>',
'role': '<myrole>',
'database': '<mydatabase>',
'schema': '<myschema>',
'warehouse': '<mywarehouse>',
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()
df = pd.DataFrame([['a', 2.1, 1],['b', 4.2, 2],['c', 6.3, None]], columns=["COL_STR", "COL_FLOAT", "COL_INT"])
df
COL_STR COL_FLOAT COL_INT
0 a 2.1 1.0
1 b 4.2 2.0
2 c 6.3 NaN
例を通して使用する DataFrame を pandas_test
という名前のSnowflakeテーブルとして保存します。
df.to_snowflake("pandas_test", if_exists='replace',index=False)
次に、Snowflakeテーブルから DataFrame を作成します。列 COL_INT
をドロップし、結果を列名 row_position
でSnowflakeに保存します。
# Create a DataFrame out of a Snowflake table.
df = pd.read_snowflake('pandas_test')
df.shape
(3, 3)
df.head(2)
COL_STR COL_FLOAT COL_INT
0 a 2.1 1
1 b 4.2 2
df.dropna(subset=["COL_FLOAT"], inplace=True)
df
COL_STR COL_FLOAT COL_INT
0 a 2.1 1
1 c 6.3 2
df.shape
(2, 3)
df.dtypes
COL_STR object
COL_FLOAT float64
COL_INT int64
dtype: object
# Save the result back to Snowflake with a row_pos column.
df.reset_index(drop=True).to_snowflake('pandas_test2', if_exists='replace', index=True, index_label=['row_pos'])
新しいテーブル、 pandas_test2
は次のようになります。
row_pos COL_STR COL_FLOAT COL_INT
0 1 a 2.0 1
1 2 b 4.0 2
IO (読み取りおよび書き込み)¶
# Reading and writing to Snowflake
df = pd.DataFrame({"fruit": ["apple", "orange"], "size": [3.4, 5.4], "weight": [1.4, 3.2]})
df.to_snowflake("test_table", if_exists="replace", index=False )
df_table = pd.read_snowflake("test_table")
# Generate sample CSV file
with open("data.csv", "w") as f:
f.write('fruit,size,weight\napple,3.4,1.4\norange,5.4,3.2')
# Read from local CSV file
df_csv = pd.read_csv("data.csv")
# Generate sample JSON file
with open("data.json", "w") as f:
f.write('{"fruit":"apple", "size":3.4, "weight":1.4},{"fruit":"orange", "size":5.4, "weight":3.2}')
# Read from local JSON file
df_json = pd.read_json('data.json')
# Upload data.json and data.csv to Snowflake stage named @TEST_STAGE
# Read CSV and JSON file from stage
df_csv = pd.read_csv('@TEST_STAGE/data.csv')
df_json = pd.read_json('@TEST_STAGE/data.json')
詳細については、 入出力 をご参照ください。
インデックス作成¶
df = pd.DataFrame({"a": [1,2,3], "b": ["x", "y", "z"]})
df.columns
Index(['a', 'b'], dtype='object')
df.index
Index([0, 1, 2], dtype='int8')
df["a"]
0 1
1 2
2 3
Name: a, dtype: int8
df["b"]
0 x
1 y
2 z
Name: b, dtype: object
df.iloc[0,1]
'x'
df.loc[df["a"] > 2]
a b
2 3 z
df.columns = ["c", "d"]
df
c d
0 1 x
1 2 y
2 3 z
df = df.set_index("c")
df
d
c
1 x
2 y
3 z
df.rename(columns={"d": "renamed"})
renamed
c
1 x
2 y
3 z
欠損値¶
import numpy as np
df = pd.DataFrame([[np.nan, 2, np.nan, 0],
[3, 4, np.nan, 1],
[np.nan, np.nan, np.nan, np.nan],
[np.nan, 3, np.nan, 4]],
columns=list("ABCD"))
df
A B C D
0 NaN 2.0 NaN 0.0
1 3.0 4.0 NaN 1.0
2 NaN NaN NaN NaN
3 NaN 3.0 NaN 4.0
df.isna()
A B C D
0 True False True False
1 False False True False
2 True True True True
3 True False True False
df.fillna(0)
A B C D
0 0.0 2.0 0.0 0.0
1 3.0 4.0 0.0 1.0
2 0.0 0.0 0.0 0.0
3 0.0 3.0 0.0 4.0
df.dropna(how="all")
A B C D
0 NaN 2.0 NaN 0.0
1 3.0 4.0 NaN 1.0
3 NaN 3.0 NaN 4.0
型変換¶
df = pd.DataFrame({"int": [1,2,3], "str": ["4", "5", "6"]})
df
int str
0 1 4
1 2 5
2 3 6
df_float = df.astype(float)
df_float
int str
0 1.0 4.0
1 2.0 5.0
2 3.0 6.0
df_float.dtypes
int float64
str float64
dtype: object
pd.to_numeric(df.str)
0 4.0
1 5.0
2 6.0
Name: str, dtype: float64
df = pd.DataFrame({'year': [2015, 2016],
'month': [2, 3],
'day': [4, 5]})
pd.to_datetime(df)
0 2015-02-04
1 2016-03-05
dtype: datetime64[ns]
2項演算¶
df_1 = pd.DataFrame([[1,2,3],[4,5,6]])
df_2 = pd.DataFrame([[6,7,8]])
df_1.add(df_2)
0 1 2
0 7.0 9.0 11.0
1 NaN NaN NaN
s1 = pd.Series([1, 2, 3])
s2 = pd.Series([2, 2, 2])
s1 + s2
0 3
1 4
2 5
dtype: int64
df = pd.DataFrame({"A": [1,2,3], "B": [4,5,6]})
df["A+B"] = df["A"] + df["B"]
df
A B A+B
0 1 4 5
1 2 5 7
2 3 6 9
集計¶
df = pd.DataFrame([[1, 2, 3],
[4, 5, 6],
[7, 8, 9],
[np.nan, np.nan, np.nan]],
columns=['A', 'B', 'C'])
df.agg(['sum', 'min'])
A B C
sum 12.0 15.0 18.0
min 1.0 2.0 3.0
df.median()
A 4.0
B 5.0
C 6.0
dtype: float64
マージ¶
df1 = pd.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],
'value': [1, 2, 3, 5]})
df1
lkey value
0 foo 1
1 bar 2
2 baz 3
3 foo 5
df2 = pd.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'],
'value': [5, 6, 7, 8]})
df2
rkey value
0 foo 5
1 bar 6
2 baz 7
3 foo 8
df1.merge(df2, left_on='lkey', right_on='rkey')
lkey value_x rkey value_y
0 foo 1 foo 5
1 foo 1 foo 8
2 bar 2 bar 6
3 baz 3 baz 7
4 foo 5 foo 5
5 foo 5 foo 8
df = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})
df
key A
0 K0 A0
1 K1 A1
2 K2 A2
3 K3 A3
4 K4 A4
5 K5 A5
other = pd.DataFrame({'key': ['K0', 'K1', 'K2'],
'B': ['B0', 'B1', 'B2']})
df.join(other, lsuffix='_caller', rsuffix='_other')
key_caller A key_other B
0 K0 A0 K0 B0
1 K1 A1 K1 B1
2 K2 A2 K2 B2
3 K3 A3 None None
4 K4 A4 None None
5 K5 A5 None None
グループ別¶
df = pd.DataFrame({'Animal': ['Falcon', 'Falcon','Parrot', 'Parrot'],
'Max Speed': [380., 370., 24., 26.]})
df
Animal Max Speed
0 Falcon 380.0
1 Falcon 370.0
2 Parrot 24.0
3 Parrot 26.0
df.groupby(['Animal']).mean()
Max Speed
Animal
Falcon 375.0
Parrot 25.0
詳細については、 GroupBy をご参照ください。
ピボット¶
df = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
"bar", "bar", "bar", "bar"],
"B": ["one", "one", "one", "two", "two",
"one", "one", "two", "two"],
"C": ["small", "large", "large", "small",
"small", "large", "small", "small",
"large"],
"D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
"E": [2, 4, 5, 5, 6, 6, 8, 9, 9]})
df
A B C D E
0 foo one small 1 2
1 foo one large 2 4
2 foo one large 2 5
3 foo two small 3 5
4 foo two small 3 6
5 bar one large 4 6
6 bar one small 5 8
7 bar two small 6 9
8 bar two large 7 9
pd.pivot_table(df, values='D', index=['A', 'B'],
columns=['C'], aggfunc="sum")
C large small
A B
bar one 4.0 5
two 7.0 6
foo one 4.0 1
two NaN 6
df = pd.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
'baz': [1, 2, 3, 4, 5, 6],
'zoo': ['x', 'y', 'z', 'q', 'w', 't']})
df
foo bar baz zoo
0 one A 1 x
1 one B 2 y
2 one C 3 z
3 two A 4 q
4 two B 5 w
5 two C 6 t