Snowpark Connect for Spark 호환성 가이드¶
이 가이드에서는 Spark DataFrame APIs의 Snowpark Connect for Spark 구현과 네이티브 Apache Spark 간의 호환성에 대해 설명합니다. 이 가이드는 사용자가 Spark 워크로드를 |spconnect|로 이동할 때의 키 차이점과 지원되지 않는 기능, 마이그레이션 고려 사항을 이해하는 데 도움을 주기 위해 작성되었습니다.
|spconnect|는 Snowflake 실행 엔진을 기반으로 익숙한 Spark DataFrame API 환경을 제공하는 데 목표를 두고 있습니다. 그러나 이 항목에서 설명하는 호환성 차이 문제가 있습니다. 이 가이드에서는 마이그레이션을 계획하고 조정하는 데 도움이 되는 이러한 차이점을 중점적으로 설명합니다. 관련 문제는 향후 릴리스에서 해결될 수 있습니다.
DataTypes¶
지원되지 않는 데이터 타입¶
암시적 데이터 타입 변환¶
Snowpark Connect for Spark 사용 시 데이터 타입이 처리되는 방식에 유의하세요. |spconnect|는 ByteType
, ShortType
, IntegerType`을 :code:`LongType`으로 암시적으로 표현합니다. 즉, 열이나 데이터를 :code:`ByteType
, ShortType
또는 IntegerType`으로 정의할 수는 있지만 이 데이터는 |spconnect|에서 :code:`LongType`으로 표현되고 반환됩니다. 마찬가지로, 특정 작업과 컨텍스트에 따라 :code:`FloatType
및 DoubleType`에 대해서도 암시적 변환이 발생할 수 있습니다. Snowflake 실행 엔진은 데이터 타입 압축을 내부적으로 처리하며 실제로 데이터를 :code:`Byte
또는 :code:`Short`으로 저장할 수 있지만, 이는 구현 세부 정보로 간주되고 최종 사용자에게 공개되지 않습니다.
이 표현은 의미상 Spark 쿼리의 정확성에 영향을 주지 않습니다.
네이티브 PySpark의 데이터 타입 |
|spconnect|의 데이터 타입 |
---|---|
|
|
|
|
|
|
|
|
다음 예제에서는 Spark와 |spconnect|가 쿼리 결과에서 데이터 타입을 처리하는 방법의 차이점을 설명합니다.
쿼리¶
query = """
SELECT * FROM VALUES
(float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
(float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
(float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
AS tab(a, b, c, d, e, f)
"""
Spark¶
spark.sql(query).printSchema()
root
|-- a: float (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal(2,1) (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = false)
|-- f: void (nullable = true)
Snowpark Connect for Spark¶
snowpark_connect_spark.sql(query).printSchema()
root
|-- a: double (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = true)
|-- f: string (nullable = true)
:code:`NullType`의 미묘한 차이¶
|spconnect|는 Spark에서 지원되는 `NullType<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.NullType.html>`_ 데이터 타입을 지원하지 않습니다. 이로 인해 데이터 프레임에서 Null
또는 :code:`None`을 사용할 때 동작이 변경됩니다.
Spark에서 NULL
리터럴(예: lit(None)
)은 :code:`NullType`으로 자동 추론됩니다. |spconnect|에서는 스키마 추론 중 :code:`StringType`으로 추론됩니다.
df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]
# Spark: StructField('null_col', :code:`NullType`(), True)
# |spconnect|: StructField('null_col', :code:`StringType`(), True)
ArrayType
, MapType
, :code:`ObjectType`의 정형화 데이터 타입¶
|spconnect|에서는 기본적으로 정형화 타입 지원을 사용할 수 없지만, ARRAY
, MAP
, Object
데이터 타입은 유형이 지정되지 않은 일반 컬렉션으로 처리됩니다. 즉, 정형화 타입 지원에서 제공하는 것과 달리 요소 유형, 필드 이름, 스키마 또는 null 허용 여부가 적용되지 않습니다.
이 지원에 대한 종속성이 있는 경우 계정 팀과 협업하여 계정에서 이 기능을 활성화하세요.
지원되지 않는 Spark APIs¶
다음은 클래식 Spark 및 Spark Connect에서는 지원되지만 |spconnect|에서는 지원되지 않는 APIs입니다.
`Dataframe.hint<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.hint.html>`_: |spconnect|는 데이터 프레임에 설정된 모든 힌트를 무시합니다. Snowflake 쿼리 최적화 프로그램은 가장 효율적인 실행 전략을 자동으로 결정합니다.
`DataFrame.repartition<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.repartition.html>`_: 이는 |spconnect|에서 아무런 작업도 수행하지 않습니다. Snowflake는 분산 컴퓨팅 인프라 전반에 걸쳐 데이터 배포/분할을 자동으로 관리합니다.
`pyspark.RDD<https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html>`_: RDD APIs는 Spark Connect(Snowpark Connect for Spark 포함)에서 지원되지 않습니다.
`pyspark.ml<https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html>`_
`pyspark 스트리밍<https://spark.apache.org/docs/latest/streaming-programming-guide.html>`_
UDF의 차이점¶
:code:`StructType`의 차이점¶
Spark는 사용자 정의 함수(UDF)에 사용할 StructType`을 변환할 때 Python의 :code:`tuple
유형으로 변환합니다. |spconnect|는 StructType`을 Python의 :code:`dict
유형으로 변환합니다. 이는 요소 액세스 및 출력에 근본적인 차이점이 있습니다.
Spark는 0, 1, 2, 3 등을 사용하여 인덱스에 액세스합니다.
|spconnect|는 ‘_1’, ‘_2’ 등을 사용하여 인덱스에 액세스합니다.
def f(e):
return e[0]
df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
result = df.select("*", udf(f, DoubleType())("c1"))
# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:
def f(e):
return e['_1']
row = (
self.spark.range(1)
.selectExpr("struct(1, 2) as struct")
.select(
udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
)
.first()
)
self.assertEquals(row[0], Row(col1=1, col2=2))
# Spark: Row(col1=1, col2=2)
# |spconnect|: {'col1': 1, 'col2': 2}
UDFs의 반복자 유형¶
반복자는 반환 유형 또는 입력 유형으로 지원되지 않습니다.
# This will not work
def func(iterator):
for _ in iterator:
...
df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Lambda 함수 제한 사항¶
|spconnect|는 Lambda 식 및 고차 함수(예: transform
함수)를 지원하지만, Lambda 본문 내에서 외부 열 또는 표현식 참조를 지원하지는 않습니다.
이 제한 사항은 :ref:`Snowflake의 Lambda 식에 대한 제한 사항<label-query_semistructured_data_limitations>`으로 인해 발생합니다.
data = [
(1, [1, 2, 3]),
(2, [4, 5]),
(3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])
df.select(transform(df.numbers, lambda el: el + array_size(df.numbers))).show() # this fails in Spark Connect for Snowpark, because of a
reference to df.numbers which is outside of the lambda context
df.select(transform(df.numbers, lambda el: negative(el) + 1)).show() # this works
또 다른 제한 사항은 user-dDefined 함수(UDFs)가 Lambda 식 내에서 지원되지 않는다는 점입니다. 여기에는 사용자 지정 UDFs 및 기본 구현 시 Snowflake UDFs를 사용하는 특정 기본 제공 함수가 모두 포함됩니다. Lambda 식 내부에서 UDF를 사용하려고 시도하면 오류가 발생합니다.
spark.range(1).select(split(lit('a,b,c'), ',')).show() # works
spark.range(1).select(transform(array(lit('a,b,c')), lambda x: split(x, ','))).show() # does not work, since `split` is implemented with UDF
데이터 소스¶
데이터 소스 |
PySpark와 비교한 호환성 문제 |
---|---|
Avro |
파일 유형이 지원되지 않습니다. |
CSV |
|
JSON |
:code:`Show`의 차이점: 필드 값이 문자열인 경우 따옴표로 묶입니다. 결과에 추가 “n” 문자가 표시됩니다. |
Orc |
파일 유형이 지원되지 않습니다. |
Parquet |
(ALL) 구성이 지원되지 않습니다. |
텍스트 |
|
XML |
파일 유형이 지원되지 않습니다. |
Snowflake 테이블 |
테이블에 쓰기에는 공급자 형식이 필요하지 않습니다. 버킷팅 및 분할이 지원되지 않습니다. 저장소 형식 및 버전 관리가 지원되지 않습니다. |
카탈로그¶
Snowflake Horizon Catalog 공급자 지원¶
Snowflake만 카탈로그 공급자로 지원됩니다.
APIs 카탈로그는 지원되지 않습니다.¶
registerFunction
listFunctions
getFunction
functionExists
createExternalTable
APIs 카탈로그는 일부 지원됩니다.¶
:code:`createTable`(외부 테이블 미지원)
Iceberg¶
Snowflake 관리형 Iceberg 테이블¶
Spark용 Snowpark Connect는 외부 관리형 Apache Iceberg™ 테이블(Iceberg 테이블 및 카탈로그 연결 데이터베이스 포함)과 함께 작동합니다.
읽기¶
과거 스냅샷, 분기 및 증분 읽기 등의 Time Travel은 지원되지 않습니다.
쓰기¶
Spark SQL을 사용해 테이블을 만드는 기능은 지원되지 않습니다.
스키마 병합은 지원되지 않습니다.
테이블을 만들려면 다음을 수행해야 합니다.
외부 볼륨을 만듭니다.
다음 방법 중 하나로 외부 볼륨 요구 사항을 테이블 생성에 연결합니다.
EXTERNAL_VOLUME을 데이터베이스로 설정합니다.
:code:`snowpark.connect.iceberg.external_volume`을 Spark 구성으로 설정합니다.
외부 관리형 Iceberg 테이블¶
읽기¶
Snowflake 비관리형 테이블 엔터티를 만들어야 합니다.
과거 스냅샷, 분기 및 증분 읽기 등의 Time Travel은 지원되지 않습니다.
쓰기¶
테이블 생성은 지원되지 않습니다.
기존 Iceberg 테이블에 대한 쓰기가 지원됩니다.
중복 열 이름¶
Snowflake는 중복 열 이름을 지원하지 않습니다.
다음 코드는 duplicate column name 'foo'
같은 SQL 컴파일 오류가 발생하며 쓰기 생성에서 실패합니다.
df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])
df.show() # works
df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
이 문제를 해결하려면 다음 값 중 하나로 snowpark.connect.views.duplicate_column_names_handling_mode
구성 옵션을 설정하세요.
rename
: 첫 번째 열 이름 뒤에 있는 모든 중복 열 이름에_dedup_1
,_dedup_2
등의 접미사가 추가됩니다.drop
: 한 가지를 제외한 모든 중복 열이 삭제됩니다. 열의 값이 다르면 잘못된 결과가 발생할 수 있습니다.