Snowpark Connect for Spark 互換ガイド

このガイドでは、 Snowpark Connect for Spark Spark の実装 DataFrameAPIs とネイティブApache Sparkの互換性を文書化しています。Sparkワークロードを Snowpark Connect for Spark に移動する際に、主な違い、サポートされていない機能、および移行の考慮事項をユーザーが理解できるようにすることを目的としています。

Snowpark Connect for Spark は Snowflake実行エンジンの上での、なじみのある Spark DataFrameAPI エクスペリエンスの提供を目標としています。ただし、このトピックで説明する互換性のギャップがあります。このガイドは、これらの違いを説明し、移行を計画して適応させるのに役立ちます。これらは将来のリリースで対処される可能性があります。

DataTypes

サポートされていないデータ型

暗黙的なデータ型変換

Snowpark Connect for Spark を使用する場合は、データ型の処理方法に注意してください。Snowpark Connect for Spark は暗黙的に LongType として ByteTypeShortType`および :code:`IntegerType を表します。 これは、 ByteTypeShortType または IntegerType`で列やデータを定義することができる場合、データは :code:`LongType として Snowpark Connect for Spark によって表示され、返されることを意味します。同様に、特定の操作とコンテキストに応じた暗黙的な変換が FloatType および DoubleType で起こることがあります。Snowflake 実行エンジンは内部的にデータ型の圧縮を処理し、実際にデータを Byte または :code:`Short`として保存することができます。ただし、これらは実装の詳細とみなされ、エンドユーザーに公開されません。

意味的には、この表現は Spark クエリの正確さに影響を与えません。

ネイティブ PySpark からのデータ型

Snowpark Connect for Spark からのデータ型

ByteType

LongType

ShortType

LongType

IntegerType

LongType

LongType

LongType

次の例は、Sparkと Snowpark Connect for Spark のクエリ結果でのデータ型処理方法の違いを示しています。

クエリ

query = """
    SELECT * FROM VALUES
    (float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
    (float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
    (float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
    AS tab(a, b, c, d, e, f)
    """
Copy

Spark

spark.sql(query).printSchema()
Copy
root
 |-- a: float (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal(2,1) (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = false)
 |-- f: void (nullable = true)

Snowpark Connect for Spark

snowpark_connect_spark.sql(query).printSchema()
Copy
root
 |-- a: double (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = true)
 |-- f: string (nullable = true)

NullType ニュアンス

Snowpark Connect for Spark は、Sparkでサポートされている NullType データ型をサポートしていません。これにより、データフレームの NullNone を使用すると、動作が変更されます。

Sparkでは、リテラル NULL (例: lit(None))は、 NullType として自動的に推測されます。Snowpark Connect for Spark 内では、スキーマ推論中は StringType として推測されます。

df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]

# Spark: StructField('null_col', :code:`NullType`(), True)
# Snowpark Connect for Spark: StructField('null_col', :code:`StringType`(), True)
Copy

ArrayType MapType および ObjectType の構造化データ型。

構造化型のサポートは Snowpark Connect for Spark ではデフォルトでは利用できませんが、 ARRAYMAP および Object データ型は一般的な型付けされていないコレクションとして扱われます。これは、構造化型のサポートによって提供されるものとは異なり、要素型、フィールド名、スキーマ、またはNULL値の許容が強制されないことを意味します。

このサポートと依存関係がある場合は、アカウントチームと協力して、アカウントでこの機能を有効にしてください。

サポートされていない Spark APIs

以下は、従来の Spark および Spark Connect ではサポートされていますが、Snowpark Connect for Spark ではサポートされていない APIsです。

UDF の相違

StructType の相違

Sparkがユーザー定義関数(UDF)に使用するため StructType を変換するとき、Python の tuple 型に変換します。Snowpark Connect for Spark は StructType を Pythonの:code:dict 型に変換します。これには、要素のアクセスと出力に基本的な違いがあります。

  • Sparkは0、1、2、3などのインデックスにアクセスします。

  • Snowpark Connect for Spark は「_1」、「_2」などを使用してインデックスにアクセスします。

def f(e):
    return e[0]

    df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
    result = df.select("*", udf(f, DoubleType())("c1"))

# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:

def f(e):
    return e['_1']

row = (
    self.spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)

self.assertEquals(row[0], Row(col1=1, col2=2))

# Spark: Row(col1=1, col2=2)

# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}
Copy

反復子型の UDFs

反復子は、戻り型または入力型としてサポートされていません。

# This will not work
def func(iterator):
  for _ in iterator:
              ...

df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Copy

Python UDF へのファイルのインポート

Snowpark Connect for Spark を使用すると、Python UDFs で外部ライブラリとファイルを指定できます。Snowflakeには、コードの実行コンテキストにPythonファイルとアーカイブが含まれています。追加の手順なしで、これらの UDF に含まれるファイルから関数をインポートできます。この依存関係の動作は、 ステージからアップロードされたコードを使用したPython UDF の作成 で説明されているように機能します。

外部ライブラリとファイルを含めるには、 snowpark.connect.udf.imports 構成設定の値としてファイルへのステージパスを指定します。構成値は、ファイルへのステージパスの配列である必要があります。パスはコンマで区切られます。

次の例のコードには、 UDF の実行コンテキストの2つのファイルが含まれています。UDF はこれらのファイルから関数をインポートし、ロジックで使用します。

# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/library.py, @other_lib.zip]")

@udf(returnType = StringType())
def import_example(input: str) -> str:
  from library import first_function
  from other_lib.custom import second_function

  return first_function(input) + second_function(input)

spark.range(1).select(import_read_example("example_string")).show()
Copy

snowpark.connect.udf.imports 設定を使用して、コードが読み取る必要のあるデータなど、他の種類のファイルも含めます。これを行う場合、コードは含まれているファイルからのみ読み取る必要があることに注意してください。関数の実行が終了すると、そのようなファイルへの書き込みは失われます。

# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/data.csv]")

@udf(returnType = StringType())
def import_read_example(file_name: str) -> str:
  with open(file_name) as f:
    return f.read()

spark.range(1).select(import_read_example("data.csv")).show()
Copy

Lambda 関数の制限

User-defined functions (UDFs) are not supported within lambda expressions. This includes both custom UDFs and certain built-in functions whose underlying implementation relies on Snowflake UDFs. Attempting to use a UDF inside a lambda expression will result in an error.

df = spark.createDataFrame([({"a": 123},)], ("data",))
df.select(map_filter("data", lambda _, v: bit_count(v) > 3)).show() # does not work, since `bit_count` is implemented with UDF
Copy

仮ビュー

デフォルトでは Snowpark Connect for Spark は、Snowflakeで仮ビューを作成しません。構成パラメーター snowpark.connect.temporary.views.create_in_snowflaketrue に設定することで、 Snowpark Connect for Spark が仮ビューを作成するように指定できます。

パラメーターが false に設定されている場合、Snowpark Connect for Spark は Snowflakeビューを作成せずにビューを DataFrames として保存します。これは、Spark Connectリクエストから作成されたビュー定義 SQL がSnowflakeのビューサイズ制限(95KB)を超えた場合に発生する可能性のある問題を防ぐのに役立ちます。

Spark接続カタログ API を使用すると、仮ビューは通常表示されます。ただし、構成 snowpark.connect.sql.passthroughtrue に設定された SQL ステートメントから呼び出された場合は、アクセスできません。Snowflakeの仮ビューを作成するには、構成 snowpark.connect.temporary.views.create_in_snowflaketrue に設定します。

データソース

データソース

PySpark と比較した互換性の問題

Avro

ファイル型はサポートされていません。

CSV

以下では保存モードはサポートされていません:AppendIgnore

以下のオプションはサポートされていません:encodingquotequoteAllescapeescapeQuotescommentpreferDateenforceSchemaignoreLeadingWhiteSpaceignoreTrailingWhiteSpacenanValuepositiveInfnegativeInftimestampNTZFormatenableDateTimeParsingFallbackmaxColumnsmaxCharsPerColumnmodecolumnNameOfCorruptRecordcharToEscapeQuoteEscapingsamplingRatioemptyValuelocalelineSepunescapedQuoteHandlingcompression

JSON

以下では保存モードはサポートされていません:AppendIgnore

以下のオプションはサポートされていません:timeZoneprimitivesAsStringprefersDecimalallowCommentsallowUnquotedFieldNamesallowSingleQuotesallowNumericLeadingZerosallowBackslashEscapingAnyCharactermodecolumnNameOfCorruptRecordtimestampNTZFormatenableDateTimeParsingFallbackallowUnquotedControlCharsencodinglineSepsamplingRatiodropFieldIfAllNulllocaleallowNonNumericNumberscompressionignoreNullFields

Show の差:フィールドの値が文字列の場合、引用符で囲まれます。余分な「n」文字が結果に表示されます。

Orc

ファイル型はサポートされていません。

Parquet

以下では保存モードはサポートされていません:AppendIgnore

以下のオプションはサポートされていません:datetimeRebaseModeint96RebaseModemergeSchemacompression

サポートされていない設定:(ALL)

テキスト

以下では書き込みモードはサポートされていません:AppendIgnore

以下のオプションはサポートされていません:compression

lineSep パラメーターは書き込みではサポートされていません。

XML

ファイル型はサポートされていません。

Snowflakeテーブル

テーブルへの書き込みには、プロバイダー形式は必要ありません。

バケットとパーティション分割はサポートされていません。

ストレージの形式とバージョン管理はサポートされていません。

カタログ

Snowflake Horizon カタログ プロバイダーのサポート

  • カタログ プロバイダーとしてサポートされているのはSnowflakeのみです。

サポートされていないカタログ APIs

  • registerFunction

  • listFunctions

  • getFunction

  • functionExists

  • createExternalTable

部分的にサポートされているカタログ APIs

  • createTable (外部テーブルのサポートなし)

Iceberg

Snowflake 管理 Iceberg テーブル

Spark 用 Snowpark Connectは、外部で管理された Iceberg テーブルやカタログリンクのデータベースを含む Apache Iceberg™テーブルと動作します。

読む

履歴スナップショット、分岐、増分読み取りを含む Time Travel はサポートされていません。

書き込み

  • テーブルを作成するための Spark SQL の使用はサポートされていません。

  • スキーマのマージはサポートされていません。

  • テーブルを作成するには、以下が必要です。

    • 外部ボリュームを作成します。

    • 以下のいずれかの方法で、外部ボリュームニーズをテーブル作成にリンクします。

      • データベースに EXTERNAL_VOLUME を設定します。

      • :code:`snowpark.connect.iceberg.external_volume`を Sparkの構成に設定します。

外部管理 Iceberg テーブル

読む

  • Snowflakeの管理されていないテーブルエンティティを作成する必要があります。

  • 履歴スナップショット、分岐、増分読み取りを含む Time Travel はサポートされていません。

書き込み

  • テーブルの作成はサポートされていません。

  • 既存のIceberg テーブルへの書き込みはサポートされています。

列名の重複

Snowflake は、列名の重複をサポートしていません。

次のコードは、次の SQL コンパイルエラーのためビュー作成ステップで失敗します: duplicate column name 'foo'

df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])

df.show() # works

df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Copy

これを回避するには、snowpark.connect.views.duplicate_column_names_handling_mode 構成オプションを次の値のいずれかに設定します:

  • rename:_dedup_1, _dedup_2 などのようなサフィックスが、最初の列の後のすべての重複列名に追加されます。

  • drop:1つを除くすべての重複列は削除されます。列の値が異なる場合は、誤った結果になる可能性があります。