pandas on Snowflake

pandas on Snowflake를 사용하면 pandas 코드를 분산된 방식으로 Snowflake의 데이터에서 직접 실행할 수 있습니다. 가져오기 문과 몇 줄의 코드만 변경하면 익숙한 pandas 환경을 Snowflake의 확장성 및 보안 이점과 함께 활용할 있습니다. pandas on Snowflake를 사용하면 훨씬 더 큰 데이터 세트로 작업할 수 있으며, pandas 파이프라인을 다른 빅데이터 프레임워크로 포팅하거나 크고 비싼 머신을 프로비저닝하는 데 드는 시간과 비용을 피할 수 있습니다. SQL로의 변환을 통해 기본적으로 Snowflake에서 워크로드를 실행하여 병렬화와 Snowflake의 데이터 거버넌스 및 보안 이점을 활용할 수 있습니다. pandas on Snowflake는 Snowpark Python 라이브러리 의 일부인 Snowpark pandas API를 통해 제공되며 Snowflake 플랫폼 내에서 Python 코드의 확장 가능한 데이터 처리를 활성화합니다.

pandas on Snowflake 사용의 이점

  • Python 개발자에게 익숙한 인터페이스 제공 – pandas on Snowflake는 Snowflake에서 기본적으로 실행할 수 있는 pandas 호환 계층을 공급하여 Python 개발자에게 익숙한 인터페이스를 제공합니다.

  • 확장 가능한 분산형 pandas – Snowflake에서 pandas on Snowflake는 기존의 쿼리 최적화 기법을 활용하여 pandas의 편의성과 Snowflake의 확장성을 연결합니다. 최소한의 코드 재작성만 필요하므로 마이그레이션 과정이 간소화되어 프로토타입에서 프로덕션으로 원활하게 변환할 수 있습니다.

  • 보안 및 거버넌스 – 데이터는 Snowflake의 보안 플랫폼을 벗어나지 않습니다. pandas on Snowflake는 데이터 조직 내에서 데이터 액세스 방식에 대한 일관성을 유지하고 감사 및 거버넌스를 더 쉽게 수행할 수 있게 해줍니다.

  • 관리 및 조정할 추가 컴퓨팅 인프라가 필요 없음 - pandas on Snowflake는 Snowflake의 강력한 컴퓨팅 엔진을 활용하므로 추가 컴퓨팅 인프라를 설정하거나 관리할 필요가 없습니다.

pandas on Snowflake를 사용해야 하는 경우

다음 중 하나라도 해당하는 경우 pandas on Snowflake를 사용해야 합니다.

  • 사용자가 pandas API와 더 광범위한 PyData 생태계에 대해 익숙합니다.

  • 사용자가 pandas에 익숙하고 동일한 코드베이스에서 협업하고자 하는 다른 사람들과 함께 팀에서 일하고 있습니다.

  • pandas로 작성된 기존 코드가 있습니다.

  • 워크플로에 주문 관련 요구 사항이 있는 경우 pandas DataFrames에서 지원합니다. 예를 들어, 전체 워크플로에 대해 데이터 세트가 동일한 순서로 있어야 합니다.

  • 보다 정확한 코드 완성을 위해 AI 기반 Copilot 도구를 선호합니다.

pandas on Snowflake 시작하기

pandas on Snowflake를 설치하려면 conda 또는 pip를 사용하여 패키지를 설치할 수 있습니다. 자세한 지침은 설치 섹션을 참조하십시오.

pip install "snowflake-snowpark-python[modin]"
Copy

pandas on Snowflake가 설치되면 pandas를 import pandas as pd 로 가져오는 대신 다음 두 줄을 사용합니다.

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
Copy

다음은 Modin을 사용하여 pandas on Snowpark Python 라이브러리를 통해 pandas on Snowflake를 사용하는 방법의 예제입니다.

import modin.pandas as pd

# Import the Snowpark plugin for modin.
import snowflake.snowpark.modin.plugin

# Create a Snowpark session with a default connection.
from snowflake.snowpark.session import Session
session = Session.builder.create()

# Create a Snowpark pandas DataFrame from existing Snowflake table
df = pd.read_snowflake('SNOWFALL')

# Alternatively, create a Snowpark pandas DataFrame with sample data.
df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
                    [1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
                    [1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
                    columns=["DAY", "LOCATION", "SNOWFALL"])

# Inspect the DataFrame
df
Copy
      DAY  LOCATION  SNOWFALL
0       1  Big Bear       8.0
1       2  Big Bear      10.0
2       3  Big Bear       NaN
3       1     Tahoe       3.0
4       2     Tahoe       NaN
5       3     Tahoe      13.0
6       1  Whistler       NaN
7  Friday  Whistler      40.0
8       3  Whistler      25.0
# In-place point update to fix data error.
df.loc[df["DAY"]=="Friday","DAY"]=2

# Inspect the columns after update.
# Note how the data type is updated automatically after transformation.
df["DAY"]
Copy
0    1
1    2
2    3
3    1
4    2
5    3
6    1
7    2
8    3
Name: DAY, dtype: int64
# Drop rows with null values.
df.dropna()
Copy
  DAY  LOCATION  SNOWFALL
0   1  Big Bear       8.0
1   2  Big Bear      10.0
3   1     Tahoe       3.0
5   3     Tahoe      13.0
7   2  Whistler      40.0
8   3  Whistler      25.0
# Compute the average daily snowfall across locations.
df.groupby("LOCATION").mean()["SNOWFALL"]
Copy
LOCATION
Big Bear     9.0
Tahoe        8.0
Whistler    32.5
Name: SNOWFALL, dtype: float64

Snowpark DataFrames과 함께 pandas on Snowflake 사용하기

pandas on Snowflake와 DataFrame API는 상호 운용성이 뛰어나기 때문에 두 가지 APIs를 모두 활용하는 파이프라인을 구축할 수 있습니다.

다음 작업을 사용하여 Snowpark DataFrames와 Snowpark pandas DataFrames 간의 변환을 수행할 수 있습니다.

작업

입력

출력

참고

to_snowpark_pandas

Snowpark DataFrame

Snowpark pandas DataFrame

이 작업은 각 행에 암시적 순서를 할당하고 DataFrame의 수명 동안 이 행 순서를 유지합니다. 이 변환에는 I/O 비용이 발생합니다.

to_snowpark

Snowpark pandas DataFrame 또는 Snowpark pandas 시리즈

Snowpark DataFrame

이 작업은 행 순서를 유지하지 않으며, 결과 Snowpark DataFrame은 소스 Snowpark pandas DataFrame의 데이터 스냅샷에서 작동합니다. 테이블에서 직접 생성된 Snowpark DataFrames와 달리, 이 동작은 기초 테이블의 변경 사항이 Snowpark 작업을 평가하는 동안 반영되지 않음을 의미합니다. DDL 작업과 제한된 DML 작업은 DataFrame에 적용할 수 없습니다. 이 변환에는 I/O 비용이 발생하지 않습니다.

가능하면 불필요한 변환 비용을 방지하기 위해 Snowpark DataFrame으로 변환하는 대신 read_snowflake 를 사용하여 Snowflake에서 테이블을 직접 읽어오는 것이 좋습니다.

자세한 내용은 Snowpark DataFrames vs Snowpark pandas DataFrame: 어떤 것을 선택해야 할까요? 섹션을 참조하십시오.

pandas on Snowflake와 네이티브 pandas 비교 방법

pandas on Snowflake와 네이티브 pandas는 일치하는 시그니처와 유사한 DataFrame APIs와 유사한 의미 체계를 가지고 있습니다. pandas on Snowflake는 네이티브 pandas와 동일한 API 시그니처를 제공하며(pandas 2.2.1), 확장 가능한 계산을 Snowflake로 제공합니다. pandas on Snowflake는 네이티브 pandas 문서에서 설명하는 의미 체계를 최대한 존중하지만 Snowflake 계산 및 유형 시스템을 사용합니다. 그러나 네이티브 pandas가 클라이언트 머신에서 실행되면 Python 계산 및 유형 시스템을 사용합니다. pandas on Snowflake와 Snowflake 간의 유형 매핑에 대한 정보는 데이터 타입 섹션을 참조하십시오.

네이티브 pandas와 마찬가지로 pandas on Snowflake도 인덱스 개념을 가지고 있으며 행 순서를 유지합니다. 그러나 서로 다른 실행 환경으로 인해 동작에 미묘한 차이가 있습니다. 이 섹션에서는 주의해야 할 주요 차이점을 설명합니다.

pandas on Snowflake는 이미 Snowflake에 있는 데이터와 함께 사용하는 것이 가장 좋지만, 다음 작업을 사용하여 네이티브 pandas와 pandas on Snowflake 간에 변환할 수 있습니다.

작업

입력

출력

참고

to_pandas

Snowpark pandas DataFrame

네이티브 pandas DataFrame

모든 데이터를 로컬 환경에 구체화합니다. 데이터 세트가 큰 경우 메모리 부족 오류가 발생할 수 있습니다.

pd.DataFrame(…)

네이티브 pandas DataFrame, 원시 데이터, Snowpark pandas 오브젝트

Snowpark pandas DataFrame

소규모 DataFrames용으로 예약해야 합니다. 대량의 로컬 데이터로 DataFrame을 생성하면 임시 테이블이 생성되고 데이터 업로드에 따른 성능 문제가 발생할 수 있습니다.

session.write_pandas

네이티브 pandas DataFrame, Snowpark pandas 오브젝트

Snowflake 테이블

이후 write_pandas 호출에 지정된 테이블 이름을 사용하여 pd.read_snowflake 를 사용하여 결과를 pandas on Snowflake에 로드할 수 있습니다.

실행 환경

  • pandas: 단일 머신에서 동작하고 메모리에 있는 데이터를 처리합니다.

  • pandas on Snowflake: Snowflake와 통합되어 여러 머신 클러스터에서 분산 컴퓨팅이 가능합니다. 이러한 통합을 통해 단일 머신의 메모리 용량을 초과하는 훨씬 더 큰 데이터 세트를 처리할 수 있습니다. Snowpark pandas API를 사용하려면 Snowflake에 연결해야 합니다.

지연 평가 vs. 즉시 평가

  • pandas: 각 작업 후에 즉시 작업을 실행하고 결과를 메모리에 완전히 구체화합니다. 이 즉시 작업 평가는 머신 내에서 데이터를 광범위하게 이동해야 하기 때문에 메모리 부담을 가중시킬 수 있습니다.

  • pandas on Snowflake: pandas와 동일한 API 경험을 제공합니다. pandas의 즉시 평가 모델을 모방하지만, 내부적으로는 지연 평가 쿼리 그래프를 구축하여 작업 전반에 걸쳐 최적화를 가능하게 합니다.

    쿼리 그래프를 통해 작업을 융합하고 트랜스파일링하면 기본 분산형 Snowflake 컴퓨팅 엔진에 대한 추가적인 최적화 기회를 얻을 수 있어, pandas를 Snowflake 내에서 직접 실행할 때보다 비용과 엔드투엔드 파이프라인 런타임이 모두 감소합니다.

    참고

    반환값이 Snowpark pandas 오브젝트(즉, DataFrame, Series 또는 Index)가 아닌 I/O 관련 APIs 및 APIs는 항상 즉시 평가됩니다. 예:

    • read_snowflake

    • to_snowflake

    • to_pandas

    • to_dict

    • to_list

    • __repr__

    • dunder 메서드 __array__ 는 scikit-learn과 같은 일부 서드 파티 라이브러리에서 자동으로 호출할 수 있습니다. 이 메서드를 호출하면 로컬 머신에 결과가 표시됩니다.

데이터 소스 및 저장소

  • pandas: IO 도구(텍스트, CSV, HDF5, …) 에서 pandas 설명서에 나열된 다양한 독자 및 작성자를 지원합니다.

  • pandas on Snowflake: Snowflake 테이블에서 읽고 쓸 수 있으며 로컬 또는 스테이징된 CSV, JSON 또는 parquet 파일을 읽을 수 있습니다. 자세한 내용은 IO(읽기 및 쓰기) 섹션을 참조하십시오.

데이터 타입

  • pandas: 정수, 부동 소수점, 문자열, datetime 타입, 범주형 타입 등 다양한 데이터 타입을 지원합니다. 또한 사용자 정의 데이터 타입도 지원합니다. pandas의 데이터 타입은 일반적으로 기본 데이터에서 파생되며 엄격하게 적용됩니다.

  • pandas on Snowflake: pandas 데이터 타입을 Snowflake의 SQL 타입으로 변환하여 pandas 오브젝트를 SQL에 매핑하는 Snowflake 타입 시스템에 의해 제약됩니다. 대부분의 pandas 유형은 Snowflake에서 자연스럽게 대응되지만 매핑이 항상 1:1로 이루어지는 것은 아닙니다. 경우에 따라 여러 개의 pandas 유형이 동일한 SQL 유형에 매핑되는 경우도 있습니다.

다음 테이블에는 pandas와 Snowflake SQL 간 유형 매핑이 나와 있습니다.

pandas 타입

Snowflake 유형

모든 부호화/부호화되지 않은 정수 유형, pandas 확장 정수 유형 포함

NUMBER(38, 0)

모든 부동소수점 타입, pandas 확장 부동소수점 데이터 타입 포함

FLOAT

bool, BooleanDtype

BOOLEAN

str, StringDtype

STRING

datetime.time

TIME

datetime.date

DATE

모든 시간대 무관 datetime 타입

TIMESTAMP_NTZ

모든 시간대 인식 datetime 타입

TIMESTAMP_TZ

list, tuple, array

ARRAY

dict, json

MAP

혼합 데이터 타입을 갖는 오브젝트 열

VARIANT

Timedelta64[ns]

NUMBER(38, 0)

참고

범주형, 기간형, 간격형, 희소형 및 사용자 정의 데이터 타입은 지원되지 않습니다. Timedelta는 현재 Snowpark 클라이언트의 panda에서만 지원됩니다. Timedelta를 다시 Snowflake에 쓰면 Number 유형으로 저장됩니다.

다음 테이블은 df.dtypes 를 사용하여 Snowflake SQL 유형을 pandas on Snowflake 유형에 다시 매핑한 것입니다.

Snowflake 유형

pandas on Snowflake 유형(df.dtypes)

NUMBER(scale = 0)

int64

NUMBER (scale > 0), REAL

float64

BOOLEAN

bool

STRING, TEXT

object (str)

VARIANT, BINARY, GEOMETRY, GEOGRAPHY

object

ARRAY

object (list)

OBJECT

object (dict)

TIME

object (datetime.time)

TIMESTAMP, TIMESTAMP_NTZ, TIMESTAMP_LTZ, TIMESTAMP_TZ

datetime64[ns]

DATE

object (datetime.date)

Snowpark pandas DataFrame에서 to_pandas() 를 사용하는 네이티브 pandas DataFrame으로 변환할 때, 네이티브 pandas DataFrame은 함수 및 프로시저에 대해 SQL-Python 데이터 타입 매핑 와 호환되는 pandas on Snowflake에에 비해 정제된 데이터 타입을 갖게 됩니다.

형변환 및 타입 추론

  • pandas: NumPy 를 준수하며 기본적으로 암시적 형변환과 추론을 위해 NumPy 및 Python 타입 시스템을 따릅니다. 예를 들어, 부울을 정수형으로 취급하므로 1 + True2 를 반환합니다.

  • pandas on Snowflake: 앞의 테이블에 따라 NumPy 및 Python 유형을 Snowflake 유형에 매핑하고, 암시적 타입 형변환 및 추론 에 기본 Snowflake 타입 시스템을 사용합니다. 예를 들어, 논리 데이터 타입 에 따라 부울을 정수 유형으로 암시적으로 변환하지 않으므로 1 + True 는 형변환 오류가 발생합니다.

Null 값 처리

  • pandas: pandas 1.x 버전에서는 누락된 데이터를 처리 할 때 유연성이 우수하므로 모든 Python None, np.nan, pd.NaN, pd.NApd.NaT 를 누락된 값으로 처리했습니다. 이후 버전의 pandas(2.2.x)에서는 이러한 값이 다른 값으로 처리됩니다.

  • pandas on Snowflake: 나열된 모든 앞의 값을 누락된 값으로 처리하는 이전 pandas 버전과 유사한 접근법을 채택합니다. Snowpark는 pandas의 NaN, NANaT 를 재사용합니다. 그러나 이러한 모든 누락된 값은 서로 교환 가능하도록 취급되며 Snowflake 테이블에 SQL NULL로 저장됩니다.

오프셋/빈도 별칭

  • pandas: 버전 2.2.1에서는 pandas의 날짜 오프셋이 변경되었습니다. 한 글자 별칭 'M', 'Q', 'Y' 등은 더 이상 두 글자 오프셋용으로 사용되지 않습니다.

  • pandas on Snowflake: pandas 시계열 설명서 에 설명되는 새로운 오프셋을 독점적으로 사용합니다.

설치

전제 조건: Python 3.9, 3.10 또는 3.11, modin 버전 0.28.1, pandas 버전 2.2.1이 필요합니다.

Snowflake Notebooks 에서 pandas on Snowflake를 사용하려면 노트북의 pandas on Snowflake 의 설정 지침을 참조하십시오.

개발 환경에 pandas on Snowflake를 설치하려면 다음 단계를 따릅니다.

  1. 프로젝트 디렉터리로 변경하고 Python 가상 환경을 활성화합니다.

    참고

    API는 현재 개발 중이므로 시스템 전체에 설치하기보다는 Python 가상 환경에 설치하는 것을 권장합니다. 그러면 생성하는 각 프로젝트에서 특정 버전을 사용할 수 있으므로 향후 버전이 변경되는 것을 방지할 수 있습니다.

    Anaconda, Miniconda 또는 virtualenv 와 같은 도구를 사용하여 특정 Python 버전에 대한 Python 가상 환경을 만들 수 있습니다.

    예를 들어, conda를 사용하여 Python 3.9 가상 환경을 구축하려면 다음을 입력합니다.

    conda create --name snowpark_pandas python=3.9
    conda activate snowpark_pandas
    
    Copy

    참고

    이전에 Python 3.8 및 pandas 1.5.3을 사용하여 이전 버전의 pandas on Snowflake를 설치한 경우 위에 설명된 대로 Python 및 pandas 버전을 업그레이드해야 합니다. Python 3.9, 3.10 또는 3.11로 새 환경을 구축하는 단계를 따르십시오.

  2. Modin으로 Snowpark Python 라이브러리를 설치합니다.

    pip install "snowflake-snowpark-python[modin]"
    
    Copy

    또는

    conda install snowflake-snowpark-python modin==0.28.1
    
    Copy

참고

snowflake-snowpark-python 버전 1.17.0 이상이 설치되어 있는지 확인합니다.

Snowflake에 인증하기

pandas on Snowflake 를 사용하기 전에 Snowflake 데이터베이스와 세션을 설정해야 합니다. 세션에 대한 연결 매개 변수를 선택하려면 구성 파일을 사용하거나 코드에서 열거할 수 있습니다. 자세한 내용은 Snowpark Python용 세션 만들기 섹션을 참조하십시오. 활성 상태의 고유 Snowpark Python 세션이 존재하면 pandas on Snowflake가 자동으로 해당 세션을 사용합니다. 예:

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

CONNECTION_PARAMETERS = {
    'account': '<myaccount>',
    'user': '<myuser>',
    'password': '<mypassword>',
    'role': '<myrole>',
    'database': '<mydatabase>',
    'schema': '<myschema>',
    'warehouse': '<mywarehouse>',
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()

# pandas on Snowflake will automatically pick up the Snowpark session created above.
# It will use that session to create new DataFrames.
df = pd.DataFrame([1, 2])
df2 = pd.read_snowflake('CUSTOMER')
Copy

pd.session 은 Snowpark 세션이므로 다른 Snowpark 세션에서 할 수 있는 모든 기능을 수행할 수 있습니다. 예를 들어, 이를 사용하여 임의의 SQL 쿼리를 실행할 수 있으며, 이 쿼리는 세션 API 에 따라 Snowpark DataFrame을 생성하지만, 그 결과는 Snowpark pandas DataFrame이 아닌 Snowpark DataFrame이 된다는 점에 유의하십시오.

# pd.session is the session that pandas on Snowflake is using for new DataFrames.
# In this case it is the same as the Snowpark session that we've created.
assert pd.session is session

# Run SQL query with returned result as Snowpark DataFrame
snowpark_df = pd.session.sql('select * from customer')
snowpark_df.show()
Copy

또는 구성 파일 에서 Snowpark 연결 매개 변수를 구성할 수 있습니다. 이렇게 하면 코드에서 연결 매개 변수를 열거할 필요가 없으므로 일반적으로 pandas 코드를 작성할 때와 거의 동일하게 pandas on Snowflake 코드를 작성할 수 있습니다. 이를 위해 ~/.snowflake/connections.toml 에 다음과 같은 구성 파일을 생성합니다.

default_connection_name = "default"

[default]
account = "<myaccount>"
user = "<myuser>"
password = "<mypassword>"
role="<myrole>"
database = "<mydatabase>"
schema = "<myschema>"
warehouse = "<mywarehouse>"
Copy

그런 다음 코드에서 snowflake.snowpark.Session.builder.create() 를 사용하여 이러한 자격 증명을 사용하여 세션을 생성하기만 하면 됩니다.

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

# Session.builder.create() will create a default Snowflake connection.
Session.builder.create()
# create a DataFrame.
df = pd.DataFrame([[1, 2], [3, 4]])
Copy

여러 개의 Snowpark 세션을 생성한 다음 그 중 하나를 pandas on Snowflake에 할당할 수도 있습니다. pandas on Snowflake는 하나의 세션만 사용하므로 세션 중 하나를 명시적으로 pd.session = pandas_session 을 사용하여 pandas on Snowflake에 할당해야 합니다.

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

pandas_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account1>").create()
other_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account2>").create()
pd.session = pandas_session
df = pd.DataFrame([1, 2, 3])
Copy

다음 예제는 활성 Snowpark 세션이 없는 상태에서 pandas on Snowflake를 사용하려고 하면 “pandas on Snowflake requires an active snowpark session, but there is none.”와 같은 오류와 함께 SnowparkSessionException 이 발생하는 것을 보여 줍니다. 세션을 생성한 후에는 pandas on Snowflake를 사용할 수 있습니다. 예:

import modin.pandas as pd
import snowflake.snowpark.modin.plugin

df = pd.DataFrame([1, 2, 3])
Copy

다음 예제에서는 활성 Snowpark 세션이 여러 개 있을 때 pandas on Snowflake를 사용하려고 하면 “There are multiple active snowpark sessions, but you need to choose one for pandas on Snowflake.”라는 메시지와 함께 SnowparkSessionException 이 발생한다는 것을 보여 줍니다.

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

pandas_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account1>"}).create()
other_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account2>"}).create()
df = pd.DataFrame([1, 2, 3])
Copy

참고

새로운 Snowpark pandas DataFrame 또는 계열에 사용되는 세션을 modin.pandas.session 을 통해 설정해야 합니다. 단, 다른 세션에서 생성된 DataFrames을 합치거나 병합하는 것은 지원되지 않으므로 워크플로에서 다른 세션을 설정하고 다른 세션으로 DataFrames을 반복해서 생성하는 것은 피해야 합니다.

API 참조

현재 구현된 APIs와 사용 가능한 메서드의 전체 목록은 pandas on Snowflake API 참조 섹션을 참조하십시오.

지원되는 작업의 전체 목록은 pandas on Snowflake 참조의 다음 테이블을 참조하십시오.

Snowflake 노트북에서 pandas on Snowflake 사용하기

Snowflake 노트북에서 pandas on Snowflake를 사용하려면 Snowflake 노트북의 pandas on Snowflake 를 참고하십시오.

저장 프로시저에서 pandas on Snowflake 사용하기

저장 프로시저의 pandas on Snowflake를 사용하여 데이터 파이프라인을 구축하고 작업 으로 저장 프로시저 의 실행을 예약할 수 있습니다.

from snowflake.snowpark.context import get_active_session
session = get_active_session()

from snowflake.snowpark import Session

def data_transformation_pipeline(session: Session) -> str:
  import modin.pandas as pd
  import snowflake.snowpark.modin.plugin
  from datetime import datetime
  # Create a Snowpark pandas DataFrame with sample data.
  df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
                     [1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
                     [1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
                      columns=["DAY", "LOCATION", "SNOWFALL"])
  # Drop rows with null values.
  df = df.dropna()
  # In-place point update to fix data error.
  df.loc[df["DAY"]=="Friday","DAY"]=2
  # Save Results as a Snowflake Table
  timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M")
  save_path = f"OUTPUT_{timestamp}"
  df.to_snowflake(name=save_path, if_exists="replace", index=False)
  return f'Transformed DataFrame saved to {save_path}.'


  dt_pipeline_sproc = session.sproc.register(name="run_data_transformation_pipeline_sp",
                             func=data_transformation_pipeline,
                             replace=True,
                             packages=['modin', 'snowflake-snowpark-python'])
Copy

저장 프로시저를 호출하려면 Python에서 CALL run_data_transformation_pipeline_sp() 를 실행하거나 SQL에서 dt_pipeline_sproc() 를 실행하면 됩니다.

저장 프로시저를 작업으로 예약하려면 Snowflake Python API 를 사용하여 작업을 생성하면 됩니다.

서드 파티 라이브러리와 함께 pandas on Snowflake 사용하기

서드 파티 라이브러리 APIs를 서드 파티 라이브러리 DataFrame으로 호출하는 경우, 서드 파티 라이브러리 호출에 DataFrame을 전달하기 전에 to_pandas() 를 호출하여 Snowpark pandas DataFrame을 pandas DataFrame으로 변환하는 것을 권장합니다.

참고

to_pandas() 를 호출하면 데이터가 Snowflake에서 메모리로 이동하므로 대규모 데이터 세트와 민감한 사용 사례의 경우 이 점을 염두에 두십시오.

pandas on Snowflake는 현재 np.where 에 대한 분산 구현 및 df.plot 과의 상호 운용성 등 특정 NumPyMatplotlib APIs 에 대한 호환성이 제한되어 있습니다. 이러한 서드 파티 라이브러리로 작업할 때 to_pandas() 를 통해 Snowpark pandas DataFrames을 변환하면 여러 번의 I/O 호출을 피할 수 있습니다.

다음은 시각화용 Altair 와 머신 러닝용 scikit-learn 을 사용한 예제입니다.

# Create a Snowpark session with a default connection.
session = Session.builder.create()

train = pd.read_snowflake('TITANIC')

train[['Pclass', 'Parch', 'Sex', 'Survived']].head()
Copy
    Pclass  Parch     Sex       Survived
0       3      0     male               0
1       1      0   female               1
2       3      0   female               1
3       1      0   female               1
4       3      0     male               0
import altair as alt
# Convert to pandas DataFrame
train_df_pandas = train.to_pandas()
survived_per_age_plot = alt.Chart(train_df_pandas).mark_bar(
).encode(
    x=alt.X('Age', bin=alt.Bin(maxbins=25)),
    y='count()',
    column='Survived:N',
    color='Survived:N',
).properties(
    width=300,
    height=300
).configure_axis(
    grid=False
)
survived_per_age_plot
Copy
altair

이제 pandas로 변환한 후 scikit-learn을 사용하여 간단한 모델을 훈련시킬 수 있습니다.

feature_cols = ['Pclass', 'Parch']
# Convert features DataFrame to pandas DataFrames
X_pandas = train_snowpark_pandas.loc[:, feature_cols].to_pandas()
# Convert labels Series to pandas Series
y_pandas = train_snowpark_pandas["Survived"].to_pandas()

from sklearn.linear_model import LogisticRegression

logreg = LogisticRegression()

logreg.fit(X_pandas, y_pandas)

y_pred_pandas = logreg.predict(X_pandas)

acc_eval = accuracy_score(y_pandas, y_pred_pandas)
Copy
scikit 모델

제한 사항

pandas on Snowflake에는 다음과 같은 제한 사항이 있습니다.

  • pandas on Snowflake는 OSS 서드 파티 라이브러리와의 호환성을 보장하지 않습니다. 그러나 1.14.0a1 버전부터 Snowpark pandas에는 NumPy, 특히 np.where 사용에 대한 제한적 호환성이 적용됩니다. 자세한 내용은 NumPy 상호 운용성 섹션을 참조하십시오.

    서드 파티 라이브러리 APIs를 서드 파티 라이브러리 pandas DataFrame으로 호출하는 경우, 서드 파티 라이브러리 호출에 DataFrame을 전달하기 전에 to_pandas() 를 호출하여 Snowpark pandas DataFrame을 pandas DataFrame으로 변환할 것을 Snowflake는 권장합니다. 자세한 내용은 서드 파티 라이브러리와 함께 pandas on Snowflake 사용하기 섹션을 참조하십시오.

  • pandas on Snowflake는 Snowpark ML 과 통합되지 않습니다. Snowpark ML을 사용하는 경우, Snowpark ML을 호출하기 전에 to_snowpark() 를 사용하여 Snowpark pandas DataFrame을 Snowpark DataFrame으로 변환하는 것이 좋습니다.

  • 지연 MultiIndex 오브젝트는 지원되지 않습니다. MultiIndex 를 사용하면 모든 데이터를 클라이언트 측으로 가져와야 하는 네이티브 pandas MultiIndex 오브젝트를 반환합니다.

  • 모든 pandas APIs가 아직 pandas on Snowflake에서 분산 구현된 것은 아닙니다. 지원되지 않는 APIs의 경우, NotImplementedError 가 throw됩니다. 분산 구현이 없는 작업은 저장 프로시저로 대체됩니다. 지원되는 APIs에 대한 자세한 내용은 API 참조 설명서를 참조하십시오.

  • pandas on Snowflake에는 특정 pandas 버전이 필요합니다. pandas on Snowflake는 pandas 2.2.1이 필요하며, pandas 2.2.1과의 호환성만 제공합니다.

  • pandas on Snowflake는 pandas on Snowflake apply() 함수 내에서 참조할 수 없습니다. 네이티브 pandas는 apply() 내에서만 사용할 수 있습니다.

문제 해결하기

이 섹션에서는 pandas on Snowflake를 사용할 때 발생하는 문제 해결 팁을 설명합니다.

  • 문제를 해결할 때 네이티브 pandas DataFrame(또는 샘플)에서 동일한 작업을 실행하여 동일한 오류가 지속되는지 확인합니다. 이러한 접근법은 사용자의 쿼리를 수정하는 방법에 대한 힌트를 제공할 수 있습니다. 예:

    df = pd.DataFrame({"a": [1,2,3], "b": ["x", "y", "z"]})
    # Running this in Snowpark pandas throws an error
    df["A"].sum()
    # Convert a small sample of 10 rows to pandas DataFrame for testing
    pandas_df = df.head(10).to_pandas()
    # Run the same operation. KeyError indicates that the column reference is incorrect
    pandas_df["A"].sum()
    # Fix the column reference to get the Snowpark pandas query working
    df["a"].sum()
    
    Copy
  • 장기 실행 중인 노트북이 열려 있는 경우, 기본적으로 세션이 240분(4시간) 동안 유휴 상태가 되면 Snowflake 세션 시간 제한이 설정됩니다. 세션이 만료되면 추가 pandas on Snowflake 쿼리를 실행하면 “인증 토큰이 만료되었습니다. 사용자가 다시 인증해야 합니다.” 오류가 발생합니다. 이 시점에 Snowflake에 대한 연결을 다시 설정해야 합니다. 이로 인해 보존되지 않은 세션 변수가 손실될 수 있습니다. 세션 유휴 시간 제한 매개 변수를 구성하는 방법에 대한 자세한 내용은 세션 정책 섹션을 참조하십시오.

모범 사례

이 섹션에서는 pandas on Snowflake를 사용할 때 따라야 할 모범 사례에 대해 설명합니다.

  • for 루프, iterrows, iteritems 등 반복적 코드 패턴을 사용하지 마십시오. 반복적인 코드 패턴은 생성되는 쿼리 복잡성을 빠르게 증가시킵니다. 데이터 배포 및 계산 병렬화는 클라이언트 코드가 아닌 pandas on Snowflake가 수행하도록 합니다. 반복적인 코드 패턴의 경우 전체 DataFrame에서 수행할 수 있는 연산을 찾아서 해당 작업을 대신 사용합니다.

for i in np.arange(0, 50):
  if i % 2 == 0:
    data = pd.concat([data, pd.DataFrame({'A': i, 'B': i + 1}, index=[0])], ignore_index=True)
  else:
    data = pd.concat([data, pd.DataFrame({'A': i}, index=[0])], ignore_index=True)

# Instead of creating one DataFrame per row and concatenating them,
# try to directly create the DataFrame out of the data, like this:

data = pd.DataFrame(
      {
          "A": range(0, 50),
          "B": [i + 1 if i % 2 == 0 else None for i in range(50)],
      },
)
Copy
  • apply, applymaptransform 을 호출하지 마십시오. 그러면 UDFs 또는 UDTFs 가 구현되며, 이는 일반 SQL 쿼리에 비해 성능이 떨어질 수 있습니다. 적용된 함수에 동등한 DataFrame 또는 계열 작업이 있는 경우 해당 작업을 대신 사용합니다. 예를 들어, df.groupby('col1').apply('sum') 이 아닌 df.groupby('col1').sum() 을 직접 호출하십시오.

  • 서드 파티 라이브러리 호출에 DataFrame 또는 계열을 전달하기 전에 to_pandas() 를 호출합니다. pandas on Snowflake는 서드 파티 라이브러리와의 호환성을 보장하지 않습니다.

  • 추가 I/O 오버헤드를 피하려면 구체화된 일반 Snowflake 테이블을 사용합니다. pandas on Snowflake는 일반 테이블에서만 작동하는 데이터 스냅샷을 기반으로 작동합니다. 외부 테이블, 뷰 및 Apache Iceberg™ 테이블을 포함한 다른 유형의 경우 스냅샷을 생성하기 전에 임시 테이블이 생성되므로 구체화 오버헤드가 추가로 발생합니다.

  • pandas on Snowflake는 빠른 제로 카피 복제 기능을 제공하는 동시에 read_snowflake 를 사용하여 Snowflake 테이블에서 DataFrames을 생성합니다. 그러나 스냅샷 기능은 일반 데이터베이스의 일반 Snowflake 테이블에만 제공됩니다. 하이브리드, Iceberg 등의 유형이 포함된 테이블이나 공유 데이터베이스의 테이블을 로드하는 경우 일반 Snowflake 테이블에 대한 추가적인 구체화가 도입됩니다. 스냅샷은 데이터 일관성 및 주문 보장을 위해 필요하며, 현재 추가 구체화를 해결할 수 있는 다른 방법이 없으므로 pandas on Snowflake를 사용할 때는 가급적 일반 Snowflake 테이블을 사용하시기 바랍니다.

  • 다른 작업을 진행하기 전에 결과 타입을 다시 확인하고 필요한 경우 astype 을 사용하여 명시적 타입 형변환을 수행합니다.

    유형 추론 기능이 제한되어 있기 때문에 유형 힌트가 제공되지 않으면 결과에 모든 정수 값이 포함되어 있어도 df.apply 는 오브젝트(베리언트) 유형의 결과를 반환합니다. 다른 작업에서 dtypeint 여야 하는 경우 계속하기 전에 astype 메서드를 호출하여 열 타입을 수정하여 명시적 타입 형변환을 수행할 수 있습니다.

  • 평가와 구체화가 필요한 APIs의 경우 필요하지 않으면 호출하지 마십시오.

    Series 또는 Dataframe 을 반환하지 않는 APIs가 올바른 유형으로 결과를 도출하려면 즉시 평가와 구체화가 필요합니다. 플로팅 방법에도 마찬가지입니다. 불필요한 평가와 구체화를 최소화하기 위해 해당 APIs에 대한 호출을 줄이십시오.

  • 대규모 데이터 세트에서는 np.where(<cond>, <스칼라>, n) 를 호출하지 마십시오. <스칼라> 가 DataFrame에 <cond> 의 크기로 브로드캐스트되어 속도가 느려질 수 있습니다.

  • 반복적으로 구축된 쿼리로 작업할 때 df.cache_result 를 사용하여 중간 결과를 구체화함으로써 반복되는 평가를 줄이고 전체 쿼리의 지연 시간을 개선하고 복잡성을 줄일 수 있습니다. 예:

    df = pd.read_snowflake('pandas_test')
    df2 = pd.pivot_table(df, index='index_col', columns='pivot_col') # expensive operation
    df3 = df.merge(df2)
    df4 = df3.where(df2 == True)
    
    Copy

    위의 예제에서 df2 를 생성하는 쿼리는 계산 비용이 많이 들며, df3df4 의 생성에 모두 재사용됩니다. df2 를 임시 테이블로 구체화하면(df2 와 관련된 후속 작업을 피벗이 아닌 테이블 스캔으로 만들면) 코드 블록의 전체 대기 시간을 줄일 수 있습니다.

    df = pd.read_snowflake('pandas_test')
    df2 = pd.pivot_table(df, index='index_col', columns='pivot_col') # expensive operation
    df2.cache_result(inplace=True)
    df3 = df.merge(df2)
    df4 = df3.where(df2 == True)
    
    Copy

다음은 pandas 작업이 포함된 코드 예제입니다. 세 개의 열(COL_STR, COL_FLOATCOL_INT)을 포함하는 pandas_test 라는 이름의 Snowpark pandas DataFrame으로 시작합니다. 이 예제와 관련된 노트북을 보려면 Snowflake-Labs 리포지토리의 pandas on Snowflake 예제 를 참조하십시오.

import modin.pandas as pd
import snowflake.snowpark.modin.plugin

from snowflake.snowpark import Session

CONNECTION_PARAMETERS = {
    'account': '<myaccount>',
    'user': '<myuser>',
    'password': '<mypassword>',
    'role': '<myrole>',
    'database': '<mydatabase>',
    'schema': '<myschema>',
    'warehouse': '<mywarehouse>',
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()

df = pd.DataFrame([['a', 2.1, 1],['b', 4.2, 2],['c', 6.3, None]], columns=["COL_STR", "COL_FLOAT", "COL_INT"])

df
Copy
  COL_STR    COL_FLOAT    COL_INT
0       a          2.1        1.0
1       b          4.2        2.0
2       c          6.3        NaN

DataFrame을 이름이 pandas_test 인 Snowflake 테이블로 저장하며, 이 테이블은 예제 전체에서 사용할 것입니다.

df.to_snowflake("pandas_test", if_exists='replace',index=False)
Copy

다음으로, Snowflake 테이블에서 DataFrame을 생성합니다. 열 COL_INT 를 삭제한 다음 결과를 row_position 열을 사용하여 Snowflake에 다시 저장합니다.

# Create a DataFrame out of a Snowflake table.
df = pd.read_snowflake('pandas_test')

df.shape
Copy
(3, 3)
df.head(2)
Copy
    COL_STR  COL_FLOAT  COL_INT
0         a        2.1        1
1         b        4.2        2
df.dropna(subset=["COL_FLOAT"], inplace=True)

df
Copy
    COL_STR  COL_FLOAT  COL_INT
0         a        2.1        1
1         c        6.3        2
df.shape
Copy
(2, 3)
df.dtypes
Copy
COL_STR       object
COL_FLOAT    float64
COL_INT        int64
dtype: object
# Save the result back to Snowflake with a row_pos column.
df.reset_index(drop=True).to_snowflake('pandas_test2', if_exists='replace', index=True, index_label=['row_pos'])
Copy

이 작업을 통해 새로운 테이블 pandas_test2 가 생성되고, 이 테이블은 다음과 같습니다.

     row_pos  COL_STR  COL_FLOAT  COL_INT
0          1         a       2.0        1
1          2         b       4.0        2

IO(읽기 및 쓰기)

# Reading and writing to Snowflake
df = pd.DataFrame({"fruit": ["apple", "orange"], "size": [3.4, 5.4], "weight": [1.4, 3.2]})
df.to_snowflake("test_table", if_exists="replace", index=False )

df_table = pd.read_snowflake("test_table")


# Generate sample CSV file
with open("data.csv", "w") as f:
    f.write('fruit,size,weight\napple,3.4,1.4\norange,5.4,3.2')
# Read from local CSV file
df_csv = pd.read_csv("data.csv")

# Generate sample JSON file
with open("data.json", "w") as f:
    f.write('{"fruit":"apple", "size":3.4, "weight":1.4},{"fruit":"orange", "size":5.4, "weight":3.2}')
# Read from local JSON file
df_json = pd.read_json('data.json')

# Upload data.json and data.csv to Snowflake stage named @TEST_STAGE
# Read CSV and JSON file from stage
df_csv = pd.read_csv('@TEST_STAGE/data.csv')
df_json = pd.read_json('@TEST_STAGE/data.json')
Copy

자세한 내용은 입력/출력 섹션을 참조하십시오.

인덱싱

df = pd.DataFrame({"a": [1,2,3], "b": ["x", "y", "z"]})
df.columns
Copy
Index(['a', 'b'], dtype='object')
df.index
Copy
Index([0, 1, 2], dtype='int8')
df["a"]
Copy
0    1
1    2
2    3
Name: a, dtype: int8
df["b"]
Copy
0    x
1    y
2    z
Name: b, dtype: object
df.iloc[0,1]
Copy
'x'
df.loc[df["a"] > 2]
Copy
a  b
2  3  z
df.columns = ["c", "d"]
df
Copy
     c  d
0    1  x
1    2  y
2    3  z
df = df.set_index("c")
df
Copy
   d
c
1  x
2  y
3  z
df.rename(columns={"d": "renamed"})
Copy
    renamed
c
1       x
2       y
3       z

누락된 값

import numpy as np
df = pd.DataFrame([[np.nan, 2, np.nan, 0],
                [3, 4, np.nan, 1],
                [np.nan, np.nan, np.nan, np.nan],
                [np.nan, 3, np.nan, 4]],
                columns=list("ABCD"))
df
Copy
     A    B   C    D
0  NaN  2.0 NaN  0.0
1  3.0  4.0 NaN  1.0
2  NaN  NaN NaN  NaN
3  NaN  3.0 NaN  4.0
df.isna()
Copy
       A      B     C      D
0   True  False  True  False
1  False  False  True  False
2   True   True  True   True
3   True  False  True  False
df.fillna(0)
Copy
     A    B    C    D
0   0.0  2.0  0.0  0.0
1   3.0  4.0  0.0  1.0
2   0.0  0.0  0.0  0.0
3   0.0  3.0  0.0  4.0
df.dropna(how="all")
Copy
     A    B   C    D
0   NaN  2.0 NaN  0.0
1   3.0  4.0 NaN  1.0
3   NaN  3.0 NaN  4.0

형식 변환

df = pd.DataFrame({"int": [1,2,3], "str": ["4", "5", "6"]})
df
Copy
   int str
0    1   4
1    2   5
2    3   6
df_float = df.astype(float)
df_float
Copy
   int  str
0  1.0  4.0
1  2.0  5.0
2  3.0  6.0
df_float.dtypes
Copy
int    float64
str    float64
dtype: object
pd.to_numeric(df.str)
Copy
0    4.0
1    5.0
2    6.0
Name: str, dtype: float64
df = pd.DataFrame({'year': [2015, 2016],
                'month': [2, 3],
                'day': [4, 5]})
pd.to_datetime(df)
Copy
0   2015-02-04
1   2016-03-05
dtype: datetime64[ns]

이진 연산

df_1 = pd.DataFrame([[1,2,3],[4,5,6]])
df_2 = pd.DataFrame([[6,7,8]])
df_1.add(df_2)
Copy
    0    1     2
0  7.0  9.0  11.0
1  NaN  NaN   NaN
s1 = pd.Series([1, 2, 3])
s2 = pd.Series([2, 2, 2])
s1 + s2
Copy
0    3
1    4
2    5
dtype: int64
df = pd.DataFrame({"A": [1,2,3], "B": [4,5,6]})
df["A+B"] = df["A"] + df["B"]
df
Copy
   A  B  A+B
0  1  4    5
1  2  5    7
2  3  6    9

집계

df = pd.DataFrame([[1, 2, 3],
                [4, 5, 6],
                [7, 8, 9],
                [np.nan, np.nan, np.nan]],
                columns=['A', 'B', 'C'])
df.agg(['sum', 'min'])
Copy
        A     B     C
sum  12.0  15.0  18.0
min   1.0   2.0   3.0
df.median()
Copy
A    4.0
B    5.0
C    6.0
dtype: float64

병합

df1 = pd.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],
                    'value': [1, 2, 3, 5]})
df1
Copy
  lkey  value
0  foo      1
1  bar      2
2  baz      3
3  foo      5
df2 = pd.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'],
                    'value': [5, 6, 7, 8]})
df2
Copy
  rkey  value
0  foo      5
1  bar      6
2  baz      7
3  foo      8
df1.merge(df2, left_on='lkey', right_on='rkey')
Copy
  lkey  value_x rkey  value_y
0  foo        1  foo        5
1  foo        1  foo        8
2  bar        2  bar        6
3  baz        3  baz        7
4  foo        5  foo        5
5  foo        5  foo        8
df = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
                'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})
df
Copy
  key   A
0  K0  A0
1  K1  A1
2  K2  A2
3  K3  A3
4  K4  A4
5  K5  A5
other = pd.DataFrame({'key': ['K0', 'K1', 'K2'],
                    'B': ['B0', 'B1', 'B2']})
df.join(other, lsuffix='_caller', rsuffix='_other')
Copy
  key_caller   A key_other     B
0         K0  A0        K0    B0
1         K1  A1        K1    B1
2         K2  A2        K2    B2
3         K3  A3      None  None
4         K4  A4      None  None
5         K5  A5      None  None

Groupby

df = pd.DataFrame({'Animal': ['Falcon', 'Falcon','Parrot', 'Parrot'],
               'Max Speed': [380., 370., 24., 26.]})

df
Copy
   Animal  Max Speed
0  Falcon      380.0
1  Falcon      370.0
2  Parrot       24.0
3  Parrot       26.0
df.groupby(['Animal']).mean()
Copy
        Max Speed
Animal
Falcon      375.0
Parrot       25.0

자세한 내용은 GroupBy 섹션을 참조하십시오.

피벗

df = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
                        "bar", "bar", "bar", "bar"],
                "B": ["one", "one", "one", "two", "two",
                        "one", "one", "two", "two"],
                "C": ["small", "large", "large", "small",
                        "small", "large", "small", "small",
                        "large"],
                "D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
                "E": [2, 4, 5, 5, 6, 6, 8, 9, 9]})
df
Copy
     A    B      C  D  E
0  foo  one  small  1  2
1  foo  one  large  2  4
2  foo  one  large  2  5
3  foo  two  small  3  5
4  foo  two  small  3  6
5  bar  one  large  4  6
6  bar  one  small  5  8
7  bar  two  small  6  9
8  bar  two  large  7  9
pd.pivot_table(df, values='D', index=['A', 'B'],
                   columns=['C'], aggfunc="sum")
Copy
    C    large  small
A   B
bar one    4.0      5
    two    7.0      6
foo one    4.0      1
    two    NaN      6
df = pd.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
                'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
                'baz': [1, 2, 3, 4, 5, 6],
                'zoo': ['x', 'y', 'z', 'q', 'w', 't']})
df
Copy
   foo bar  baz zoo
0  one   A    1   x
1  one   B    2   y
2  one   C    3   z
3  two   A    4   q
4  two   B    5   w
5  two   C    6   t

리소스