Snowpark Connect for Spark 互換ガイド

このガイドでは、 Snowpark Connect for Spark Spark の実装 DataFrameAPIs とネイティブApache Sparkの互換性を文書化しています。Sparkワークロードを Snowpark Connect for Spark に移動する際に、主な違い、サポートされていない機能、および移行の考慮事項をユーザーが理解できるようにすることを目的としています。

Snowpark Connect for Spark は Snowflake実行エンジンの上での、なじみのある Spark DataFrameAPI エクスペリエンスの提供を目標としています。ただし、このトピックで説明する互換性のギャップがあります。このガイドは、これらの違いを説明し、移行を計画して適応させるのに役立ちます。これらは将来のリリースで対処される可能性があります。

DataTypes

サポートされていないデータ型

暗黙的なデータ型変換

Snowpark Connect for Spark を使用する場合は、データ型の処理方法に注意してください。 Snowpark Connect for Spark は暗黙的に LongType として ByteTypeShortType`および :code:`IntegerType を表します。これは、 ByteTypeShortType または IntegerType`で列やデータを定義することができる場合、データは :code:`LongType として Snowpark Connect for Spark によって表示され、返されることを意味します。 同様に、特定の操作とコンテキストに応じた暗黙的な変換が FloatType および DoubleType で起こることがあります。Snowflake 実行エンジンは内部的にデータ型の圧縮を処理し、実際にデータを Byte または :code:`Short`として保存することができます。ただし、これらは実装の詳細とみなされ、エンドユーザーに公開されません。

意味的には、この表現は Spark クエリの正確さに影響を与えません。

ネイティブ PySpark からのデータ型

Snowpark Connect for Spark からのデータ型

ByteType

LongType

ShortType

LongType

IntegerType

LongType

LongType

LongType

次の例は、Sparkと Snowpark Connect for Spark のクエリ結果でのデータ型処理方法の違いを示しています。

クエリ

query = """
    SELECT * FROM VALUES
    (float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
    (float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
    (float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
    AS tab(a, b, c, d, e, f)
    """
Copy

Spark

spark.sql(query).printSchema()
Copy
root
 |-- a: float (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal(2,1) (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = false)
 |-- f: void (nullable = true)

Snowpark Connect for Spark

snowpark_connect_spark.sql(query).printSchema()
Copy
root
 |-- a: double (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = true)
 |-- f: string (nullable = true)

NullType ニュアンス

Snowpark Connect for Spark は、Sparkでサポートされている NullType データ型をサポートしていません。これにより、データフレームの NullNone を使用すると、動作が変更されます。

Sparkでは、リテラル NULL (例: lit(None))は、 NullType として自動的に推測されます。|spconnect|内では、スキーマ推論中は StringType として推測されます。

df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]

# Spark: StructField('null_col', :code:`NullType`(), True)
# |spconnect|: StructField('null_col', :code:`StringType`(), True)
Copy

ArrayType MapType および ObjectType の構造化データ型。

構造化型のサポートは Snowpark Connect for Spark ではデフォルトでは利用できませんが、 ARRAYMAP および Object データ型は一般的な型付けされていないコレクションとして扱われます。これは、構造化型のサポートによって提供されるものとは異なり、要素型、フィールド名、スキーマ、またはNULL値の許容が強制されないことを意味します。

このサポートと依存関係がある場合は、アカウントチームと協力して、アカウントでこの機能を有効にしてください。

サポートされていない Spark APIs

以下は、従来の Spark および Spark Connect ではサポートされていますが、Snowpark Connect for Spark ではサポートされていない APIsです。

UDF の相違

StructType の相違

Sparkがユーザー定義関数(UDF)に使用するため StructType を変換するとき、Python の tuple 型に変換します。 Snowpark Connect for Spark は StructType を Pythonの:code:dict 型に変換します。これには、要素のアクセスと出力に基本的な違いがあります。

  • Sparkは0、1、2、3などのインデックスにアクセスします。

  • Snowpark Connect for Spark は「_1」、「_2」などを使用してインデックスにアクセスします。

def f(e):
    return e[0]

    df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
    result = df.select("*", udf(f, DoubleType())("c1"))

# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:

def f(e):
    return e['_1']

row = (
    self.spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)

self.assertEquals(row[0], Row(col1=1, col2=2))

# Spark: Row(col1=1, col2=2)

# |spconnect|: {'col1': 1, 'col2': 2}
Copy

反復子型の UDFs

反復子は、戻り型または入力型としてサポートされていません。

# This will not work
def func(iterator):
  for _ in iterator:
              ...

df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Copy

Lambda 関数の制限

Snowpark Connect for Spark はLambda式と高次関数( transform 関数など)をサポートしますが、Lambda本文内からの外部列または式の参照をサポートしません。

この制限は、Snowflake <label-query_semistructured_data_limitations>` でのLambda式の :ref:` 制限によって引き起こされます。

data = [
    (1, [1, 2, 3]),
    (2, [4, 5]),
    (3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])

df.select(transform(df.numbers, lambda el: el + array_size(df.numbers))).show() # this fails in Spark Connect for Snowpark, because of a
reference to df.numbers which is outside of the lambda context

df.select(transform(df.numbers, lambda el: negative(el) + 1)).show() # this works
Copy

また、ユーザー定義関数(UDFs)はLambda式内ではサポートされないという制限もあります。これには、カスタム UDFs と、基になる実装が Snowflake UDFs に依存する特定の組み込み関数の両方が含まれます。Lambda式内で UDFを使用しようとするとエラーになります。

spark.range(1).select(split(lit('a,b,c'), ',')).show() # works
spark.range(1).select(transform(array(lit('a,b,c')), lambda x: split(x, ','))).show() # does not work, since `split` is implemented with UDF
Copy

データソース

データソース

PySpark と比較した互換性の問題

Avro

ファイル型はサポートされていません。

CSV

以下では保存モードはサポートされていません:AppendIgnore

以下のオプションはサポートされていません:encodingquotequoteAllescapeescapeQuotescommentpreferDateenforceSchemaignoreLeadingWhiteSpaceignoreTrailingWhiteSpacenanValuepositiveInfnegativeInftimestampNTZFormatenableDateTimeParsingFallbackmaxColumnsmaxCharsPerColumnmodecolumnNameOfCorruptRecordcharToEscapeQuoteEscapingsamplingRatioemptyValuelocalelineSepunescapedQuoteHandlingcompression

JSON

以下では保存モードはサポートされていません:AppendIgnore

以下のオプションはサポートされていません:timeZoneprimitivesAsStringprefersDecimalallowCommentsallowUnquotedFieldNamesallowSingleQuotesallowNumericLeadingZerosallowBackslashEscapingAnyCharactermodecolumnNameOfCorruptRecordtimestampNTZFormatenableDateTimeParsingFallbackallowUnquotedControlCharsencodinglineSepsamplingRatiodropFieldIfAllNulllocaleallowNonNumericNumberscompressionignoreNullFields

Show の差:フィールドの値が文字列の場合、引用符で囲まれます。余分な「n」文字が結果に表示されます。

Orc

ファイル型はサポートされていません。

Parquet

以下では保存モードはサポートされていません:AppendIgnore

以下のオプションはサポートされていません:datetimeRebaseModeint96RebaseModemergeSchemacompression

サポートされていない設定:(ALL)

テキスト

以下では書き込みモードはサポートされていません:AppendIgnore

以下のオプションはサポートされていません:compression

lineSep パラメーターは書き込みではサポートされていません。

XML

ファイル型はサポートされていません。

Snowflakeテーブル

テーブルへの書き込みには、プロバイダー形式は必要ありません。

バケットとパーティション分割はサポートされていません。

ストレージの形式とバージョン管理はサポートされていません。

カタログ

Snowflake Horizon カタログ プロバイダーのサポート

  • カタログ プロバイダーとしてサポートされているのはSnowflakeのみです。

サポートされていないカタログ APIs

  • registerFunction

  • listFunctions

  • getFunction

  • functionExists

  • createExternalTable

部分的にサポートされているカタログ APIs

  • createTable (外部テーブルのサポートなし)

Iceberg

Snowflake 管理 Iceberg テーブル

Spark 用 Snowpark Connectは、外部で管理された Iceberg テーブルやカタログリンクのデータベースを含む Apache Iceberg™テーブルと動作します。

読む

履歴スナップショット、分岐、増分読み取りを含む Time Travel はサポートされていません。

書き込み

  • テーブルを作成するための Spark SQL の使用はサポートされていません。

  • スキーマのマージはサポートされていません。

  • テーブルを作成するには、以下が必要です。

    • 外部ボリュームを作成します。

    • 以下のいずれかの方法で、外部ボリュームニーズをテーブル作成にリンクします。

      • データベースに EXTERNAL_VOLUME を設定します。

      • :code:`snowpark.connect.iceberg.external_volume`を Sparkの構成に設定します。

外部管理 Iceberg テーブル

読む

  • Snowflakeの管理されていないテーブルエンティティを作成する必要があります。

  • 履歴スナップショット、分岐、増分読み取りを含む Time Travel はサポートされていません。

書き込み

  • テーブルの作成はサポートされていません。

  • 既存のIceberg テーブルへの書き込みはサポートされています。

列名の重複

Snowflake は、列名の重複をサポートしていません。

次のコードは、次の SQL コンパイルエラーのためビュー作成ステップで失敗します: duplicate column name 'foo'

df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])

df.show() # works

df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Copy

これを回避するには、snowpark.connect.views.duplicate_column_names_handling_mode 構成オプションを次の値のいずれかに設定します:

  • rename:_dedup_1, _dedup_2 などのようなサフィックスが、最初の列の後のすべての重複列名に追加されます。

  • drop:1つを除くすべての重複列は削除されます。列の値が異なる場合は、誤った結果になる可能性があります。