pandas on Snowflake¶
pandas on Snowflake를 사용하면 pandas 코드를 분산된 방식으로 Snowflake의 데이터에서 직접 실행할 수 있습니다. 가져오기 문과 몇 줄의 코드만 변경하면 익숙한 pandas 환경을 Snowflake의 확장성 및 보안 이점과 함께 활용할 있습니다. pandas on Snowflake를 사용하면 훨씬 더 큰 데이터 세트로 작업할 수 있으며, pandas 파이프라인을 다른 빅데이터 프레임워크로 포팅하거나 크고 비싼 머신을 프로비저닝하는 데 드는 시간과 비용을 피할 수 있습니다. SQL로의 변환을 통해 기본적으로 Snowflake에서 워크로드를 실행하여 병렬화와 Snowflake의 데이터 거버넌스 및 보안 이점을 활용할 수 있습니다. pandas on Snowflake는 Snowpark Python 라이브러리 의 일부인 Snowpark pandas API를 통해 제공되며 Snowflake 플랫폼 내에서 Python 코드의 확장 가능한 데이터 처리를 활성화합니다.
pandas on Snowflake 사용의 이점¶
Python 개발자에게 익숙한 인터페이스 제공 – pandas on Snowflake는 Snowflake에서 기본적으로 실행할 수 있는 pandas 호환 계층을 공급하여 Python 개발자에게 익숙한 인터페이스를 제공합니다.
확장 가능한 분산형 pandas – Snowflake에서 pandas on Snowflake는 기존의 쿼리 최적화 기법을 활용하여 pandas의 편의성과 Snowflake의 확장성을 연결합니다. 최소한의 코드 재작성만 필요하므로 마이그레이션 과정이 간소화되어 프로토타입에서 프로덕션으로 원활하게 변환할 수 있습니다.
보안 및 거버넌스 – 데이터는 Snowflake의 보안 플랫폼을 벗어나지 않습니다. pandas on Snowflake는 데이터 조직 내에서 데이터 액세스 방식에 대한 일관성을 유지하고 감사 및 거버넌스를 더 쉽게 수행할 수 있게 해줍니다.
관리 및 조정할 추가 컴퓨팅 인프라가 필요 없음 - pandas on Snowflake는 Snowflake의 강력한 컴퓨팅 엔진을 활용하므로 추가 컴퓨팅 인프라를 설정하거나 관리할 필요가 없습니다.
pandas on Snowflake를 사용해야 하는 경우¶
다음 중 하나라도 해당하는 경우 pandas on Snowflake를 사용해야 합니다.
사용자가 pandas API와 더 광범위한 PyData 생태계에 대해 익숙합니다.
사용자가 pandas에 익숙하고 동일한 코드베이스에서 협업하고자 하는 다른 사람들과 함께 팀에서 일하고 있습니다.
pandas로 작성된 기존 코드가 있습니다.
워크플로에 주문 관련 요구 사항이 있는 경우 pandas DataFrames에서 지원합니다. 예를 들어, 전체 워크플로에 대해 데이터 세트가 동일한 순서로 있어야 합니다.
보다 정확한 코드 완성을 위해 AI 기반 Copilot 도구를 선호합니다.
pandas on Snowflake 시작하기¶
pandas on Snowflake를 설치하려면 conda 또는 pip를 사용하여 패키지를 설치할 수 있습니다. 자세한 지침은 설치 섹션을 참조하십시오.
pip install "snowflake-snowpark-python[modin]"
pandas on Snowflake가 설치되면 pandas를 import pandas as pd
로 가져오는 대신 다음 두 줄을 사용합니다.
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
다음은 Modin을 사용하여 pandas on Snowpark Python 라이브러리를 통해 pandas on Snowflake를 사용하는 방법의 예제입니다.
import modin.pandas as pd
# Import the Snowpark plugin for modin.
import snowflake.snowpark.modin.plugin
# Create a Snowpark session with a default connection.
from snowflake.snowpark.session import Session
session = Session.builder.create()
# Create a Snowpark pandas DataFrame from existing Snowflake table
df = pd.read_snowflake('SNOWFALL')
# Alternatively, create a Snowpark pandas DataFrame with sample data.
df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
[1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
[1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
columns=["DAY", "LOCATION", "SNOWFALL"])
# Inspect the DataFrame
df
DAY LOCATION SNOWFALL
0 1 Big Bear 8.0
1 2 Big Bear 10.0
2 3 Big Bear NaN
3 1 Tahoe 3.0
4 2 Tahoe NaN
5 3 Tahoe 13.0
6 1 Whistler NaN
7 Friday Whistler 40.0
8 3 Whistler 25.0
# In-place point update to fix data error.
df.loc[df["DAY"]=="Friday","DAY"]=2
# Inspect the columns after update.
# Note how the data type is updated automatically after transformation.
df["DAY"]
0 1
1 2
2 3
3 1
4 2
5 3
6 1
7 2
8 3
Name: DAY, dtype: int64
# Drop rows with null values.
df.dropna()
DAY LOCATION SNOWFALL
0 1 Big Bear 8.0
1 2 Big Bear 10.0
3 1 Tahoe 3.0
5 3 Tahoe 13.0
7 2 Whistler 40.0
8 3 Whistler 25.0
# Compute the average daily snowfall across locations.
df.groupby("LOCATION").mean()["SNOWFALL"]
LOCATION
Big Bear 9.0
Tahoe 8.0
Whistler 32.5
Name: SNOWFALL, dtype: float64
Snowpark DataFrames과 함께 pandas on Snowflake 사용하기¶
pandas on Snowflake와 DataFrame API는 상호 운용성이 뛰어나기 때문에 두 가지 APIs를 모두 활용하는 파이프라인을 구축할 수 있습니다.
다음 작업을 사용하여 Snowpark DataFrames와 Snowpark pandas DataFrames 간의 변환을 수행할 수 있습니다.
작업 |
입력 |
출력 |
참고 |
---|---|---|---|
Snowpark DataFrame |
Snowpark pandas DataFrame |
이 작업은 각 행에 암시적 순서를 할당하고 DataFrame의 수명 동안 이 행 순서를 유지합니다. 이 변환에는 I/O 비용이 발생합니다. |
|
Snowpark pandas DataFrame 또는 Snowpark pandas 시리즈 |
Snowpark DataFrame |
이 작업은 행 순서를 유지하지 않으며, 결과 Snowpark DataFrame은 소스 Snowpark pandas DataFrame의 데이터 스냅샷에서 작동합니다. 테이블에서 직접 생성된 Snowpark DataFrames와 달리, 이 동작은 기초 테이블의 변경 사항이 Snowpark 작업을 평가하는 동안 반영되지 않음을 의미합니다. DDL 작업과 제한된 DML 작업은 DataFrame에 적용할 수 없습니다. 이 변환에는 I/O 비용이 발생하지 않습니다. |
가능하면 불필요한 변환 비용을 방지하기 위해 Snowpark DataFrame으로 변환하는 대신 read_snowflake 를 사용하여 Snowflake에서 테이블을 직접 읽어오는 것이 좋습니다.
자세한 내용은 Snowpark DataFrames vs Snowpark pandas DataFrame: 어떤 것을 선택해야 할까요? 섹션을 참조하십시오.
pandas on Snowflake와 네이티브 pandas 비교 방법¶
pandas on Snowflake와 네이티브 pandas는 일치하는 시그니처와 유사한 DataFrame APIs와 유사한 의미 체계를 가지고 있습니다. pandas on Snowflake는 네이티브 pandas와 동일한 API 시그니처를 제공하며(pandas 2.2.1), 확장 가능한 계산을 Snowflake로 제공합니다. pandas on Snowflake는 네이티브 pandas 문서에서 설명하는 의미 체계를 최대한 존중하지만 Snowflake 계산 및 유형 시스템을 사용합니다. 그러나 네이티브 pandas가 클라이언트 머신에서 실행되면 Python 계산 및 유형 시스템을 사용합니다. pandas on Snowflake와 Snowflake 간의 유형 매핑에 대한 정보는 데이터 타입 섹션을 참조하십시오.
네이티브 pandas와 마찬가지로 pandas on Snowflake도 인덱스 개념을 가지고 있으며 행 순서를 유지합니다. 그러나 서로 다른 실행 환경으로 인해 동작에 미묘한 차이가 있습니다. 이 섹션에서는 주의해야 할 주요 차이점을 설명합니다.
pandas on Snowflake는 이미 Snowflake에 있는 데이터와 함께 사용하는 것이 가장 좋지만, 다음 작업을 사용하여 네이티브 pandas와 pandas on Snowflake 간에 변환할 수 있습니다.
작업 |
입력 |
출력 |
참고 |
---|---|---|---|
Snowpark pandas DataFrame |
네이티브 pandas DataFrame |
모든 데이터를 로컬 환경에 구체화합니다. 데이터 세트가 큰 경우 메모리 부족 오류가 발생할 수 있습니다. |
|
네이티브 pandas DataFrame, 원시 데이터, Snowpark pandas 오브젝트 |
Snowpark pandas DataFrame |
소규모 DataFrames용으로 예약해야 합니다. 대량의 로컬 데이터로 DataFrame을 생성하면 임시 테이블이 생성되고 데이터 업로드에 따른 성능 문제가 발생할 수 있습니다. |
|
네이티브 pandas DataFrame, Snowpark pandas 오브젝트 |
Snowflake 테이블 |
이후 |
실행 환경¶
pandas
: 단일 머신에서 동작하고 메모리에 있는 데이터를 처리합니다.pandas on Snowflake
: Snowflake와 통합되어 여러 머신 클러스터에서 분산 컴퓨팅이 가능합니다. 이러한 통합을 통해 단일 머신의 메모리 용량을 초과하는 훨씬 더 큰 데이터 세트를 처리할 수 있습니다. Snowpark pandas API를 사용하려면 Snowflake에 연결해야 합니다.
지연 평가 vs. 즉시 평가¶
pandas
: 각 작업 후에 즉시 작업을 실행하고 결과를 메모리에 완전히 구체화합니다. 이 즉시 작업 평가는 머신 내에서 데이터를 광범위하게 이동해야 하기 때문에 메모리 부담을 가중시킬 수 있습니다.pandas on Snowflake
: pandas와 동일한 API 경험을 제공합니다. pandas의 즉시 평가 모델을 모방하지만, 내부적으로는 지연 평가 쿼리 그래프를 구축하여 작업 전반에 걸쳐 최적화를 가능하게 합니다.쿼리 그래프를 통해 작업을 융합하고 트랜스파일링하면 기본 분산형 Snowflake 컴퓨팅 엔진에 대한 추가적인 최적화 기회를 얻을 수 있어, pandas를 Snowflake 내에서 직접 실행할 때보다 비용과 엔드투엔드 파이프라인 런타임이 모두 감소합니다.
참고
반환값이 Snowpark pandas 오브젝트(즉,
DataFrame
,Series
또는Index
)가 아닌 I/O 관련 APIs 및 APIs는 항상 즉시 평가됩니다. 예:read_snowflake
to_snowflake
to_pandas
to_dict
to_list
__repr__
dunder 메서드
__array__
는 scikit-learn과 같은 일부 서드 파티 라이브러리에서 자동으로 호출할 수 있습니다. 이 메서드를 호출하면 로컬 머신에 결과가 표시됩니다.
데이터 소스 및 저장소¶
pandas
: IO 도구(텍스트, CSV, HDF5, …) 에서 pandas 설명서에 나열된 다양한 독자 및 작성자를 지원합니다.pandas on Snowflake
: Snowflake 테이블에서 읽고 쓸 수 있으며 로컬 또는 스테이징된 CSV, JSON 또는 parquet 파일을 읽을 수 있습니다. 자세한 내용은 IO(읽기 및 쓰기) 섹션을 참조하십시오.
데이터 타입¶
pandas
: 정수, 부동 소수점, 문자열,datetime
타입, 범주형 타입 등 다양한 데이터 타입을 지원합니다. 또한 사용자 정의 데이터 타입도 지원합니다. pandas의 데이터 타입은 일반적으로 기본 데이터에서 파생되며 엄격하게 적용됩니다.pandas on Snowflake
: pandas 데이터 타입을 Snowflake의 SQL 타입으로 변환하여 pandas 오브젝트를 SQL에 매핑하는 Snowflake 타입 시스템에 의해 제약됩니다. 대부분의 pandas 유형은 Snowflake에서 자연스럽게 대응되지만 매핑이 항상 1:1로 이루어지는 것은 아닙니다. 경우에 따라 여러 개의 pandas 유형이 동일한 SQL 유형에 매핑되는 경우도 있습니다.
다음 테이블에는 pandas와 Snowflake SQL 간 유형 매핑이 나와 있습니다.
pandas 타입 |
Snowflake 유형 |
---|---|
모든 부호화/부호화되지 않은 정수 유형, pandas 확장 정수 유형 포함 |
NUMBER(38, 0) |
모든 부동소수점 타입, pandas 확장 부동소수점 데이터 타입 포함 |
FLOAT |
|
BOOLEAN |
|
STRING |
|
TIME |
|
DATE |
모든 시간대 무관 |
TIMESTAMP_NTZ |
모든 시간대 인식 |
TIMESTAMP_TZ |
|
ARRAY |
|
MAP |
혼합 데이터 타입을 갖는 오브젝트 열 |
VARIANT |
Timedelta64[ns] |
NUMBER(38, 0) |
참고
범주형, 기간형, 간격형, 희소형 및 사용자 정의 데이터 타입은 지원되지 않습니다. Timedelta는 현재 Snowpark 클라이언트의 panda에서만 지원됩니다. Timedelta를 다시 Snowflake에 쓰면 Number 유형으로 저장됩니다.
다음 테이블은 df.dtypes
를 사용하여 Snowflake SQL 유형을 pandas on Snowflake 유형에 다시 매핑한 것입니다.
Snowflake 유형 |
pandas on Snowflake 유형( |
---|---|
NUMBER( |
|
NUMBER ( |
|
BOOLEAN |
|
STRING, TEXT |
|
VARIANT, BINARY, GEOMETRY, GEOGRAPHY |
|
ARRAY |
|
OBJECT |
|
TIME |
|
TIMESTAMP, TIMESTAMP_NTZ, TIMESTAMP_LTZ, TIMESTAMP_TZ |
|
DATE |
|
Snowpark pandas DataFrame에서 to_pandas()
를 사용하는 네이티브 pandas DataFrame으로 변환할 때, 네이티브 pandas DataFrame은 함수 및 프로시저에 대해 SQL-Python 데이터 타입 매핑 와 호환되는 pandas on Snowflake에에 비해 정제된 데이터 타입을 갖게 됩니다.
형변환 및 타입 추론¶
pandas
: NumPy 를 준수하며 기본적으로 암시적 형변환과 추론을 위해 NumPy 및 Python 타입 시스템을 따릅니다. 예를 들어, 부울을 정수형으로 취급하므로1 + True
는2
를 반환합니다.pandas on Snowflake
: 앞의 테이블에 따라 NumPy 및 Python 유형을 Snowflake 유형에 매핑하고, 암시적 타입 형변환 및 추론 에 기본 Snowflake 타입 시스템을 사용합니다. 예를 들어, 논리 데이터 타입 에 따라 부울을 정수 유형으로 암시적으로 변환하지 않으므로1 + True
는 형변환 오류가 발생합니다.
Null 값 처리¶
pandas
: pandas 1.x 버전에서는 누락된 데이터를 처리 할 때 유연성이 우수하므로 모든 PythonNone
,np.nan
,pd.NaN
,pd.NA
및pd.NaT
를 누락된 값으로 처리했습니다. 이후 버전의 pandas(2.2.x)에서는 이러한 값이 다른 값으로 처리됩니다.pandas on Snowflake
: 나열된 모든 앞의 값을 누락된 값으로 처리하는 이전 pandas 버전과 유사한 접근법을 채택합니다. Snowpark는 pandas의NaN
,NA
및NaT
를 재사용합니다. 그러나 이러한 모든 누락된 값은 서로 교환 가능하도록 취급되며 Snowflake 테이블에 SQL NULL로 저장됩니다.
오프셋/빈도 별칭¶
pandas
: 버전 2.2.1에서는 pandas의 날짜 오프셋이 변경되었습니다. 한 글자 별칭'M'
,'Q'
,'Y'
등은 더 이상 두 글자 오프셋용으로 사용되지 않습니다.pandas on Snowflake
: pandas 시계열 설명서 에 설명되는 새로운 오프셋을 독점적으로 사용합니다.
설치¶
전제 조건: Python 3.9, 3.10 또는 3.11, modin 버전 0.28.1, pandas 버전 2.2.1이 필요합니다.
팁
Snowflake Notebooks 에서 pandas on Snowflake를 사용하려면 노트북의 pandas on Snowflake 의 설정 지침을 참조하십시오.
개발 환경에 pandas on Snowflake를 설치하려면 다음 단계를 따릅니다.
프로젝트 디렉터리로 변경하고 Python 가상 환경을 활성화합니다.
참고
API는 현재 개발 중이므로 시스템 전체에 설치하기보다는 Python 가상 환경에 설치하는 것을 권장합니다. 그러면 생성하는 각 프로젝트에서 특정 버전을 사용할 수 있으므로 향후 버전이 변경되는 것을 방지할 수 있습니다.
Anaconda, Miniconda 또는 virtualenv 와 같은 도구를 사용하여 특정 Python 버전에 대한 Python 가상 환경을 만들 수 있습니다.
예를 들어, conda를 사용하여 Python 3.9 가상 환경을 구축하려면 다음을 입력합니다.
conda create --name snowpark_pandas python=3.9 conda activate snowpark_pandas
참고
이전에 Python 3.8 및 pandas 1.5.3을 사용하여 이전 버전의 pandas on Snowflake를 설치한 경우 위에 설명된 대로 Python 및 pandas 버전을 업그레이드해야 합니다. Python 3.9, 3.10 또는 3.11로 새 환경을 구축하는 단계를 따르십시오.
Modin으로 Snowpark Python 라이브러리를 설치합니다.
pip install "snowflake-snowpark-python[modin]"
또는
conda install snowflake-snowpark-python modin==0.28.1
참고
snowflake-snowpark-python
버전 1.17.0 이상이 설치되어 있는지 확인합니다.
Snowflake에 인증하기¶
pandas on Snowflake 를 사용하기 전에 Snowflake 데이터베이스와 세션을 설정해야 합니다. 세션에 대한 연결 매개 변수를 선택하려면 구성 파일을 사용하거나 코드에서 열거할 수 있습니다. 자세한 내용은 Snowpark Python용 세션 만들기 섹션을 참조하십시오. 활성 상태의 고유 Snowpark Python 세션이 존재하면 pandas on Snowflake가 자동으로 해당 세션을 사용합니다. 예:
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session
CONNECTION_PARAMETERS = {
'account': '<myaccount>',
'user': '<myuser>',
'password': '<mypassword>',
'role': '<myrole>',
'database': '<mydatabase>',
'schema': '<myschema>',
'warehouse': '<mywarehouse>',
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()
# pandas on Snowflake will automatically pick up the Snowpark session created above.
# It will use that session to create new DataFrames.
df = pd.DataFrame([1, 2])
df2 = pd.read_snowflake('CUSTOMER')
pd.session
은 Snowpark 세션이므로 다른 Snowpark 세션에서 할 수 있는 모든 기능을 수행할 수 있습니다. 예를 들어, 이를 사용하여 임의의 SQL 쿼리를 실행할 수 있으며, 이 쿼리는 세션 API 에 따라 Snowpark DataFrame을 생성하지만, 그 결과는 Snowpark pandas DataFrame이 아닌 Snowpark DataFrame이 된다는 점에 유의하십시오.
# pd.session is the session that pandas on Snowflake is using for new DataFrames.
# In this case it is the same as the Snowpark session that we've created.
assert pd.session is session
# Run SQL query with returned result as Snowpark DataFrame
snowpark_df = pd.session.sql('select * from customer')
snowpark_df.show()
또는 구성 파일 에서 Snowpark 연결 매개 변수를 구성할 수 있습니다. 이렇게 하면 코드에서 연결 매개 변수를 열거할 필요가 없으므로 일반적으로 pandas 코드를 작성할 때와 거의 동일하게 pandas on Snowflake 코드를 작성할 수 있습니다. 이를 위해 ~/.snowflake/connections.toml
에 다음과 같은 구성 파일을 생성합니다.
default_connection_name = "default"
[default]
account = "<myaccount>"
user = "<myuser>"
password = "<mypassword>"
role="<myrole>"
database = "<mydatabase>"
schema = "<myschema>"
warehouse = "<mywarehouse>"
그런 다음 코드에서 snowflake.snowpark.Session.builder.create()
를 사용하여 이러한 자격 증명을 사용하여 세션을 생성하기만 하면 됩니다.
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session
# Session.builder.create() will create a default Snowflake connection.
Session.builder.create()
# create a DataFrame.
df = pd.DataFrame([[1, 2], [3, 4]])
여러 개의 Snowpark 세션을 생성한 다음 그 중 하나를 pandas on Snowflake에 할당할 수도 있습니다. pandas on Snowflake는 하나의 세션만 사용하므로 세션 중 하나를 명시적으로 pd.session = pandas_session
을 사용하여 pandas on Snowflake에 할당해야 합니다.
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session
pandas_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account1>").create()
other_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account2>").create()
pd.session = pandas_session
df = pd.DataFrame([1, 2, 3])
다음 예제는 활성 Snowpark 세션이 없는 상태에서 pandas on Snowflake를 사용하려고 하면 “pandas on Snowflake requires an active snowpark session, but there is none.”와 같은 오류와 함께 SnowparkSessionException
이 발생하는 것을 보여 줍니다. 세션을 생성한 후에는 pandas on Snowflake를 사용할 수 있습니다. 예:
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
df = pd.DataFrame([1, 2, 3])
다음 예제에서는 활성 Snowpark 세션이 여러 개 있을 때 pandas on Snowflake를 사용하려고 하면 “There are multiple active snowpark sessions, but you need to choose one for pandas on Snowflake.”라는 메시지와 함께 SnowparkSessionException
이 발생한다는 것을 보여 줍니다.
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session
pandas_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account1>"}).create()
other_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account2>"}).create()
df = pd.DataFrame([1, 2, 3])
참고
새로운 Snowpark pandas DataFrame 또는 계열에 사용되는 세션을 modin.pandas.session
을 통해 설정해야 합니다. 단, 다른 세션에서 생성된 DataFrames을 합치거나 병합하는 것은 지원되지 않으므로 워크플로에서 다른 세션을 설정하고 다른 세션으로 DataFrames을 반복해서 생성하는 것은 피해야 합니다.
API 참조¶
현재 구현된 APIs와 사용 가능한 메서드의 전체 목록은 pandas on Snowflake API 참조 섹션을 참조하십시오.
지원되는 작업의 전체 목록은 pandas on Snowflake 참조의 다음 테이블을 참조하십시오.
Snowflake 노트북에서 pandas on Snowflake 사용하기¶
Snowflake 노트북에서 pandas on Snowflake를 사용하려면 Snowflake 노트북의 pandas on Snowflake 를 참고하십시오.
저장 프로시저에서 pandas on Snowflake 사용하기¶
저장 프로시저의 pandas on Snowflake를 사용하여 데이터 파이프라인을 구축하고 작업 으로 저장 프로시저 의 실행을 예약할 수 있습니다.
from snowflake.snowpark.context import get_active_session
session = get_active_session()
from snowflake.snowpark import Session
def data_transformation_pipeline(session: Session) -> str:
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from datetime import datetime
# Create a Snowpark pandas DataFrame with sample data.
df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
[1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
[1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
columns=["DAY", "LOCATION", "SNOWFALL"])
# Drop rows with null values.
df = df.dropna()
# In-place point update to fix data error.
df.loc[df["DAY"]=="Friday","DAY"]=2
# Save Results as a Snowflake Table
timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M")
save_path = f"OUTPUT_{timestamp}"
df.to_snowflake(name=save_path, if_exists="replace", index=False)
return f'Transformed DataFrame saved to {save_path}.'
dt_pipeline_sproc = session.sproc.register(name="run_data_transformation_pipeline_sp",
func=data_transformation_pipeline,
replace=True,
packages=['modin', 'snowflake-snowpark-python'])
저장 프로시저를 호출하려면 Python에서 CALL run_data_transformation_pipeline_sp()
를 실행하거나 SQL에서 dt_pipeline_sproc()
를 실행하면 됩니다.
저장 프로시저를 작업으로 예약하려면 Snowflake Python API 를 사용하여 작업을 생성하면 됩니다.
서드 파티 라이브러리와 함께 pandas on Snowflake 사용하기¶
서드 파티 라이브러리 APIs를 서드 파티 라이브러리 DataFrame으로 호출하는 경우, 서드 파티 라이브러리 호출에 DataFrame을 전달하기 전에 to_pandas() 를 호출하여 Snowpark pandas DataFrame을 pandas DataFrame으로 변환하는 것을 권장합니다.
참고
to_pandas()
를 호출하면 데이터가 Snowflake에서 메모리로 이동하므로 대규모 데이터 세트와 민감한 사용 사례의 경우 이 점을 염두에 두십시오.
pandas on Snowflake는 현재 np.where
에 대한 분산 구현 및 df.plot
과의 상호 운용성 등 특정 NumPy 및 Matplotlib APIs 에 대한 호환성이 제한되어 있습니다. 이러한 서드 파티 라이브러리로 작업할 때 to_pandas()
를 통해 Snowpark pandas DataFrames을 변환하면 여러 번의 I/O 호출을 피할 수 있습니다.
다음은 시각화용 Altair 와 머신 러닝용 scikit-learn 을 사용한 예제입니다.
# Create a Snowpark session with a default connection.
session = Session.builder.create()
train = pd.read_snowflake('TITANIC')
train[['Pclass', 'Parch', 'Sex', 'Survived']].head()
Pclass Parch Sex Survived
0 3 0 male 0
1 1 0 female 1
2 3 0 female 1
3 1 0 female 1
4 3 0 male 0
import altair as alt
# Convert to pandas DataFrame
train_df_pandas = train.to_pandas()
survived_per_age_plot = alt.Chart(train_df_pandas).mark_bar(
).encode(
x=alt.X('Age', bin=alt.Bin(maxbins=25)),
y='count()',
column='Survived:N',
color='Survived:N',
).properties(
width=300,
height=300
).configure_axis(
grid=False
)
survived_per_age_plot

이제 pandas로 변환한 후 scikit-learn을 사용하여 간단한 모델을 훈련시킬 수 있습니다.
feature_cols = ['Pclass', 'Parch']
# Convert features DataFrame to pandas DataFrames
X_pandas = train_snowpark_pandas.loc[:, feature_cols].to_pandas()
# Convert labels Series to pandas Series
y_pandas = train_snowpark_pandas["Survived"].to_pandas()
from sklearn.linear_model import LogisticRegression
logreg = LogisticRegression()
logreg.fit(X_pandas, y_pandas)
y_pred_pandas = logreg.predict(X_pandas)
acc_eval = accuracy_score(y_pandas, y_pred_pandas)

제한 사항¶
pandas on Snowflake에는 다음과 같은 제한 사항이 있습니다.
pandas on Snowflake는 OSS 서드 파티 라이브러리와의 호환성을 보장하지 않습니다. 그러나 1.14.0a1 버전부터 Snowpark pandas에는 NumPy, 특히
np.where
사용에 대한 제한적 호환성이 적용됩니다. 자세한 내용은 NumPy 상호 운용성 섹션을 참조하십시오.서드 파티 라이브러리 APIs를 서드 파티 라이브러리 pandas DataFrame으로 호출하는 경우, 서드 파티 라이브러리 호출에 DataFrame을 전달하기 전에
to_pandas()
를 호출하여 Snowpark pandas DataFrame을 pandas DataFrame으로 변환할 것을 Snowflake는 권장합니다. 자세한 내용은 서드 파티 라이브러리와 함께 pandas on Snowflake 사용하기 섹션을 참조하십시오.pandas on Snowflake는 Snowpark ML 과 통합되지 않습니다. Snowpark ML을 사용하는 경우, Snowpark ML을 호출하기 전에 to_snowpark() 를 사용하여 Snowpark pandas DataFrame을 Snowpark DataFrame으로 변환하는 것이 좋습니다.
지연
MultiIndex
오브젝트는 지원되지 않습니다.MultiIndex
를 사용하면 모든 데이터를 클라이언트 측으로 가져와야 하는 네이티브 pandasMultiIndex
오브젝트를 반환합니다.모든 pandas APIs가 아직 pandas on Snowflake에서 분산 구현된 것은 아닙니다. 지원되지 않는 APIs의 경우,
NotImplementedError
가 throw됩니다. 분산 구현이 없는 작업은 저장 프로시저로 대체됩니다. 지원되는 APIs에 대한 자세한 내용은 API 참조 설명서를 참조하십시오.pandas on Snowflake에는 특정 pandas 버전이 필요합니다. pandas on Snowflake는 pandas 2.2.1이 필요하며, pandas 2.2.1과의 호환성만 제공합니다.
pandas on Snowflake는 pandas on Snowflake
apply()
함수 내에서 참조할 수 없습니다. 네이티브 pandas는apply()
내에서만 사용할 수 있습니다.
문제 해결하기¶
이 섹션에서는 pandas on Snowflake를 사용할 때 발생하는 문제 해결 팁을 설명합니다.
문제를 해결할 때 네이티브 pandas DataFrame(또는 샘플)에서 동일한 작업을 실행하여 동일한 오류가 지속되는지 확인합니다. 이러한 접근법은 사용자의 쿼리를 수정하는 방법에 대한 힌트를 제공할 수 있습니다. 예:
df = pd.DataFrame({"a": [1,2,3], "b": ["x", "y", "z"]}) # Running this in Snowpark pandas throws an error df["A"].sum() # Convert a small sample of 10 rows to pandas DataFrame for testing pandas_df = df.head(10).to_pandas() # Run the same operation. KeyError indicates that the column reference is incorrect pandas_df["A"].sum() # Fix the column reference to get the Snowpark pandas query working df["a"].sum()
장기 실행 중인 노트북이 열려 있는 경우, 기본적으로 세션이 240분(4시간) 동안 유휴 상태가 되면 Snowflake 세션 시간 제한이 설정됩니다. 세션이 만료되면 추가 pandas on Snowflake 쿼리를 실행하면 “인증 토큰이 만료되었습니다. 사용자가 다시 인증해야 합니다.” 오류가 발생합니다. 이 시점에 Snowflake에 대한 연결을 다시 설정해야 합니다. 이로 인해 보존되지 않은 세션 변수가 손실될 수 있습니다. 세션 유휴 시간 제한 매개 변수를 구성하는 방법에 대한 자세한 내용은 세션 정책 섹션을 참조하십시오.
모범 사례¶
이 섹션에서는 pandas on Snowflake를 사용할 때 따라야 할 모범 사례에 대해 설명합니다.
for
루프,iterrows
,iteritems
등 반복적 코드 패턴을 사용하지 마십시오. 반복적인 코드 패턴은 생성되는 쿼리 복잡성을 빠르게 증가시킵니다. 데이터 배포 및 계산 병렬화는 클라이언트 코드가 아닌 pandas on Snowflake가 수행하도록 합니다. 반복적인 코드 패턴의 경우 전체 DataFrame에서 수행할 수 있는 연산을 찾아서 해당 작업을 대신 사용합니다.
for i in np.arange(0, 50):
if i % 2 == 0:
data = pd.concat([data, pd.DataFrame({'A': i, 'B': i + 1}, index=[0])], ignore_index=True)
else:
data = pd.concat([data, pd.DataFrame({'A': i}, index=[0])], ignore_index=True)
# Instead of creating one DataFrame per row and concatenating them,
# try to directly create the DataFrame out of the data, like this:
data = pd.DataFrame(
{
"A": range(0, 50),
"B": [i + 1 if i % 2 == 0 else None for i in range(50)],
},
)
apply
,applymap
및transform
을 호출하지 마십시오. 그러면 UDFs 또는 UDTFs 가 구현되며, 이는 일반 SQL 쿼리에 비해 성능이 떨어질 수 있습니다. 적용된 함수에 동등한 DataFrame 또는 계열 작업이 있는 경우 해당 작업을 대신 사용합니다. 예를 들어,df.groupby('col1').apply('sum')
이 아닌df.groupby('col1').sum()
을 직접 호출하십시오.서드 파티 라이브러리 호출에 DataFrame 또는 계열을 전달하기 전에
to_pandas()
를 호출합니다. pandas on Snowflake는 서드 파티 라이브러리와의 호환성을 보장하지 않습니다.추가 I/O 오버헤드를 피하려면 구체화된 일반 Snowflake 테이블을 사용합니다. pandas on Snowflake는 일반 테이블에서만 작동하는 데이터 스냅샷을 기반으로 작동합니다. 외부 테이블, 뷰 및 Apache Iceberg™ 테이블을 포함한 다른 유형의 경우 스냅샷을 생성하기 전에 임시 테이블이 생성되므로 구체화 오버헤드가 추가로 발생합니다.
pandas on Snowflake는 빠른 제로 카피 복제 기능을 제공하는 동시에
read_snowflake
를 사용하여 Snowflake 테이블에서 DataFrames을 생성합니다. 그러나 스냅샷 기능은 일반 데이터베이스의 일반 Snowflake 테이블에만 제공됩니다. 하이브리드, Iceberg 등의 유형이 포함된 테이블이나 공유 데이터베이스의 테이블을 로드하는 경우 일반 Snowflake 테이블에 대한 추가적인 구체화가 도입됩니다. 스냅샷은 데이터 일관성 및 주문 보장을 위해 필요하며, 현재 추가 구체화를 해결할 수 있는 다른 방법이 없으므로 pandas on Snowflake를 사용할 때는 가급적 일반 Snowflake 테이블을 사용하시기 바랍니다.다른 작업을 진행하기 전에 결과 타입을 다시 확인하고 필요한 경우
astype
을 사용하여 명시적 타입 형변환을 수행합니다.유형 추론 기능이 제한되어 있기 때문에 유형 힌트가 제공되지 않으면 결과에 모든 정수 값이 포함되어 있어도
df.apply
는 오브젝트(베리언트) 유형의 결과를 반환합니다. 다른 작업에서dtype
가int
여야 하는 경우 계속하기 전에astype
메서드를 호출하여 열 타입을 수정하여 명시적 타입 형변환을 수행할 수 있습니다.평가와 구체화가 필요한 APIs의 경우 필요하지 않으면 호출하지 마십시오.
Series
또는Dataframe
을 반환하지 않는 APIs가 올바른 유형으로 결과를 도출하려면 즉시 평가와 구체화가 필요합니다. 플로팅 방법에도 마찬가지입니다. 불필요한 평가와 구체화를 최소화하기 위해 해당 APIs에 대한 호출을 줄이십시오.대규모 데이터 세트에서는
np.where(<cond>, <스칼라>, n)
를 호출하지 마십시오.<스칼라>
가 DataFrame에<cond>
의 크기로 브로드캐스트되어 속도가 느려질 수 있습니다.반복적으로 구축된 쿼리로 작업할 때
df.cache_result
를 사용하여 중간 결과를 구체화함으로써 반복되는 평가를 줄이고 전체 쿼리의 지연 시간을 개선하고 복잡성을 줄일 수 있습니다. 예:df = pd.read_snowflake('pandas_test') df2 = pd.pivot_table(df, index='index_col', columns='pivot_col') # expensive operation df3 = df.merge(df2) df4 = df3.where(df2 == True)
위의 예제에서
df2
를 생성하는 쿼리는 계산 비용이 많이 들며,df3
및df4
의 생성에 모두 재사용됩니다.df2
를 임시 테이블로 구체화하면(df2
와 관련된 후속 작업을 피벗이 아닌 테이블 스캔으로 만들면) 코드 블록의 전체 대기 시간을 줄일 수 있습니다.df = pd.read_snowflake('pandas_test') df2 = pd.pivot_table(df, index='index_col', columns='pivot_col') # expensive operation df2.cache_result(inplace=True) df3 = df.merge(df2) df4 = df3.where(df2 == True)
예¶
다음은 pandas 작업이 포함된 코드 예제입니다. 세 개의 열(COL_STR
, COL_FLOAT
및 COL_INT
)을 포함하는 pandas_test
라는 이름의 Snowpark pandas DataFrame으로 시작합니다. 이 예제와 관련된 노트북을 보려면 Snowflake-Labs 리포지토리의 pandas on Snowflake 예제 를 참조하십시오.
import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session
CONNECTION_PARAMETERS = {
'account': '<myaccount>',
'user': '<myuser>',
'password': '<mypassword>',
'role': '<myrole>',
'database': '<mydatabase>',
'schema': '<myschema>',
'warehouse': '<mywarehouse>',
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()
df = pd.DataFrame([['a', 2.1, 1],['b', 4.2, 2],['c', 6.3, None]], columns=["COL_STR", "COL_FLOAT", "COL_INT"])
df
COL_STR COL_FLOAT COL_INT
0 a 2.1 1.0
1 b 4.2 2.0
2 c 6.3 NaN
DataFrame을 이름이 pandas_test
인 Snowflake 테이블로 저장하며, 이 테이블은 예제 전체에서 사용할 것입니다.
df.to_snowflake("pandas_test", if_exists='replace',index=False)
다음으로, Snowflake 테이블에서 DataFrame을 생성합니다. 열 COL_INT
를 삭제한 다음 결과를 row_position
열을 사용하여 Snowflake에 다시 저장합니다.
# Create a DataFrame out of a Snowflake table.
df = pd.read_snowflake('pandas_test')
df.shape
(3, 3)
df.head(2)
COL_STR COL_FLOAT COL_INT
0 a 2.1 1
1 b 4.2 2
df.dropna(subset=["COL_FLOAT"], inplace=True)
df
COL_STR COL_FLOAT COL_INT
0 a 2.1 1
1 c 6.3 2
df.shape
(2, 3)
df.dtypes
COL_STR object
COL_FLOAT float64
COL_INT int64
dtype: object
# Save the result back to Snowflake with a row_pos column.
df.reset_index(drop=True).to_snowflake('pandas_test2', if_exists='replace', index=True, index_label=['row_pos'])
이 작업을 통해 새로운 테이블 pandas_test2
가 생성되고, 이 테이블은 다음과 같습니다.
row_pos COL_STR COL_FLOAT COL_INT
0 1 a 2.0 1
1 2 b 4.0 2
IO(읽기 및 쓰기)¶
# Reading and writing to Snowflake
df = pd.DataFrame({"fruit": ["apple", "orange"], "size": [3.4, 5.4], "weight": [1.4, 3.2]})
df.to_snowflake("test_table", if_exists="replace", index=False )
df_table = pd.read_snowflake("test_table")
# Generate sample CSV file
with open("data.csv", "w") as f:
f.write('fruit,size,weight\napple,3.4,1.4\norange,5.4,3.2')
# Read from local CSV file
df_csv = pd.read_csv("data.csv")
# Generate sample JSON file
with open("data.json", "w") as f:
f.write('{"fruit":"apple", "size":3.4, "weight":1.4},{"fruit":"orange", "size":5.4, "weight":3.2}')
# Read from local JSON file
df_json = pd.read_json('data.json')
# Upload data.json and data.csv to Snowflake stage named @TEST_STAGE
# Read CSV and JSON file from stage
df_csv = pd.read_csv('@TEST_STAGE/data.csv')
df_json = pd.read_json('@TEST_STAGE/data.json')
자세한 내용은 입력/출력 섹션을 참조하십시오.
인덱싱¶
df = pd.DataFrame({"a": [1,2,3], "b": ["x", "y", "z"]})
df.columns
Index(['a', 'b'], dtype='object')
df.index
Index([0, 1, 2], dtype='int8')
df["a"]
0 1
1 2
2 3
Name: a, dtype: int8
df["b"]
0 x
1 y
2 z
Name: b, dtype: object
df.iloc[0,1]
'x'
df.loc[df["a"] > 2]
a b
2 3 z
df.columns = ["c", "d"]
df
c d
0 1 x
1 2 y
2 3 z
df = df.set_index("c")
df
d
c
1 x
2 y
3 z
df.rename(columns={"d": "renamed"})
renamed
c
1 x
2 y
3 z
누락된 값¶
import numpy as np
df = pd.DataFrame([[np.nan, 2, np.nan, 0],
[3, 4, np.nan, 1],
[np.nan, np.nan, np.nan, np.nan],
[np.nan, 3, np.nan, 4]],
columns=list("ABCD"))
df
A B C D
0 NaN 2.0 NaN 0.0
1 3.0 4.0 NaN 1.0
2 NaN NaN NaN NaN
3 NaN 3.0 NaN 4.0
df.isna()
A B C D
0 True False True False
1 False False True False
2 True True True True
3 True False True False
df.fillna(0)
A B C D
0 0.0 2.0 0.0 0.0
1 3.0 4.0 0.0 1.0
2 0.0 0.0 0.0 0.0
3 0.0 3.0 0.0 4.0
df.dropna(how="all")
A B C D
0 NaN 2.0 NaN 0.0
1 3.0 4.0 NaN 1.0
3 NaN 3.0 NaN 4.0
형식 변환¶
df = pd.DataFrame({"int": [1,2,3], "str": ["4", "5", "6"]})
df
int str
0 1 4
1 2 5
2 3 6
df_float = df.astype(float)
df_float
int str
0 1.0 4.0
1 2.0 5.0
2 3.0 6.0
df_float.dtypes
int float64
str float64
dtype: object
pd.to_numeric(df.str)
0 4.0
1 5.0
2 6.0
Name: str, dtype: float64
df = pd.DataFrame({'year': [2015, 2016],
'month': [2, 3],
'day': [4, 5]})
pd.to_datetime(df)
0 2015-02-04
1 2016-03-05
dtype: datetime64[ns]
이진 연산¶
df_1 = pd.DataFrame([[1,2,3],[4,5,6]])
df_2 = pd.DataFrame([[6,7,8]])
df_1.add(df_2)
0 1 2
0 7.0 9.0 11.0
1 NaN NaN NaN
s1 = pd.Series([1, 2, 3])
s2 = pd.Series([2, 2, 2])
s1 + s2
0 3
1 4
2 5
dtype: int64
df = pd.DataFrame({"A": [1,2,3], "B": [4,5,6]})
df["A+B"] = df["A"] + df["B"]
df
A B A+B
0 1 4 5
1 2 5 7
2 3 6 9
집계¶
df = pd.DataFrame([[1, 2, 3],
[4, 5, 6],
[7, 8, 9],
[np.nan, np.nan, np.nan]],
columns=['A', 'B', 'C'])
df.agg(['sum', 'min'])
A B C
sum 12.0 15.0 18.0
min 1.0 2.0 3.0
df.median()
A 4.0
B 5.0
C 6.0
dtype: float64
병합¶
df1 = pd.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],
'value': [1, 2, 3, 5]})
df1
lkey value
0 foo 1
1 bar 2
2 baz 3
3 foo 5
df2 = pd.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'],
'value': [5, 6, 7, 8]})
df2
rkey value
0 foo 5
1 bar 6
2 baz 7
3 foo 8
df1.merge(df2, left_on='lkey', right_on='rkey')
lkey value_x rkey value_y
0 foo 1 foo 5
1 foo 1 foo 8
2 bar 2 bar 6
3 baz 3 baz 7
4 foo 5 foo 5
5 foo 5 foo 8
df = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})
df
key A
0 K0 A0
1 K1 A1
2 K2 A2
3 K3 A3
4 K4 A4
5 K5 A5
other = pd.DataFrame({'key': ['K0', 'K1', 'K2'],
'B': ['B0', 'B1', 'B2']})
df.join(other, lsuffix='_caller', rsuffix='_other')
key_caller A key_other B
0 K0 A0 K0 B0
1 K1 A1 K1 B1
2 K2 A2 K2 B2
3 K3 A3 None None
4 K4 A4 None None
5 K5 A5 None None
Groupby¶
df = pd.DataFrame({'Animal': ['Falcon', 'Falcon','Parrot', 'Parrot'],
'Max Speed': [380., 370., 24., 26.]})
df
Animal Max Speed
0 Falcon 380.0
1 Falcon 370.0
2 Parrot 24.0
3 Parrot 26.0
df.groupby(['Animal']).mean()
Max Speed
Animal
Falcon 375.0
Parrot 25.0
자세한 내용은 GroupBy 섹션을 참조하십시오.
피벗¶
df = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
"bar", "bar", "bar", "bar"],
"B": ["one", "one", "one", "two", "two",
"one", "one", "two", "two"],
"C": ["small", "large", "large", "small",
"small", "large", "small", "small",
"large"],
"D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
"E": [2, 4, 5, 5, 6, 6, 8, 9, 9]})
df
A B C D E
0 foo one small 1 2
1 foo one large 2 4
2 foo one large 2 5
3 foo two small 3 5
4 foo two small 3 6
5 bar one large 4 6
6 bar one small 5 8
7 bar two small 6 9
8 bar two large 7 9
pd.pivot_table(df, values='D', index=['A', 'B'],
columns=['C'], aggfunc="sum")
C large small
A B
bar one 4.0 5
two 7.0 6
foo one 4.0 1
two NaN 6
df = pd.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
'baz': [1, 2, 3, 4, 5, 6],
'zoo': ['x', 'y', 'z', 'q', 'w', 't']})
df
foo bar baz zoo
0 one A 1 x
1 one B 2 y
2 one C 3 z
3 two A 4 q
4 two B 5 w
5 two C 6 t