Snowpark Connect for Spark 호환성 가이드¶
이 가이드에서는 Spark DataFrame APIs의 Snowpark Connect for Spark 구현과 네이티브 Apache Spark 간의 호환성에 대해 설명합니다. 이 가이드는 사용자가 Spark 워크로드를 Snowpark Connect for Spark 로 이동할 때의 키 차이점과 지원되지 않는 기능, 마이그레이션 고려 사항을 이해하는 데 도움을 주기 위해 작성되었습니다.
Snowpark Connect for Spark 는 Snowflake 실행 엔진을 기반으로 익숙한 Spark DataFrame API 환경을 제공하는 데 목표를 두고 있습니다. 그러나 이 항목에서 설명하는 호환성 차이 문제가 있습니다. 이 가이드에서는 마이그레이션을 계획하고 조정하는 데 도움이 되는 이러한 차이점을 중점적으로 설명합니다. 관련 문제는 향후 릴리스에서 해결될 수 있습니다.
DataTypes¶
지원되지 않는 데이터 타입¶
암시적 데이터 타입 변환¶
Snowpark Connect for Spark 사용 시 데이터 타입이 처리되는 방식에 유의하세요. Snowpark Connect for Spark 는 ByteType, ShortType, IntegerType`을 :code:`LongType`으로 암시적으로 표현합니다. 즉, 열이나 데이터를 :code:`ByteType, ShortType 또는 IntegerType`으로 정의할 수는 있지만 이 데이터는 |spconnect| 에서 :code:`LongType`으로 표현되고 반환됩니다. 마찬가지로, 특정 작업과 컨텍스트에 따라 :code:`FloatType 및 DoubleType`에 대해서도 암시적 변환이 발생할 수 있습니다. Snowflake 실행 엔진은 데이터 타입 압축을 내부적으로 처리하며 실제로 데이터를 :code:`Byte 또는 :code:`Short`으로 저장할 수 있지만, 이는 구현 세부 정보로 간주되고 최종 사용자에게 공개되지 않습니다.
이 표현은 의미상 Spark 쿼리의 정확성에 영향을 주지 않습니다.
네이티브 PySpark의 데이터 타입 |
Snowpark Connect for Spark 의 데이터 타입 |
|---|---|
|
|
|
|
|
|
|
|
다음 예제에서는 Spark와 Snowpark Connect for Spark 가 쿼리 결과에서 데이터 타입을 처리하는 방법의 차이점을 설명합니다.
쿼리¶
query = """
SELECT * FROM VALUES
(float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
(float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
(float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
AS tab(a, b, c, d, e, f)
"""
Spark¶
spark.sql(query).printSchema()
root
|-- a: float (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal(2,1) (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = false)
|-- f: void (nullable = true)
Snowpark Connect for Spark¶
snowpark_connect_spark.sql(query).printSchema()
root
|-- a: double (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = true)
|-- f: string (nullable = true)
:code:`NullType`의 미묘한 차이¶
Snowpark Connect for Spark 는 Spark에서 지원되는 `NullType<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.NullType.html>`_ 데이터 타입을 지원하지 않습니다. 이로 인해 데이터 프레임에서 Null 또는 :code:`None`을 사용할 때 동작이 변경됩니다.
Spark에서 NULL 리터럴(예: lit(None))은 :code:`NullType`으로 자동 추론됩니다. Snowpark Connect for Spark 에서는 스키마 추론 중 :code:`StringType`으로 추론됩니다.
df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]
# Spark: StructField('null_col', :code:`NullType`(), True)
# Snowpark Connect for Spark: StructField('null_col', :code:`StringType`(), True)
ArrayType, MapType, :code:`ObjectType`의 정형화 데이터 타입¶
Snowpark Connect for Spark 에서는 기본적으로 정형화 타입 지원을 사용할 수 없지만, ARRAY, MAP, Object 데이터 타입은 유형이 지정되지 않은 일반 컬렉션으로 처리됩니다. 즉, 정형화 타입 지원에서 제공하는 것과 달리 요소 유형, 필드 이름, 스키마 또는 null 허용 여부가 적용되지 않습니다.
이 지원에 대한 종속성이 있는 경우 계정 팀과 협업하여 계정에서 이 기능을 활성화하세요.
지원되지 않는 Spark APIs¶
다음은 클래식 Spark 및 Spark Connect에서는 지원되지만 Snowpark Connect for Spark 에서는 지원되지 않는 APIs입니다.
`Dataframe.hint<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.hint.html>`_: Snowpark Connect for Spark 는 데이터 프레임에 설정된 모든 힌트를 무시합니다. Snowflake 쿼리 최적화 프로그램은 가장 효율적인 실행 전략을 자동으로 결정합니다.
`DataFrame.repartition<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.repartition.html>`_: 이는 Snowpark Connect for Spark 에서 아무런 작업도 수행하지 않습니다. Snowflake는 분산 컴퓨팅 인프라 전반에 걸쳐 데이터 배포/분할을 자동으로 관리합니다.
`pyspark.RDD<https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html>`_: RDD APIs는 Spark Connect(Snowpark Connect for Spark 포함)에서 지원되지 않습니다.
`pyspark.ml<https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html>`_
`pyspark 스트리밍<https://spark.apache.org/docs/latest/streaming-programming-guide.html>`_
UDF의 차이점¶
:code:`StructType`의 차이점¶
Spark는 사용자 정의 함수(UDF)에 사용할 StructType`을 변환할 때 Python의 :code:`tuple 유형으로 변환합니다. Snowpark Connect for Spark 는 StructType`을 Python의 :code:`dict 유형으로 변환합니다. 이는 요소 액세스 및 출력에 근본적인 차이점이 있습니다.
Spark는 0, 1, 2, 3 등을 사용하여 인덱스에 액세스합니다.
Snowpark Connect for Spark 는 ‘_1’, ‘_2’ 등을 사용하여 인덱스에 액세스합니다.
def f(e):
return e[0]
df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
result = df.select("*", udf(f, DoubleType())("c1"))
# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:
def f(e):
return e['_1']
row = (
self.spark.range(1)
.selectExpr("struct(1, 2) as struct")
.select(
udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
)
.first()
)
self.assertEquals(row[0], Row(col1=1, col2=2))
# Spark: Row(col1=1, col2=2)
# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}
UDFs의 반복자 유형¶
반복자는 반환 유형 또는 입력 유형으로 지원되지 않습니다.
# This will not work
def func(iterator):
for _ in iterator:
...
df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Python UDF로 파일 가져오기¶
Snowpark Connect for Spark 를 사용하면 Python UDFs에서 외부 라이브러리 및 파일을 지정할 수 있습니다. Snowflake는 코드의 실행 컨텍스트에 Python 파일과 아카이브를 포함합니다. 추가 단계 없이 UDF에 포함된 이러한 파일에서 함수를 가져올 수 있습니다. 이 종속성 처리 동작은 :ref:`label-udf_python_stage`에 설명된 대로 작동합니다.
외부 라이브러리 및 파일을 포함하려면 파일에 대한 스테이지 경로를 구성 설정 :code:`snowpark.connect.udf.imports`의 값으로 제공합니다. 구성 값은 파일에 대한 스테이지 경로의 배열이어야 하며, 경로는 쉼표로 구분됩니다.
다음 예제의 코드에는 UDF의 실행 컨텍스트에 두 개의 파일이 포함되어 있습니다. UDF는 이러한 파일에서 함수를 가져와 논리에서 사용합니다.
# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/library.py, @other_lib.zip]")
@udf(returnType = StringType())
def import_example(input: str) -> str:
from library import first_function
from other_lib.custom import second_function
return first_function(input) + second_function(input)
spark.range(1).select(import_read_example("example_string")).show()
snowpark.connect.udf.imports 설정을 사용하여 코드에서 읽어야 하는 데이터가 있는 파일과 같은 다른 종류의 파일도 포함할 수 있습니다. 이 작업을 수행할 때 코드는 포함된 파일에서만 읽어야 합니다. 함수 실행이 종료되면 이러한 파일에 대한 모든 쓰기가 손실됩니다.
# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/data.csv]")
@udf(returnType = StringType())
def import_read_example(file_name: str) -> str:
with open(file_name) as f:
return f.read()
spark.range(1).select(import_read_example("data.csv")).show()
Lambda 함수 제한 사항¶
User-defined functions (UDFs) are not supported within lambda expressions. This includes both custom UDFs and certain built-in functions whose underlying implementation relies on Snowflake UDFs. Attempting to use a UDF inside a lambda expression will result in an error.
df = spark.createDataFrame([({"a": 123},)], ("data",))
df.select(map_filter("data", lambda _, v: bit_count(v) > 3)).show() # does not work, since `bit_count` is implemented with UDF
임시 뷰¶
기본적으로, Snowpark Connect for Spark 는 Snowflake에서 임시 뷰를 생성하지 않습니다. :code:`true`에 구성 매개 변수 :code:`snowpark.connect.temporary.views.create_in_snowflake`를 설정하여 임시 뷰를 생성하는 Snowpark Connect for Spark 를 지정합니다.
매개 변수가 :code:`false`로 설정된 경우, Snowpark Connect for Spark 는 Snowflake 뷰를 만들지 않고 뷰를 DataFrames로 저장합니다. 이는 Spark Connect 요청에서 생성된 뷰 정의 SQL이 Snowflake 뷰 크기 제한(95KB)을 초과하는 경우 발생할 수 있는 문제를 방지하는 데 도움이 됩니다.
임시 뷰는 일반적으로 Spark Connect Catalog API를 사용할 때 표시됩니다. 그러나 구성 :code:`snowpark.connect.sql.passthrough`가 :code:`true`로 설정된 SQL 문에서 호출되는 경우에는 액세스할 수 없습니다. Snowflake 임시 뷰를 생성하려면 구성 :code:`snowpark.connect.temporary.views.create_in_snowflake`를 :code:`true`로 설정합니다.
데이터 소스¶
데이터 소스 |
PySpark와 비교한 호환성 문제 |
|---|---|
Avro |
파일 유형이 지원되지 않습니다. |
CSV |
|
JSON |
:code:`Show`의 차이점: 필드 값이 문자열인 경우 따옴표로 묶입니다. 결과에 추가 “n” 문자가 표시됩니다. |
Orc |
파일 유형이 지원되지 않습니다. |
Parquet |
(ALL) 구성이 지원되지 않습니다. |
텍스트 |
|
XML |
파일 유형이 지원되지 않습니다. |
Snowflake 테이블 |
테이블에 쓰기에는 공급자 형식이 필요하지 않습니다. 버킷팅 및 분할이 지원되지 않습니다. 저장소 형식 및 버전 관리가 지원되지 않습니다. |
카탈로그¶
Snowflake Horizon Catalog 공급자 지원¶
Snowflake만 카탈로그 공급자로 지원됩니다.
APIs 카탈로그는 지원되지 않습니다.¶
registerFunctionlistFunctionsgetFunctionfunctionExistscreateExternalTable
APIs 카탈로그는 일부 지원됩니다.¶
:code:`createTable`(외부 테이블 미지원)
Iceberg¶
Snowflake 관리형 Iceberg 테이블¶
Spark용 Snowpark Connect는 외부 관리형 Apache Iceberg™ 테이블(Iceberg 테이블 및 카탈로그 연결 데이터베이스 포함)과 함께 작동합니다.
읽기¶
과거 스냅샷, 분기 및 증분 읽기 등의 Time Travel은 지원되지 않습니다.
쓰기¶
Spark SQL을 사용해 테이블을 만드는 기능은 지원되지 않습니다.
스키마 병합은 지원되지 않습니다.
테이블을 만들려면 다음을 수행해야 합니다.
외부 볼륨을 만듭니다.
다음 방법 중 하나로 외부 볼륨 요구 사항을 테이블 생성에 연결합니다.
EXTERNAL_VOLUME을 데이터베이스로 설정합니다.
:code:`snowpark.connect.iceberg.external_volume`을 Spark 구성으로 설정합니다.
외부 관리형 Iceberg 테이블¶
읽기¶
Snowflake 비관리형 테이블 엔터티를 만들어야 합니다.
과거 스냅샷, 분기 및 증분 읽기 등의 Time Travel은 지원되지 않습니다.
쓰기¶
테이블 생성은 지원되지 않습니다.
기존 Iceberg 테이블에 대한 쓰기가 지원됩니다.
중복 열 이름¶
Snowflake는 중복 열 이름을 지원하지 않습니다.
다음 코드는 duplicate column name 'foo' 같은 SQL 컴파일 오류가 발생하며 쓰기 생성에서 실패합니다.
df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])
df.show() # works
df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
이 문제를 해결하려면 다음 값 중 하나로 snowpark.connect.views.duplicate_column_names_handling_mode 구성 옵션을 설정하세요.
rename: 첫 번째 열 이름 뒤에 있는 모든 중복 열 이름에_dedup_1,_dedup_2등의 접미사가 추가됩니다.drop: 한 가지를 제외한 모든 중복 열이 삭제됩니다. 열의 값이 다르면 잘못된 결과가 발생할 수 있습니다.