pandas on Snowflake

pandas on Snowflakeを使用すると、Snowflakeのデータ上でpandasコードを直接分散実行することができます。インポートステートメントと数行のコードを変更するだけで、Snowflakeのスケーラビリティとセキュリティの利点を備えたpandasネイティブのエクスペリエンスを得ることができます。pandas on Snowflakeにより、より大規模なデータセットを扱うことができ、pandasパイプラインを他のビッグデータフレームワークに移植したり、より大規模で高価なマシンを使用したりする時間やコストを避けることができます。SQL へのトランスパイルを通じてワークロードを Snowflake でネイティブに実行し、Snowflake の並列化、データガバナンス、セキュリティの利点を活用できるようにします。pandas on Snowflakeは、 Snowpark Python library の一部として、Snowpark pandas API を通じて提供され、Snowflake プラットフォーム内で Python コードのスケーラブルなデータ処理を可能にします。

pandas on Snowflakeを使うメリット

  • Python開発者の必要に合致 -- pandas on Snowflakeは、Snowflakeでネイティブに動作するpandas互換のレイヤーにより、Python開発者に親しみやすいインターフェイスを提供します。

  • スケーラブルな分散pandas -- pandas on Snowflakeは、Snowflakeの既存のクエリ最適化技術を活用することで、pandasの利便性とSnowflakeのスケーラビリティを橋渡しします。コードの書き換えは最小限に抑えられ、移行作業が簡素化されるため、プロトタイプから本番稼動までシームレスに移行できます。

  • セキュリティとガバナンス -- データはSnowflakeのセキュアなプラットフォームから出ることはありません。pandas on Snowflakeは、データ組織内でデータへのアクセス方法を統一し、監査とガバナンスを容易にします。

  • 管理とチューニングに追加のコンピューティングインフラストラクチャは不要 -- pandas on SnowflakeはSnowflakeのパワフルなコンピューティングエンジンを活用するため、追加のコンピューティングインフラストラクチャを設定したり管理したりする必要はありません。

pandas on Snowflakeを使うべき時

以下のいずれかに当てはまる場合は、pandas on Snowflakeを使用する必要があります。

  • あなたはpandas API と、より広い PyData エコシステムに精通しています。

  • pandasに精通し、同じコードベースで共同作業をしたい人とチームで仕事をします。

  • pandasで書かれた既存のコードがあります

  • あなたのワークフローには、pandas DataFrames でサポートされているように、注文に関連するニーズがあります。例えば、ワークフロー全体でデータセットが同じ順序で並んでいる必要があります

  • AI-ベースのコパイロット・ツールによる、より正確なコード補完を好みます。

pandas on Snowflake入門

pandas on Snowflakeをインストールするには、condaまたはpipを使用してパッケージをインストールします。詳細な手順については、 Installation をご参照ください。

pip install "snowflake-snowpark-python[modin]"
Copy

pandas on Snowflakeがインストールされたら、pandasを import pandas as pd としてインポートする代わりに、以下の2行を使用します。

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
Copy

ここでは、pandas on Snowpark Python ライブラリを通じて、pandas on Snowflake を使用する方法の一例を紹介します。

import modin.pandas as pd

# Import the Snowpark plugin for modin.
import snowflake.snowpark.modin.plugin

# Create a Snowpark session with a default connection.
from snowflake.snowpark.session import Session
session = Session.builder.create()

# Create a Snowpark pandas DataFrame from existing Snowflake table
df = pd.read_snowflake('SNOWFALL')

# Alternatively, create a Snowpark pandas DataFrame with sample data.
df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
                    [1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
                    [1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
                    columns=["DAY", "LOCATION", "SNOWFALL"])

# Inspect the DataFrame
df
Copy
      DAY  LOCATION  SNOWFALL
0       1  Big Bear       8.0
1       2  Big Bear      10.0
2       3  Big Bear       NaN
3       1     Tahoe       3.0
4       2     Tahoe       NaN
5       3     Tahoe      13.0
6       1  Whistler       NaN
7  Friday  Whistler      40.0
8       3  Whistler      25.0
# In-place point update to fix data error.
df.loc[df["DAY"]=="Friday","DAY"]=2

# Inspect the columns after update.
# Note how the data type is updated automatically after transformation.
df["DAY"]
Copy
0    1
1    2
2    3
3    1
4    2
5    3
6    1
7    2
8    3
Name: DAY, dtype: int64
# Drop rows with null values.
df.dropna()
Copy
  DAY  LOCATION  SNOWFALL
0   1  Big Bear       8.0
1   2  Big Bear      10.0
3   1     Tahoe       3.0
5   3     Tahoe      13.0
7   2  Whistler      40.0
8   3  Whistler      25.0
# Compute the average daily snowfall across locations.
df.groupby("LOCATION").mean()["SNOWFALL"]
Copy
LOCATION
Big Bear     9.0
Tahoe        8.0
Whistler    32.5
Name: SNOWFALL, dtype: float64

Snowparkで pandas on Snowflakeを使う DataFrames

pandas on Snowflakeと DataFrame API は相互運用性が高いので、 APIs の両方を活用したパイプラインを構築することができます。

Snowpark DataFrames とSnowpark pandas DataFrames 間の変換を行うには、以下の操作を使用できます。

操作

入力

出力

メモ

to_snowpark_pandas

Snowpark DataFrame

Snowpark pandas DataFrame

この操作は、各行に暗黙の順序を割り当て、 DataFrame の有効期間中、この行の順序を維持します。この変換にはI/Oコストが発生します。

to_snowpark

Snowpark pandas DataFrame またはSnowpark pandas Series

Snowpark DataFrame

この操作では行の順序は維持されず、結果のSnowpark DataFrame は、元のSnowpark pandas DataFrame のデータスナップショットを操作します。テーブルから直接作成されるSnowpark DataFrames とは異なり、この動作は、基になるテーブルの変更がSnowpark操作の評価中に反映されないことを意味します。DDL 操作と制限された DML 操作は DataFrame に適用することができません。この変換にI/Oコストは発生しません。

可能な限り、不要な変換コストを避けるために、Snowpark DataFrame との間で変換する代わりに、 read_snowflake を使用して、Snowflake から直接テーブルを読み込むことをお勧めします。

詳細については、 Snowpark DataFrames vs Snowpark pandas DataFrame: どちらを選ぶべきでしょうか? をご参照ください。

pandas on Snowflakeがネイティブpandasを比較する方法

pandas on Snowflakeとネイティブpandasは、一致した APIs シグネチャと類似したセマンティクスを持つ類似した DataFrame を持っています。pandas on Snowflakeは、ネイティブpandas(pandas 2.2.1)と同じ API シグネチャを提供し、Snowflakeによるスケーラブルな計算を提供します。pandas on Snowflakeは、ネイティブpandasのドキュメントに記述されているセマンティクスを可能な限り尊重しますが、Snowflakeの計算と型システムを使用します。しかし、ネイティブpandasがクライアントマシン上で実行される場合は、Pythonの計算と型システムを使用します。pandas on SnowflakeとSnowflake間の型マッピングについては、 Data types をご参照ください。

ネイティブpandasと同様に、pandas on Snowflakeもインデックスの概念を持ち、行の順序を保持します。しかし、それぞれの実行環境が異なるため、動作に微妙な違いが生じます。このセクションでは、注意すべき主な違いについて説明します。

pandas on SnowflakeはすでにSnowflakeにあるデータで使用するのが最適ですが、以下の操作を使ってネイティブpandasとpandas on Snowflakeを変換することができます。

操作

入力

出力

メモ

to_pandas

Snowpark pandas DataFrame

ネイティブpandas DataFrame

すべてのデータをローカル環境に具現化します。データセットが大きい場合は、メモリ不足エラーになることがあります。

pd.DataFrame(...)

ネイティブpandas DataFrame、生データ、Snowpark pandasオブジェクト

Snowpark pandas DataFrame

これは小型 DataFrames のために確保されています。大量のローカルデータを含む DataFrame を作成すると、データのアップロードに起因するパフォーマンスの問題が発生する可能性があります。

session.write_pandas

ネイティブpamdas DataFrame、Snowpark pandasオブジェクト

Snowflakeテーブル

結果は、 write_pandas の呼び出しで指定されたテーブル名を使用して、 pd.read_snowflake によりpandas on Snowflakeにロードすることができます。

実行環境

  • pandas: 単一のマシンで動作し、メモリ内でデータを処理します。

  • pandas on Snowflake: Snowflakeとの統合により、クラスタリングされたマシンで分散コンピューティングが可能です。この統合により、単一マシンのメモリ容量をはるかに超える大規模なデータセットを取り扱えるようになります。Snowpark pandas API を使用するには、Snowflakeへの接続が必要であることに注意してください。

遅延評価と先行評価

  • pandas: 即座に操作を実行し、操作後ごとに結果をメモリ内で完全に具現化します。このような操作の先行評価は、データをマシン内で広範囲に移動させる必要があるため、メモリ負荷の増加につながる可能性があります。

  • pandas on Snowflake: pandasと同じ API エクスペリエンスを提供します。pandasの先行評価モデルを模倣していますが、内部的には遅延評価されたクエリグラフを構築し、処理全体の最適化を実現しています。

    クエリグラフを通じて操作を融合およびトランスパイルすることで、基になる分散Snowflakeコンピューティングエンジンにさらなる最適化の機会がもたらされ、Snowflake内で直接pandasを実行する場合と比較して、コストとエンドツーエンドのパイプライン実行時間の両方が削減されます。

    注釈

    I/O関連 APIs と、Snowpark pandasオブジェクトではない戻り値(つまり、 DataFrameSeries または Index)の APIs は、常に先行評価されます。例:

    • read_snowflake

    • to_snowflake

    • to_pandas

    • to_dict

    • to_list

    • __repr__

    • scikit-learnのようなサードパーティライブラリによって自動的に呼び出されるダンダーメソッド __array__。このメソッドを呼び出すと、ローカルマシンに結果が具現化されます。

データソースおよびストレージ

  • pandas: IO ツール(text、 CSV、 HDF5 など) のpandasドキュメントにリストされている様々なリーダーとライターをサポートします。

  • pandas on Snowflake: Snowflakeテーブルからの読み取りと書き込み、ローカルまたはステージングされた CSV、 JSON、parquetファイルの読み込みが可能です。詳細については、 IO (読み取りおよび書き込み) をご参照ください。

データ型

  • pandas: 整数型、浮動小数点数型、文字列型、 datetime 型、カテゴリ型など、豊富なデータ型を備えています。また、ユーザー定義のデータ型もサポートしています。pandasのデータ型は通常、基になるデータから派生し、厳格に適用されます。

  • pandas on Snowflake: Snowflakeでpandasデータ型を SQL に変換してpandasオブジェクトを SQL にマッピングする、Snowflakeの型システムにより制約されます。pandas型の大部分は、Snowflakeで自然に等価なものを持っていますが、マッピングは必ずしも1対1であるとは限りません。場合によっては、複数のpandas型が同じ SQL 型にマッピングされます。

次のテーブルは、pandasとSnowflake SQL 間の型マッピングのリストです。

pandas型

Snowflake型

pandasの拡張整数型を含む、すべての符号付き/符号なし整数型

NUMBER(38, 0)

pandasの拡張浮動小数点数データ型を含む、すべての浮動小数点数型

FLOAT

boolBooleanDtype

BOOLEAN

strStringDtype

STRING

datetime.time

TIME

datetime.date

DATE

タイムゾーンなしの全 datetime

TIMESTAMP_NTZ

すべてのタイムゾーン対応 datetime

TIMESTAMP_TZ

listtuplearray

ARRAY

dictjson

MAP

データ型が混在するオブジェクト列

VARIANT

Timedelta64[ns]

NUMBER(38, 0)

注釈

カテゴリ型、周期型、区間型、スパース型、およびユーザー定義データ型はサポートされていません。Timedeltaは現在、Snowparkクライアントでのみサポートされています。TimedeltaをSnowflakeに書き戻す場合、Number型として保存されます。

次のテーブルでは、 df.dtypes を使用した、Snowflake SQL 型からpandas on Snowflake型へのマッピングを提供します。

Snowflake型

pandas on Snowflake型(df.dtypes

NUMBER (scale = 0)

int64

NUMBER (scale > 0), REAL

float64

BOOLEAN

bool

STRING, TEXT

object (str)

VARIANT, BINARY, GEOMETRY, GEOGRAPHY

object

ARRAY

object (list)

OBJECT

object (dict)

TIME

object (datetime.time)

TIMESTAMP, TIMESTAMP_NTZ, TIMESTAMP_LTZ, TIMESTAMP_TZ

datetime64[ns]

DATE

object (datetime.date)

to_pandas() を使用してSnowpark pandas DataFrame からネイティブpandas DataFrame に変換する場合、ネイティブpandas DataFrame はpandas on Snowflakeに比べて洗練されたデータ型になりますが、関数やプロシージャについては SQL-Pythonデータ型マッピング と互換性があります。

キャストおよび型推論

  • pandas: NumPy に依存し、デフォルトでは暗黙の型キャストと推論のために NumPy とPythonの型システムに従います。たとえば、ブーリアンは整数型として扱われるため、 1 + True2 を返します。

  • pandas on Snowflake: NumPy およびPythonの型を前述のマッピングテーブルに従ってSnowflake型にマッピングし、暗黙の 型キャストと推論 のために基になるSnowflake型システムを使用します。たとえば、 論理データ型 に従い、暗黙的にブーリアンを整数型に変換しないため、 1 + True の結果は型変換エラーになります。

NULL値の取り扱い

  • pandas: pandasバージョン1.xでは、 欠損データの処理 が柔軟であったため、Python Nonenp.nanpd.NaNpd.NApd.NaT のすべてを欠損値として扱っていました。それ以降のpandasバージョン(2.2.x)では、これらの値は異なる値として扱われます。

  • pandas on Snowflake: 以前のpandasバージョンと同様のアプローチを採用し、前述の先行値をすべて欠損値として扱います。Snowparkは、pandasからの NaNNANaT を再利用します。しかし、これらの欠損値はすべてが同じように扱われ、Snowflakeテーブル内で SQL NULL として格納されます。

オフセット/頻度エイリアス

  • pandas: pandasの日付オフセットがバージョン2.2.1で変更されました。単一文字のエイリアス 'M''Q''Y' などは、2文字のオフセットに取って代わられました。

  • pandas on Snowflake: pandas時系列ドキュメント で説明されている新しいオフセットを排他的に使用します。

インストール

前提条件: Python 3.9、3.10または3.11、modinバージョン0.28.1、およびpandasバージョン2.2.1が必要です。

Tip

Snowflake Notebooks で pandas on Snowflakeを使用するには、 ノートブックのpandas on Snowflake のセットアップ手順をご参照ください。

開発環境のpandas on Snowflakeをインストールするには、以下の手順に従ってください。

  1. プロジェクトディレクトリに移動し、Python仮想環境を有効にします。

    注釈

    API は現在開発中のため、システム全体ではなくPythonの仮想環境にインストールすることをお勧めします。この方法によって、作成するプロジェクトごとに特定のバージョンを使用することができ、将来のバージョンの変更から隔離することができます。

    AnacondaMinicondavirtualenv などのツールを使用して、特定のPythonバージョン用のPython仮想環境を作成できます。

    たとえば、condaを使用してPython 3.9の仮想環境を作成するには、次のように入力します。

    conda create --name snowpark_pandas python=3.9
    conda activate snowpark_pandas
    
    Copy

    注釈

    以前にPython 3.8とpandas 1.5.3を使用して古いバージョンのpandas on Snowflakeをインストールした場合は、上記のようにPythonとpandasのバージョンをアップグレードする必要があります。Python 3.9、3.10、3.11 で新しい環境を作成する手順に従います。

  2. ModinでSnowpark Pythonライブラリをインストールします。

    pip install "snowflake-snowpark-python[modin]"
    
    Copy

    または

    conda install snowflake-snowpark-python modin==0.28.1
    
    Copy

注釈

snowflake-snowpark-python バージョン1.17.0以降がインストールされていることを確認してください。

Snowflakeに対する認証

pandas on Snowflake を使用する前に、Snowflake データベースとのセッションを確立する必要があります。構成ファイルを使用してセッションの接続パラメーターを選択することも、コード内でパラメーターを列挙することもできます。詳細については、 Snowpark Pythonのセッションの作成 をご参照ください。アクティブなSnowpark Pythonセッションが存在する場合、pandas on Snowflake は自動的にそれを使用します。例:

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

CONNECTION_PARAMETERS = {
    'account': '<myaccount>',
    'user': '<myuser>',
    'password': '<mypassword>',
    'role': '<myrole>',
    'database': '<mydatabase>',
    'schema': '<myschema>',
    'warehouse': '<mywarehouse>',
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()

# pandas on Snowflake will automatically pick up the Snowpark session created above.
# It will use that session to create new DataFrames.
df = pd.DataFrame([1, 2])
df2 = pd.read_snowflake('CUSTOMER')
Copy

pd.session はSnowparkセッションであるため、他のSnowparkセッションと同じように操作できます。たとえば、これを使用して任意の SQL クエリを実行することができます。その結果、 セッション API に従ってSnowpark DataFrame が生成されますが、この結果はSnowpark pandas DataFrame ではなくSnowpark DataFrame になることに注意してください。

# pd.session is the session that pandas on Snowflake is using for new DataFrames.
# In this case it is the same as the Snowpark session that we've created.
assert pd.session is session

# Run SQL query with returned result as Snowpark DataFrame
snowpark_df = pd.session.sql('select * from customer')
snowpark_df.show()
Copy

または、 構成ファイル でSnowpark接続パラメーターを設定することもできます。これにより、コード内で接続パラメーターを列挙する必要がなくなり、pandas on Snowflakeコードを通常のpandasコードとほぼ同じように記述することができるようになります。これを実現するには、 ~/.snowflake/connections.toml に次のような構成ファイルを作成します。

default_connection_name = "default"

[default]
account = "<myaccount>"
user = "<myuser>"
password = "<mypassword>"
role="<myrole>"
database = "<mydatabase>"
schema = "<myschema>"
warehouse = "<mywarehouse>"
Copy

そしてコード内で snowflake.snowpark.Session.builder.create() を使用し、これらの認証情報を使用してセッションを作成するだけです。

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

# Session.builder.create() will create a default Snowflake connection.
Session.builder.create()
# create a DataFrame.
df = pd.DataFrame([[1, 2], [3, 4]])
Copy

また、複数の Snowpark セッションを作成し、そのうちの 1 つを pandas on Snowflake に割り当てることもできます。pandas on Snowflake は 1 つのセッションしか使用しないため、 pd.session = pandas_session で明示的にセッションの 1 つを pandas on Snowflake に割り当てる必要があります。

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

pandas_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account1>").create()
other_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account2>").create()
pd.session = pandas_session
df = pd.DataFrame([1, 2, 3])
Copy

次の例では、アクティブなSnowparkセッションがないときにpandas on Snowflakeを使用しようとすると、「pandas on Snowflake requires an active snowpark session, but there is none.」のようなエラーで SnowparkSessionException が発生します。セッションを作成すると、pandas on Snowflakeを使用できるようになります。例:

import modin.pandas as pd
import snowflake.snowpark.modin.plugin

df = pd.DataFrame([1, 2, 3])
Copy

次の例では、複数のアクティブなSnowparkセッションがあるときにpandas on Snowflakeを使用しようとすると、「There are multiple active snowpark sessions, but you need to choose one for pandas on Snowflake.」のようなメッセージで SnowparkSessionException が発生します。

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

pandas_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account1>"}).create()
other_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account2>"}).create()
df = pd.DataFrame([1, 2, 3])
Copy

注釈

新しい Snowpark pandas DataFrame または Series で使用するセッションは、 modin.pandas.session で設定する必要があります。ただし、異なるセッションで作成された DataFrames を結合またはマージすることはサポートされていないため、繰り返し異なるセッションを設定したり、ワークフローで異なるセッションを使用したりして DataFrames を作成することは避けてください。

API 参照情報

現在実装されている APIs と利用可能なメソッドの全リストは the pandas on Snowflake API 参照 をご参照ください。

サポートされている操作の完全なリストについては、pandas on Snowflakeリファレンスの以下のテーブルをご参照ください。

Snowflakeノートブックでpandas on Snowflakeを使用する

Snowflakeノートブックで pandas on Snowflakeを使用するには、 notebooksの pandas on Snowflake をご参照ください。

ストアドプロシージャでpandas on Snowflakeを使用する

ストアドプロシージャ でpandas on Snowflakeを使用してデータパイプラインを構築し、 タスク でストアドプロシージャの実行をスケジュールすることができます。

from snowflake.snowpark.context import get_active_session
session = get_active_session()

from snowflake.snowpark import Session

def data_transformation_pipeline(session: Session) -> str:
  import modin.pandas as pd
  import snowflake.snowpark.modin.plugin
  from datetime import datetime
  # Create a Snowpark pandas DataFrame with sample data.
  df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
                     [1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
                     [1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
                      columns=["DAY", "LOCATION", "SNOWFALL"])
  # Drop rows with null values.
  df = df.dropna()
  # In-place point update to fix data error.
  df.loc[df["DAY"]=="Friday","DAY"]=2
  # Save Results as a Snowflake Table
  timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M")
  save_path = f"OUTPUT_{timestamp}"
  df.to_snowflake(name=save_path, if_exists="replace", index=False)
  return f'Transformed DataFrame saved to {save_path}.'


  dt_pipeline_sproc = session.sproc.register(name="run_data_transformation_pipeline_sp",
                             func=data_transformation_pipeline,
                             replace=True,
                             packages=['modin', 'snowflake-snowpark-python'])
Copy

ストアドプロシージャを呼び出すには、Python で dt_pipeline_sproc() を実行するか、 SQL で CALL run_data_transformation_pipeline_sp() を実行します。

ストアドプロシージャをタスクとしてスケジュールするには、 Snowflake Python API を使用してタスクを作成します。

サードパーティライブラリでpandas on Snowflakeを使用する

サードパーティライブラリ APIs を Snowpark pandas DataFrame で呼び出す場合、サードパーティライブラリの呼び出しに DataFrame を渡す前に to_pandas() を呼び出して、Snowpark pandas DataFrame を pandas DataFrame に変換することを推奨します。

注釈

to_pandas() を呼び出すと、データがSnowflakeからメモリに取り出されるため、大規模なデータセットや機密性の高いユースケースの場合はその点に注意してください。

pandas on Snowflakeは、現在のところ、一部の NumPy および Matplotlib APIs、 np.where 用の分散実装 と df.plot との相互運用性については限定された互換性を持ちます。これらのサードパーティライブラリで作業する場合、Snowpark pandas DataFrames を to_pandas() 経由で変換することで、複数の I/O 呼び出しを避けることができます。

以下は、視覚化用に Altair を、機械学習用に scikit-learn を使用した例です。

# Create a Snowpark session with a default connection.
session = Session.builder.create()

train = pd.read_snowflake('TITANIC')

train[['Pclass', 'Parch', 'Sex', 'Survived']].head()
Copy
    Pclass  Parch     Sex       Survived
0       3      0     male               0
1       1      0   female               1
2       3      0   female               1
3       1      0   female               1
4       3      0     male               0
import altair as alt
# Convert to pandas DataFrame
train_df_pandas = train.to_pandas()
survived_per_age_plot = alt.Chart(train_df_pandas).mark_bar(
).encode(
    x=alt.X('Age', bin=alt.Bin(maxbins=25)),
    y='count()',
    column='Survived:N',
    color='Survived:N',
).properties(
    width=300,
    height=300
).configure_axis(
    grid=False
)
survived_per_age_plot
Copy
altair

pandasに変換した後、scikit-learnを使って簡単なモデルを学習することができます。

feature_cols = ['Pclass', 'Parch']
# Convert features DataFrame to pandas DataFrames
X_pandas = train_snowpark_pandas.loc[:, feature_cols].to_pandas()
# Convert labels Series to pandas Series
y_pandas = train_snowpark_pandas["Survived"].to_pandas()

from sklearn.linear_model import LogisticRegression

logreg = LogisticRegression()

logreg.fit(X_pandas, y_pandas)

y_pred_pandas = logreg.predict(X_pandas)

acc_eval = accuracy_score(y_pandas, y_pred_pandas)
Copy
scikitモデル

制限事項

pandas on Snowflakeには以下の制限があります。

  • pandas on Snowflakeは、 OSS サードパーティライブラリとの互換性を保証するものではありません。しかし、バージョン1.14.0a1以降、Snowpark pandasは NumPy の互換性、特に np.where の使用を限定的に導入しています。詳細については、 NumPy の相互運用性 をご参照ください。

    Snowpark pandas DataFrame を使用してサードパーティライブラリ APIs を呼び出す場合は、サードパーティライブラリの呼び出しに DataFrame を渡す前に to_pandas() を呼び出して、Snowpark pandas DataFrame をpandas DataFrame に変換することをSnowflakeはお勧めします。詳細については、 サードパーティライブラリでpandas on Snowflakeを使用する をご参照ください。

  • pandas on Snowflakeは Snowpark ML と統合されていません。Snowpark ML を使用する場合は、Snowpark ML を呼び出す前に to_snowpark() を使用して、Snowpark pandas DataFrame をSnowpark DataFrame に変換することをお勧めします。

  • 遅延 MultiIndex オブジェクトはサポートされていません。 MultiIndex が使用されると、ネイティブpandas MultiIndex オブジェクトが返され、すべてのデータをクライアント側にプルする必要があります。

  • pandas on Snowflakeでは、まだすべてのpandas APIs に分散実装があるわけではありません。サポートされていない APIs の場合は、 NotImplementedError がスローされます。分散実装を持たない操作は、ストアドプロシージャにフォールバックします。サポートされる APIs については、 API のリファレンスドキュメントをご参照ください。

  • pandas on Snowflakeは特定のpandasバージョンを必要とします。pandas on Snowflakeはpandas 2.2.1を必要とし、pandas 2.2.1との互換性のみを提供します。

  • pandas on Snowflake apply() 関数内で pandas on Snowflake を参照することはできません。 apply() 内では、ネイティブpandasしか使用できません。

トラブルシューティング

このセクションでは、pandas on Snowflakeを使用する際のトラブルシューティングのヒントについて説明します。

  • トラブルシューティングの場合には、ネイティブpandas DataFrame (またはサンプル)で同じ操作を実行してみて、pandasで同じエラーが続くかどうかを確認してください。この方法により、クエリを修正するヒントが得られる場合があります。例:

    df = pd.DataFrame({"a": [1,2,3], "b": ["x", "y", "z"]})
    # Running this in Snowpark pandas throws an error
    df["A"].sum()
    # Convert a small sample of 10 rows to pandas DataFrame for testing
    pandas_df = df.head(10).to_pandas()
    # Run the same operation. KeyError indicates that the column reference is incorrect
    pandas_df["A"].sum()
    # Fix the column reference to get the Snowpark pandas query working
    df["a"].sum()
    
    Copy
  • 長時間実行のノートブックを開いている場合、デフォルトでは、セッションが240分(4時間)アイドル状態になると、Snowflakeセッションがタイムアウトすることに注意してください。セッションの有効期限が切れると、pandas on Snowflakeのクエリを追加で実行した場合に以下のエラーが発生します。「Authentication token has expired.The user must authenticate again.」この時点で、Snowflakeへの接続を再度確立する必要があります。この結果、未接続のセッション変数が失われる可能性があります。セッションアイドルタイムアウトパラメーターの構成方法の詳細については、 セッションポリシー をご参照ください。

ベストプラクティス

このセクションでは、pandas on Snowflakeを使用する際のベストプラクティスについて説明します。

  • for ループ、 iterrowsiteritems などの反復コードパターンの使用は避けてください。反復コードパターンにより、生成されるクエリの複雑さが急速に増大します。pandas on Snowflakeに、クライアントコードではなくデータ分散と計算の並列化を実行させます。反復コードパターンに関しては、 DataFrame 全体に対して実行できる操作を探し、それに対応する操作を代わりに使用するようにします。

for i in np.arange(0, 50):
  if i % 2 == 0:
    data = pd.concat([data, pd.DataFrame({'A': i, 'B': i + 1}, index=[0])], ignore_index=True)
  else:
    data = pd.concat([data, pd.DataFrame({'A': i}, index=[0])], ignore_index=True)

# Instead of creating one DataFrame per row and concatenating them,
# try to directly create the DataFrame out of the data, like this:

data = pd.DataFrame(
      {
          "A": range(0, 50),
          "B": [i + 1 if i % 2 == 0 else None for i in range(50)],
      },
)
Copy
  • applyapplymaptransform の呼び出しは避けます。これらは最終的に UDFsUDTFs で実装されますが、通常の SQL クエリほどパフォーマンスが高くない可能性があります。適用された関数に同等の DataFrame または系列操作がある場合は、代わりにその操作を使用します。たとえば、 df.groupby('col1').apply('sum') の代わりに、 df.groupby('col1').sum() を直接呼び出します。

  • サードパーティライブラリの呼び出しに DataFrame または series を渡す前に、 to_pandas() を呼び出します。pandas on Snowflake は、サードパーティライブラリとの互換性を保証していません。

  • 余計なI/Oオーバーヘッドを避けるために、具現化された通常のSnowflakeテーブルを使用します。pandas on Snowflakeは、通常のテーブルに対してのみ機能するデータスナップショットの上で動作します。外部テーブル、ビュー、 Apache Iceberg™ テーブルを含む他の型では、スナップショットを取得する前に仮テーブルが作成されます。そのため、追加の具現化オーバーヘッドが必要になります。

  • pandas on Snowflakeは、Snowflakeテーブルから read_snowflake を使って DataFrames を作成する際に、高速なゼロコピークローン機能を提供します。ただし、スナップショット機能は、通常のデータベースでは通常のSnowflakeテーブルに対してのみ提供されます。ハイブリッド、icebergなどの型を持つテーブル、または共有データベース下のテーブルをロードする場合、通常のSnowflakeテーブルに対する追加のマテリアライゼーションが導入されます。スナップショットはデータの一貫性と順序を保証するために必要であり、現在のところ、余分な実体化を回避する他の方法はありません。pandas on Snowflakeを使用する場合は、可能なかぎり通常のSnowflakeのテーブルを使用するようにしてください。

  • 他の操作に進む前に結果の型を再度確認し、必要に応じて astype で明示的に型をキャストします。

    型推論機能が限られているため、型ヒントが与えられていない場合、 df.apply は、結果すべてに整数値が含まれていても、オブジェクト(バリアント)型の結果を返します。他の操作で dtypeint にする必要がある場合は、 astype メソッドを呼び出して明示的に型をキャストし、列の型を修正してから処理を続行します。

  • 評価や具現化が必要でない場合は、 APIs の呼び出しを避けます。

    SeriesDataframe を返さない APIs には、正しい型で結果を得るために評価と具現化を実行する必要がありますプロットの方法も同じです。不要な評価や具現化を最小限に抑えるために、これらの APIs への呼び出しを減らします。

  • 大規模データセットでは、 np.where(<cond>, <scalar>, n) の呼び出しを避けます。 <scalar> は、時間がかかる可能性のある <cond> のサイズの DataFrame にブロードキャストされます。

  • 反復的に構築されるクエリを扱う場合、 df.cache_result は、中間結果を具現化することで、繰り返される評価を減らし、クエリ全体の待ち時間を改善し、複雑さを軽減することができます。例:

    df = pd.read_snowflake('pandas_test')
    df2 = pd.pivot_table(df, index='index_col', columns='pivot_col') # expensive operation
    df3 = df.merge(df2)
    df4 = df3.where(df2 == True)
    
    Copy

    上記の例では、 df2 を生成するクエリは計算コストが高く、 df3df4 の両方の生成に再利用されます。 df2 を仮テーブルに具現化する(df2 を含む後続操作をピボットではなくテーブルスキャンにする)と、コードブロック全体の遅延時間を短縮できます。

    df = pd.read_snowflake('pandas_test')
    df2 = pd.pivot_table(df, index='index_col', columns='pivot_col') # expensive operation
    df2.cache_result(inplace=True)
    df3 = df.merge(df2)
    df4 = df3.where(df2 == True)
    
    Copy

以下はpandas操作を使ったコード例です。まず、 pandas_test という名前のSnowpark pandas DataFrame から始めます。これには、 COL_STRCOL_FLOAT、 そして COL_INT の3つの列があります。これらの例に関連するノートブックを表示するには、 Snowflake-Labsリポジトリ内の pandas on Snowflakeの例 をご参照ください。

import modin.pandas as pd
import snowflake.snowpark.modin.plugin

from snowflake.snowpark import Session

CONNECTION_PARAMETERS = {
    'account': '<myaccount>',
    'user': '<myuser>',
    'password': '<mypassword>',
    'role': '<myrole>',
    'database': '<mydatabase>',
    'schema': '<myschema>',
    'warehouse': '<mywarehouse>',
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()

df = pd.DataFrame([['a', 2.1, 1],['b', 4.2, 2],['c', 6.3, None]], columns=["COL_STR", "COL_FLOAT", "COL_INT"])

df
Copy
  COL_STR    COL_FLOAT    COL_INT
0       a          2.1        1.0
1       b          4.2        2.0
2       c          6.3        NaN

例を通して使用する DataFrame を pandas_test という名前のSnowflakeテーブルとして保存します。

df.to_snowflake("pandas_test", if_exists='replace',index=False)
Copy

次に、Snowflakeテーブルから DataFrame を作成します。列 COL_INT をドロップし、結果を列名 row_position でSnowflakeに保存します。

# Create a DataFrame out of a Snowflake table.
df = pd.read_snowflake('pandas_test')

df.shape
Copy
(3, 3)
df.head(2)
Copy
    COL_STR  COL_FLOAT  COL_INT
0         a        2.1        1
1         b        4.2        2
df.dropna(subset=["COL_FLOAT"], inplace=True)

df
Copy
    COL_STR  COL_FLOAT  COL_INT
0         a        2.1        1
1         c        6.3        2
df.shape
Copy
(2, 3)
df.dtypes
Copy
COL_STR       object
COL_FLOAT    float64
COL_INT        int64
dtype: object
# Save the result back to Snowflake with a row_pos column.
df.reset_index(drop=True).to_snowflake('pandas_test2', if_exists='replace', index=True, index_label=['row_pos'])
Copy

新しいテーブル、 pandas_test2 は次のようになります。

     row_pos  COL_STR  COL_FLOAT  COL_INT
0          1         a       2.0        1
1          2         b       4.0        2

IO (読み取りおよび書き込み)

# Reading and writing to Snowflake
df = pd.DataFrame({"fruit": ["apple", "orange"], "size": [3.4, 5.4], "weight": [1.4, 3.2]})
df.to_snowflake("test_table", if_exists="replace", index=False )

df_table = pd.read_snowflake("test_table")


# Generate sample CSV file
with open("data.csv", "w") as f:
    f.write('fruit,size,weight\napple,3.4,1.4\norange,5.4,3.2')
# Read from local CSV file
df_csv = pd.read_csv("data.csv")

# Generate sample JSON file
with open("data.json", "w") as f:
    f.write('{"fruit":"apple", "size":3.4, "weight":1.4},{"fruit":"orange", "size":5.4, "weight":3.2}')
# Read from local JSON file
df_json = pd.read_json('data.json')

# Upload data.json and data.csv to Snowflake stage named @TEST_STAGE
# Read CSV and JSON file from stage
df_csv = pd.read_csv('@TEST_STAGE/data.csv')
df_json = pd.read_json('@TEST_STAGE/data.json')
Copy

詳細については、 入出力 をご参照ください。

インデックス作成

df = pd.DataFrame({"a": [1,2,3], "b": ["x", "y", "z"]})
df.columns
Copy
Index(['a', 'b'], dtype='object')
df.index
Copy
Index([0, 1, 2], dtype='int8')
df["a"]
Copy
0    1
1    2
2    3
Name: a, dtype: int8
df["b"]
Copy
0    x
1    y
2    z
Name: b, dtype: object
df.iloc[0,1]
Copy
'x'
df.loc[df["a"] > 2]
Copy
a  b
2  3  z
df.columns = ["c", "d"]
df
Copy
     c  d
0    1  x
1    2  y
2    3  z
df = df.set_index("c")
df
Copy
   d
c
1  x
2  y
3  z
df.rename(columns={"d": "renamed"})
Copy
    renamed
c
1       x
2       y
3       z

欠損値

import numpy as np
df = pd.DataFrame([[np.nan, 2, np.nan, 0],
                [3, 4, np.nan, 1],
                [np.nan, np.nan, np.nan, np.nan],
                [np.nan, 3, np.nan, 4]],
                columns=list("ABCD"))
df
Copy
     A    B   C    D
0  NaN  2.0 NaN  0.0
1  3.0  4.0 NaN  1.0
2  NaN  NaN NaN  NaN
3  NaN  3.0 NaN  4.0
df.isna()
Copy
       A      B     C      D
0   True  False  True  False
1  False  False  True  False
2   True   True  True   True
3   True  False  True  False
df.fillna(0)
Copy
     A    B    C    D
0   0.0  2.0  0.0  0.0
1   3.0  4.0  0.0  1.0
2   0.0  0.0  0.0  0.0
3   0.0  3.0  0.0  4.0
df.dropna(how="all")
Copy
     A    B   C    D
0   NaN  2.0 NaN  0.0
1   3.0  4.0 NaN  1.0
3   NaN  3.0 NaN  4.0

型変換

df = pd.DataFrame({"int": [1,2,3], "str": ["4", "5", "6"]})
df
Copy
   int str
0    1   4
1    2   5
2    3   6
df_float = df.astype(float)
df_float
Copy
   int  str
0  1.0  4.0
1  2.0  5.0
2  3.0  6.0
df_float.dtypes
Copy
int    float64
str    float64
dtype: object
pd.to_numeric(df.str)
Copy
0    4.0
1    5.0
2    6.0
Name: str, dtype: float64
df = pd.DataFrame({'year': [2015, 2016],
                'month': [2, 3],
                'day': [4, 5]})
pd.to_datetime(df)
Copy
0   2015-02-04
1   2016-03-05
dtype: datetime64[ns]

2項演算

df_1 = pd.DataFrame([[1,2,3],[4,5,6]])
df_2 = pd.DataFrame([[6,7,8]])
df_1.add(df_2)
Copy
    0    1     2
0  7.0  9.0  11.0
1  NaN  NaN   NaN
s1 = pd.Series([1, 2, 3])
s2 = pd.Series([2, 2, 2])
s1 + s2
Copy
0    3
1    4
2    5
dtype: int64
df = pd.DataFrame({"A": [1,2,3], "B": [4,5,6]})
df["A+B"] = df["A"] + df["B"]
df
Copy
   A  B  A+B
0  1  4    5
1  2  5    7
2  3  6    9

集計

df = pd.DataFrame([[1, 2, 3],
                [4, 5, 6],
                [7, 8, 9],
                [np.nan, np.nan, np.nan]],
                columns=['A', 'B', 'C'])
df.agg(['sum', 'min'])
Copy
        A     B     C
sum  12.0  15.0  18.0
min   1.0   2.0   3.0
df.median()
Copy
A    4.0
B    5.0
C    6.0
dtype: float64

マージ

df1 = pd.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],
                    'value': [1, 2, 3, 5]})
df1
Copy
  lkey  value
0  foo      1
1  bar      2
2  baz      3
3  foo      5
df2 = pd.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'],
                    'value': [5, 6, 7, 8]})
df2
Copy
  rkey  value
0  foo      5
1  bar      6
2  baz      7
3  foo      8
df1.merge(df2, left_on='lkey', right_on='rkey')
Copy
  lkey  value_x rkey  value_y
0  foo        1  foo        5
1  foo        1  foo        8
2  bar        2  bar        6
3  baz        3  baz        7
4  foo        5  foo        5
5  foo        5  foo        8
df = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
                'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})
df
Copy
  key   A
0  K0  A0
1  K1  A1
2  K2  A2
3  K3  A3
4  K4  A4
5  K5  A5
other = pd.DataFrame({'key': ['K0', 'K1', 'K2'],
                    'B': ['B0', 'B1', 'B2']})
df.join(other, lsuffix='_caller', rsuffix='_other')
Copy
  key_caller   A key_other     B
0         K0  A0        K0    B0
1         K1  A1        K1    B1
2         K2  A2        K2    B2
3         K3  A3      None  None
4         K4  A4      None  None
5         K5  A5      None  None

グループ別

df = pd.DataFrame({'Animal': ['Falcon', 'Falcon','Parrot', 'Parrot'],
               'Max Speed': [380., 370., 24., 26.]})

df
Copy
   Animal  Max Speed
0  Falcon      380.0
1  Falcon      370.0
2  Parrot       24.0
3  Parrot       26.0
df.groupby(['Animal']).mean()
Copy
        Max Speed
Animal
Falcon      375.0
Parrot       25.0

詳細については、 GroupBy をご参照ください。

ピボット

df = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
                        "bar", "bar", "bar", "bar"],
                "B": ["one", "one", "one", "two", "two",
                        "one", "one", "two", "two"],
                "C": ["small", "large", "large", "small",
                        "small", "large", "small", "small",
                        "large"],
                "D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
                "E": [2, 4, 5, 5, 6, 6, 8, 9, 9]})
df
Copy
     A    B      C  D  E
0  foo  one  small  1  2
1  foo  one  large  2  4
2  foo  one  large  2  5
3  foo  two  small  3  5
4  foo  two  small  3  6
5  bar  one  large  4  6
6  bar  one  small  5  8
7  bar  two  small  6  9
8  bar  two  large  7  9
pd.pivot_table(df, values='D', index=['A', 'B'],
                   columns=['C'], aggfunc="sum")
Copy
    C    large  small
A   B
bar one    4.0      5
    two    7.0      6
foo one    4.0      1
    two    NaN      6
df = pd.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
                'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
                'baz': [1, 2, 3, 4, 5, 6],
                'zoo': ['x', 'y', 'z', 'q', 'w', 't']})
df
Copy
   foo bar  baz zoo
0  one   A    1   x
1  one   B    2   y
2  one   C    3   z
3  two   A    4   q
4  two   B    5   w
5  two   C    6   t

リソース