Snowpark Connect for Spark 호환성 가이드

이 가이드에서는 Spark DataFrame APIs의 Snowpark Connect for Spark 구현과 네이티브 Apache Spark 간의 호환성에 대해 설명합니다. 이 가이드는 사용자가 Spark 워크로드를 |spconnect|로 이동할 때의 키 차이점과 지원되지 않는 기능, 마이그레이션 고려 사항을 이해하는 데 도움을 주기 위해 작성되었습니다.

|spconnect|는 Snowflake 실행 엔진을 기반으로 익숙한 Spark DataFrame API 환경을 제공하는 데 목표를 두고 있습니다. 그러나 이 항목에서 설명하는 호환성 차이 문제가 있습니다. 이 가이드에서는 마이그레이션을 계획하고 조정하는 데 도움이 되는 이러한 차이점을 중점적으로 설명합니다. 관련 문제는 향후 릴리스에서 해결될 수 있습니다.

DataTypes

지원되지 않는 데이터 타입

암시적 데이터 타입 변환

Snowpark Connect for Spark 사용 시 데이터 타입이 처리되는 방식에 유의하세요. |spconnect|는 ByteType, ShortType, IntegerType`을 :code:`LongType`으로 암시적으로 표현합니다. 즉, 열이나 데이터를 :code:`ByteType, ShortType 또는 IntegerType`으로 정의할 수는 있지만 데이터는 |spconnect|에서 :code:`LongType`으로 표현되고 반환됩니다.  마찬가지로, 특정 작업과 컨텍스트에 따라 :code:`FloatTypeDoubleType`에 대해서도 암시적 변환이 발생할 있습니다. Snowflake 실행 엔진은 데이터 타입 압축을 내부적으로 처리하며 실제로 데이터를 :code:`Byte 또는 :code:`Short`으로 저장할 수 있지만, 이는 구현 세부 정보로 간주되고 최종 사용자에게 공개되지 않습니다.

이 표현은 의미상 Spark 쿼리의 정확성에 영향을 주지 않습니다.

네이티브 PySpark의 데이터 타입

|spconnect|의 데이터 타입

ByteType

LongType

ShortType

LongType

IntegerType

LongType

LongType

LongType

다음 예제에서는 Spark와 |spconnect|가 쿼리 결과에서 데이터 타입을 처리하는 방법의 차이점을 설명합니다.

쿼리

query = """
    SELECT * FROM VALUES
    (float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
    (float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
    (float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
    AS tab(a, b, c, d, e, f)
    """
Copy

Spark

spark.sql(query).printSchema()
Copy
root
 |-- a: float (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal(2,1) (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = false)
 |-- f: void (nullable = true)

Snowpark Connect for Spark

snowpark_connect_spark.sql(query).printSchema()
Copy
root
 |-- a: double (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = true)
 |-- f: string (nullable = true)

:code:`NullType`의 미묘한 차이

|spconnect|는 Spark에서 지원되는 `NullType<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.NullType.html>`_ 데이터 타입을 지원하지 않습니다. 이로 인해 데이터 프레임에서 Null 또는 :code:`None`을 사용할 때 동작이 변경됩니다.

Spark에서 NULL 리터럴(예: lit(None))은 :code:`NullType`으로 자동 추론됩니다. |spconnect|에서는 스키마 추론 중 :code:`StringType`으로 추론됩니다.

df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]

# Spark: StructField('null_col', :code:`NullType`(), True)
# |spconnect|: StructField('null_col', :code:`StringType`(), True)
Copy

ArrayType, MapType, :code:`ObjectType`의 정형화 데이터 타입

|spconnect|에서는 기본적으로 정형화 타입 지원을 사용할 수 없지만, ARRAY, MAP, Object 데이터 타입은 유형이 지정되지 않은 일반 컬렉션으로 처리됩니다. 즉, 정형화 타입 지원에서 제공하는 것과 달리 요소 유형, 필드 이름, 스키마 또는 null 허용 여부가 적용되지 않습니다.

이 지원에 대한 종속성이 있는 경우 계정 팀과 협업하여 계정에서 이 기능을 활성화하세요.

지원되지 않는 Spark APIs

다음은 클래식 Spark 및 Spark Connect에서는 지원되지만 |spconnect|에서는 지원되지 않는 APIs입니다.

UDF의 차이점

:code:`StructType`의 차이점

Spark는 사용자 정의 함수(UDF)에 사용할 StructType`을 변환할 Python의 :code:`tuple 유형으로 변환합니다. |spconnect|는 StructType`을 Python의 :code:`dict 유형으로 변환합니다. 이는 요소 액세스 및 출력에 근본적인 차이점이 있습니다.

  • Spark는 0, 1, 2, 3 등을 사용하여 인덱스에 액세스합니다.

  • |spconnect|는 ‘_1’, ‘_2’ 등을 사용하여 인덱스에 액세스합니다.

def f(e):
    return e[0]

    df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
    result = df.select("*", udf(f, DoubleType())("c1"))

# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:

def f(e):
    return e['_1']

row = (
    self.spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)

self.assertEquals(row[0], Row(col1=1, col2=2))

# Spark: Row(col1=1, col2=2)

# |spconnect|: {'col1': 1, 'col2': 2}
Copy

UDFs의 반복자 유형

반복자는 반환 유형 또는 입력 유형으로 지원되지 않습니다.

# This will not work
def func(iterator):
  for _ in iterator:
              ...

df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Copy

Lambda 함수 제한 사항

|spconnect|는 Lambda 식 및 고차 함수(예: transform 함수)를 지원하지만, Lambda 본문 내에서 외부 열 또는 표현식 참조를 지원하지는 않습니다.

이 제한 사항은 :ref:`Snowflake의 Lambda 식에 대한 제한 사항<label-query_semistructured_data_limitations>`으로 인해 발생합니다.

data = [
    (1, [1, 2, 3]),
    (2, [4, 5]),
    (3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])

df.select(transform(df.numbers, lambda el: el + array_size(df.numbers))).show() # this fails in Spark Connect for Snowpark, because of a
reference to df.numbers which is outside of the lambda context

df.select(transform(df.numbers, lambda el: negative(el) + 1)).show() # this works
Copy

또 다른 제한 사항은 user-dDefined 함수(UDFs)가 Lambda 식 내에서 지원되지 않는다는 점입니다. 여기에는 사용자 지정 UDFs 및 기본 구현 시 Snowflake UDFs를 사용하는 특정 기본 제공 함수가 모두 포함됩니다. Lambda 식 내부에서 UDF를 사용하려고 시도하면 오류가 발생합니다.

spark.range(1).select(split(lit('a,b,c'), ',')).show() # works
spark.range(1).select(transform(array(lit('a,b,c')), lambda x: split(x, ','))).show() # does not work, since `split` is implemented with UDF
Copy

데이터 소스

데이터 소스

PySpark와 비교한 호환성 문제

Avro

파일 유형이 지원되지 않습니다.

CSV

Append, Ignore 저장 모드가 지원되지 않습니다.

encoding, quote, quoteAll, escape, escapeQuotes, comment, preferDate, enforceSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nanValue, positiveInf, negativeInf, timestampNTZFormat, enableDateTimeParsingFallback, maxColumns, maxCharsPerColumn, mode, columnNameOfCorruptRecord, charToEscapeQuoteEscaping, samplingRatio, emptyValue, locale, lineSep, unescapedQuoteHandling, compression 옵션이 지원되지 않습니다.

JSON

Append, Ignore 저장 모드가 지원되지 않습니다.

timeZone, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZeros, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, timestampNTZFormat, enableDateTimeParsingFallback, allowUnquotedControlChars, encoding, lineSep, samplingRatio, dropFieldIfAllNull, locale, allowNonNumericNumbers, compression, ignoreNullFields 옵션이 지원되지 않습니다.

:code:`Show`의 차이점: 필드 값이 문자열인 경우 따옴표로 묶입니다. 결과에 추가 “n” 문자가 표시됩니다.

Orc

파일 유형이 지원되지 않습니다.

Parquet

Append, Ignore 저장 모드가 지원되지 않습니다.

datetimeRebaseMode, int96RebaseMode, mergeSchema, compression 옵션이 지원되지 않습니다.

(ALL) 구성이 지원되지 않습니다.

텍스트

Append, Ignore 쓰기 모드가 지원되지 않습니다.

compression 옵션이 지원되지 않습니다.

lineSep 매개 변수가 쓰기에 지원되지 않습니다.

XML

파일 유형이 지원되지 않습니다.

Snowflake 테이블

테이블에 쓰기에는 공급자 형식이 필요하지 않습니다.

버킷팅 및 분할이 지원되지 않습니다.

저장소 형식 및 버전 관리가 지원되지 않습니다.

카탈로그

Snowflake Horizon Catalog 공급자 지원

  • Snowflake만 카탈로그 공급자로 지원됩니다.

APIs 카탈로그는 지원되지 않습니다.

  • registerFunction

  • listFunctions

  • getFunction

  • functionExists

  • createExternalTable

APIs 카탈로그는 일부 지원됩니다.

  • :code:`createTable`(외부 테이블 미지원)

Iceberg

Snowflake 관리형 Iceberg 테이블

Spark용 Snowpark Connect는 외부 관리형 Apache Iceberg™ 테이블(Iceberg 테이블 및 카탈로그 연결 데이터베이스 포함)과 함께 작동합니다.

읽기

과거 스냅샷, 분기 및 증분 읽기 등의 Time Travel은 지원되지 않습니다.

쓰기

  • Spark SQL을 사용해 테이블을 만드는 기능은 지원되지 않습니다.

  • 스키마 병합은 지원되지 않습니다.

  • 테이블을 만들려면 다음을 수행해야 합니다.

    • 외부 볼륨을 만듭니다.

    • 다음 방법 중 하나로 외부 볼륨 요구 사항을 테이블 생성에 연결합니다.

      • EXTERNAL_VOLUME을 데이터베이스로 설정합니다.

      • :code:`snowpark.connect.iceberg.external_volume`을 Spark 구성으로 설정합니다.

외부 관리형 Iceberg 테이블

읽기

  • Snowflake 비관리형 테이블 엔터티를 만들어야 합니다.

  • 과거 스냅샷, 분기 및 증분 읽기 등의 Time Travel은 지원되지 않습니다.

쓰기

  • 테이블 생성은 지원되지 않습니다.

  • 기존 Iceberg 테이블에 대한 쓰기가 지원됩니다.

중복 열 이름

Snowflake는 중복 열 이름을 지원하지 않습니다.

다음 코드는 duplicate column name 'foo' 같은 SQL 컴파일 오류가 발생하며 쓰기 생성에서 실패합니다.

df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])

df.show() # works

df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Copy

이 문제를 해결하려면 다음 값 중 하나로 snowpark.connect.views.duplicate_column_names_handling_mode 구성 옵션을 설정하세요.

  • rename: 첫 번째 열 이름 뒤에 있는 모든 중복 열 이름에 _dedup_1, _dedup_2 등의 접미사가 추가됩니다.

  • drop: 한 가지를 제외한 모든 중복 열이 삭제됩니다. 열의 값이 다르면 잘못된 결과가 발생할 수 있습니다.