Snowpark Connect for Spark 호환성 가이드

이 가이드에서는 Spark DataFrame APIs의 Snowpark Connect for Spark 구현과 네이티브 Apache Spark 간의 호환성에 대해 설명합니다. 이 가이드는 사용자가 Spark 워크로드를 Snowpark Connect for Spark 로 이동할 때의 키 차이점과 지원되지 않는 기능, 마이그레이션 고려 사항을 이해하는 데 도움을 주기 위해 작성되었습니다.

Snowpark Connect for Spark 는 Snowflake 실행 엔진을 기반으로 익숙한 Spark DataFrame API 환경을 제공하는 데 목표를 두고 있습니다. 그러나 이 항목에서 설명하는 호환성 차이 문제가 있습니다. 이 가이드에서는 마이그레이션을 계획하고 조정하는 데 도움이 되는 이러한 차이점을 중점적으로 설명합니다. 관련 문제는 향후 릴리스에서 해결될 수 있습니다.

DataTypes

지원되지 않는 데이터 타입

암시적 데이터 타입 변환

Snowpark Connect for Spark 사용 시 데이터 타입이 처리되는 방식에 유의하세요. Snowpark Connect for Spark 는 ByteType, ShortType, IntegerType`을 :code:`LongType`으로 암시적으로 표현합니다. 즉, 열이나 데이터를 :code:`ByteType, ShortType 또는 IntegerType`으로 정의할 수는 있지만 데이터는 |spconnect| 에서 :code:`LongType`으로 표현되고 반환됩니다.  마찬가지로, 특정 작업과 컨텍스트에 따라 :code:`FloatTypeDoubleType`에 대해서도 암시적 변환이 발생할 있습니다. Snowflake 실행 엔진은 데이터 타입 압축을 내부적으로 처리하며 실제로 데이터를 :code:`Byte 또는 :code:`Short`으로 저장할 수 있지만, 이는 구현 세부 정보로 간주되고 최종 사용자에게 공개되지 않습니다.

이 표현은 의미상 Spark 쿼리의 정확성에 영향을 주지 않습니다.

네이티브 PySpark의 데이터 타입

Snowpark Connect for Spark 의 데이터 타입

ByteType

LongType

ShortType

LongType

IntegerType

LongType

LongType

LongType

다음 예제에서는 Spark와 Snowpark Connect for Spark 가 쿼리 결과에서 데이터 타입을 처리하는 방법의 차이점을 설명합니다.

쿼리

query = """
    SELECT * FROM VALUES
    (float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
    (float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
    (float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
    AS tab(a, b, c, d, e, f)
    """
Copy

Spark

spark.sql(query).printSchema()
Copy
root
 |-- a: float (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal(2,1) (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = false)
 |-- f: void (nullable = true)

Snowpark Connect for Spark

snowpark_connect_spark.sql(query).printSchema()
Copy
root
 |-- a: double (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = true)
 |-- f: string (nullable = true)

:code:`NullType`의 미묘한 차이

Snowpark Connect for Spark 는 Spark에서 지원되는 `NullType<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.NullType.html>`_ 데이터 타입을 지원하지 않습니다. 이로 인해 데이터 프레임에서 Null 또는 :code:`None`을 사용할 때 동작이 변경됩니다.

Spark에서 NULL 리터럴(예: lit(None))은 :code:`NullType`으로 자동 추론됩니다. Snowpark Connect for Spark 에서는 스키마 추론 중 :code:`StringType`으로 추론됩니다.

df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]

# Spark: StructField('null_col', :code:`NullType`(), True)
# Snowpark Connect for Spark: StructField('null_col', :code:`StringType`(), True)
Copy

ArrayType, MapType, :code:`ObjectType`의 정형화 데이터 타입

Snowpark Connect for Spark 에서는 기본적으로 정형화 타입 지원을 사용할 수 없지만, ARRAY, MAP, Object 데이터 타입은 유형이 지정되지 않은 일반 컬렉션으로 처리됩니다. 즉, 정형화 타입 지원에서 제공하는 것과 달리 요소 유형, 필드 이름, 스키마 또는 null 허용 여부가 적용되지 않습니다.

이 지원에 대한 종속성이 있는 경우 계정 팀과 협업하여 계정에서 이 기능을 활성화하세요.

지원되지 않는 Spark APIs

다음은 클래식 Spark 및 Spark Connect에서는 지원되지만 Snowpark Connect for Spark 에서는 지원되지 않는 APIs입니다.

UDF의 차이점

:code:`StructType`의 차이점

Spark는 사용자 정의 함수(UDF)에 사용할 StructType`을 변환할 Python의 :code:`tuple 유형으로 변환합니다. Snowpark Connect for Spark 는 StructType`을 Python의 :code:`dict 유형으로 변환합니다. 이는 요소 액세스 및 출력에 근본적인 차이점이 있습니다.

  • Spark는 0, 1, 2, 3 등을 사용하여 인덱스에 액세스합니다.

  • Snowpark Connect for Spark 는 ‘_1’, ‘_2’ 등을 사용하여 인덱스에 액세스합니다.

def f(e):
    return e[0]

    df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
    result = df.select("*", udf(f, DoubleType())("c1"))

# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:

def f(e):
    return e['_1']

row = (
    self.spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)

self.assertEquals(row[0], Row(col1=1, col2=2))

# Spark: Row(col1=1, col2=2)

# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}
Copy

UDFs의 반복자 유형

반복자는 반환 유형 또는 입력 유형으로 지원되지 않습니다.

# This will not work
def func(iterator):
  for _ in iterator:
              ...

df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Copy

Python UDF로 파일 가져오기

Snowpark Connect for Spark 를 사용하면 Python UDFs에서 외부 라이브러리 및 파일을 지정할 수 있습니다. Snowflake는 코드의 실행 컨텍스트에 Python 파일과 아카이브를 포함합니다. 추가 단계 없이 UDF에 포함된 이러한 파일에서 함수를 가져올 수 있습니다. 이 종속성 처리 동작은 :ref:`label-udf_python_stage`에 설명된 대로 작동합니다.

외부 라이브러리 및 파일을 포함하려면 파일에 대한 스테이지 경로를 구성 설정 :code:`snowpark.connect.udf.imports`의 값으로 제공합니다. 구성 값은 파일에 대한 스테이지 경로의 배열이어야 하며, 경로는 쉼표로 구분됩니다.

다음 예제의 코드에는 UDF의 실행 컨텍스트에 두 개의 파일이 포함되어 있습니다. UDF는 이러한 파일에서 함수를 가져와 논리에서 사용합니다.

# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/library.py, @other_lib.zip]")

@udf(returnType = StringType())
def import_example(input: str) -> str:
  from library import first_function
  from other_lib.custom import second_function

  return first_function(input) + second_function(input)

spark.range(1).select(import_read_example("example_string")).show()
Copy

snowpark.connect.udf.imports 설정을 사용하여 코드에서 읽어야 하는 데이터가 있는 파일과 같은 다른 종류의 파일도 포함할 수 있습니다. 이 작업을 수행할 때 코드는 포함된 파일에서만 읽어야 합니다. 함수 실행이 종료되면 이러한 파일에 대한 모든 쓰기가 손실됩니다.

# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/data.csv]")

@udf(returnType = StringType())
def import_read_example(file_name: str) -> str:
  with open(file_name) as f:
    return f.read()

spark.range(1).select(import_read_example("data.csv")).show()
Copy

Lambda 함수 제한 사항

User-defined functions (UDFs) are not supported within lambda expressions. This includes both custom UDFs and certain built-in functions whose underlying implementation relies on Snowflake UDFs. Attempting to use a UDF inside a lambda expression will result in an error.

df = spark.createDataFrame([({"a": 123},)], ("data",))
df.select(map_filter("data", lambda _, v: bit_count(v) > 3)).show() # does not work, since `bit_count` is implemented with UDF
Copy

임시 뷰

기본적으로, Snowpark Connect for Spark 는 Snowflake에서 임시 뷰를 생성하지 않습니다. :code:`true`에 구성 매개 변수 :code:`snowpark.connect.temporary.views.create_in_snowflake`를 설정하여 임시 뷰를 생성하는 Snowpark Connect for Spark 를 지정합니다.

매개 변수가 :code:`false`로 설정된 경우, Snowpark Connect for Spark 는 Snowflake 뷰를 만들지 않고 뷰를 DataFrames로 저장합니다. 이는 Spark Connect 요청에서 생성된 뷰 정의 SQL이 Snowflake 뷰 크기 제한(95KB)을 초과하는 경우 발생할 수 있는 문제를 방지하는 데 도움이 됩니다.

임시 뷰는 일반적으로 Spark Connect Catalog API를 사용할 때 표시됩니다. 그러나 구성 :code:`snowpark.connect.sql.passthrough`가 :code:`true`로 설정된 SQL 문에서 호출되는 경우에는 액세스할 수 없습니다. Snowflake 임시 뷰를 생성하려면 구성 :code:`snowpark.connect.temporary.views.create_in_snowflake`를 :code:`true`로 설정합니다.

데이터 소스

데이터 소스

PySpark와 비교한 호환성 문제

Avro

파일 유형이 지원되지 않습니다.

CSV

Append, Ignore 저장 모드가 지원되지 않습니다.

encoding, quote, quoteAll, escape, escapeQuotes, comment, preferDate, enforceSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nanValue, positiveInf, negativeInf, timestampNTZFormat, enableDateTimeParsingFallback, maxColumns, maxCharsPerColumn, mode, columnNameOfCorruptRecord, charToEscapeQuoteEscaping, samplingRatio, emptyValue, locale, lineSep, unescapedQuoteHandling, compression 옵션이 지원되지 않습니다.

JSON

Append, Ignore 저장 모드가 지원되지 않습니다.

timeZone, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZeros, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, timestampNTZFormat, enableDateTimeParsingFallback, allowUnquotedControlChars, encoding, lineSep, samplingRatio, dropFieldIfAllNull, locale, allowNonNumericNumbers, compression, ignoreNullFields 옵션이 지원되지 않습니다.

:code:`Show`의 차이점: 필드 값이 문자열인 경우 따옴표로 묶입니다. 결과에 추가 “n” 문자가 표시됩니다.

Orc

파일 유형이 지원되지 않습니다.

Parquet

Append, Ignore 저장 모드가 지원되지 않습니다.

datetimeRebaseMode, int96RebaseMode, mergeSchema, compression 옵션이 지원되지 않습니다.

(ALL) 구성이 지원되지 않습니다.

텍스트

Append, Ignore 쓰기 모드가 지원되지 않습니다.

compression 옵션이 지원되지 않습니다.

lineSep 매개 변수가 쓰기에 지원되지 않습니다.

XML

파일 유형이 지원되지 않습니다.

Snowflake 테이블

테이블에 쓰기에는 공급자 형식이 필요하지 않습니다.

버킷팅 및 분할이 지원되지 않습니다.

저장소 형식 및 버전 관리가 지원되지 않습니다.

카탈로그

Snowflake Horizon Catalog 공급자 지원

  • Snowflake만 카탈로그 공급자로 지원됩니다.

APIs 카탈로그는 지원되지 않습니다.

  • registerFunction

  • listFunctions

  • getFunction

  • functionExists

  • createExternalTable

APIs 카탈로그는 일부 지원됩니다.

  • :code:`createTable`(외부 테이블 미지원)

Iceberg

Snowflake 관리형 Iceberg 테이블

Spark용 Snowpark Connect는 외부 관리형 Apache Iceberg™ 테이블(Iceberg 테이블 및 카탈로그 연결 데이터베이스 포함)과 함께 작동합니다.

읽기

과거 스냅샷, 분기 및 증분 읽기 등의 Time Travel은 지원되지 않습니다.

쓰기

  • Spark SQL을 사용해 테이블을 만드는 기능은 지원되지 않습니다.

  • 스키마 병합은 지원되지 않습니다.

  • 테이블을 만들려면 다음을 수행해야 합니다.

    • 외부 볼륨을 만듭니다.

    • 다음 방법 중 하나로 외부 볼륨 요구 사항을 테이블 생성에 연결합니다.

      • EXTERNAL_VOLUME을 데이터베이스로 설정합니다.

      • :code:`snowpark.connect.iceberg.external_volume`을 Spark 구성으로 설정합니다.

외부 관리형 Iceberg 테이블

읽기

  • Snowflake 비관리형 테이블 엔터티를 만들어야 합니다.

  • 과거 스냅샷, 분기 및 증분 읽기 등의 Time Travel은 지원되지 않습니다.

쓰기

  • 테이블 생성은 지원되지 않습니다.

  • 기존 Iceberg 테이블에 대한 쓰기가 지원됩니다.

중복 열 이름

Snowflake는 중복 열 이름을 지원하지 않습니다.

다음 코드는 duplicate column name 'foo' 같은 SQL 컴파일 오류가 발생하며 쓰기 생성에서 실패합니다.

df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])

df.show() # works

df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Copy

이 문제를 해결하려면 다음 값 중 하나로 snowpark.connect.views.duplicate_column_names_handling_mode 구성 옵션을 설정하세요.

  • rename: 첫 번째 열 이름 뒤에 있는 모든 중복 열 이름에 _dedup_1, _dedup_2 등의 접미사가 추가됩니다.

  • drop: 한 가지를 제외한 모든 중복 열이 삭제됩니다. 열의 값이 다르면 잘못된 결과가 발생할 수 있습니다.