Snowpark Connect for Spark 互換ガイド

このガイドでは、 Snowpark Connect for Spark Spark の実装 DataFrameAPIs とネイティブApache Sparkの互換性を文書化しています。Sparkワークロードを Snowpark Connect for Spark に移動する際に、主な違い、サポートされていない機能、および移行の考慮事項をユーザーが理解できるようにすることを目的としています。

Snowpark Connect for Spark は Snowflake実行エンジンの上での、なじみのある Spark DataFrameAPI エクスペリエンスの提供を目標としています。ただし、このトピックで説明する互換性のギャップがあります。このガイドは、これらの違いを説明し、移行を計画して適応させるのに役立ちます。これらは将来のリリースで対処される可能性があります。

DataTypes

サポートされていないデータ型

暗黙的なデータ型変換

Snowpark Connect for Spark を使用する場合は、データ型の処理方法に注意してください。Snowpark Connect for Spark は暗黙的に LongType として ByteTypeShortType`および :code:`IntegerType を表します。 これは、 ByteTypeShortType または IntegerType`で列やデータを定義することができる場合、データは :code:`LongType として Snowpark Connect for Spark によって表示され、返されることを意味します。同様に、特定の操作とコンテキストに応じた暗黙的な変換が FloatType および DoubleType で起こることがあります。Snowflake 実行エンジンは内部的にデータ型の圧縮を処理し、実際にデータを Byte または :code:`Short`として保存することができます。ただし、これらは実装の詳細とみなされ、エンドユーザーに公開されません。

意味的には、この表現は Spark クエリの正確さに影響を与えません。

ネイティブ PySpark からのデータ型

Snowpark Connect for Spark からのデータ型

ByteType

LongType

ShortType

LongType

IntegerType

LongType

LongType

LongType

次の例は、Sparkと Snowpark Connect for Spark のクエリ結果でのデータ型処理方法の違いを示しています。

クエリ

query = """
    SELECT * FROM VALUES
    (float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
    (float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
    (float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
    AS tab(a, b, c, d, e, f)
    """
Copy

Spark

spark.sql(query).printSchema()
Copy
root
 |-- a: float (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal(2,1) (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = false)
 |-- f: void (nullable = true)

Snowpark Connect for Spark

snowpark_connect_spark.sql(query).printSchema()
Copy
root
 |-- a: double (nullable = false)
 |-- b: double (nullable = false)
 |-- c: decimal (nullable = true)
 |-- d: string (nullable = false)
 |-- e: boolean (nullable = true)
 |-- f: string (nullable = true)

NullType ニュアンス

Snowpark Connect for Spark は、Sparkでサポートされている NullType データ型をサポートしていません。これにより、データフレームの NullNone を使用すると、動作が変更されます。

Sparkでは、リテラル NULL (例: lit(None))は、 NullType として自動的に推測されます。|spconnect|内では、スキーマ推論中は StringType として推測されます。

df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]

# Spark: StructField('null_col', :code:`NullType`(), True)
# Snowpark Connect for Spark: StructField('null_col', :code:`StringType`(), True)
Copy

ArrayType MapType および ObjectType の構造化データ型。

構造化型のサポートは Snowpark Connect for Spark ではデフォルトでは利用できませんが、 ARRAYMAP および Object データ型は一般的な型付けされていないコレクションとして扱われます。これは、構造化型のサポートによって提供されるものとは異なり、要素型、フィールド名、スキーマ、またはNULL値の許容が強制されないことを意味します。

このサポートと依存関係がある場合は、アカウントチームと協力して、アカウントでこの機能を有効にしてください。

サポートされていない Spark APIs

以下は、従来の Spark および Spark Connect ではサポートされていますが、Snowpark Connect for Spark ではサポートされていない APIsです。

UDF の相違

StructType の相違

Sparkがユーザー定義関数(UDF)に使用するため StructType を変換するとき、Python の tuple 型に変換します。Snowpark Connect for Spark は StructType を Pythonの:code:dict 型に変換します。これには、要素のアクセスと出力に基本的な違いがあります。

  • Sparkは0、1、2、3などのインデックスにアクセスします。

  • Snowpark Connect for Spark は「_1」、「_2」などを使用してインデックスにアクセスします。

def f(e):
    return e[0]

    df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
    result = df.select("*", udf(f, DoubleType())("c1"))

# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:

def f(e):
    return e['_1']

row = (
    self.spark.range(1)
    .selectExpr("struct(1, 2) as struct")
    .select(
        udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
    )
    .first()
)

self.assertEquals(row[0], Row(col1=1, col2=2))

# Spark: Row(col1=1, col2=2)

# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}
Copy

反復子型の UDFs

反復子は、戻り型または入力型としてサポートされていません。

# This will not work
def func(iterator):
  for _ in iterator:
              ...

df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Copy

Importing files to a Python UDF

With Snowpark Connect for Spark, you can specify external libraries and files in Python UDFs. Snowflake includes Python files and archives in your code's execution context. You can import functions from these included files in a UDF without additional steps. This dependency-handling behavior works as described in ステージからアップロードされたコードを使用したPython UDF の作成.

To include external libraries and files, you provide stage paths to the files as the value of the configuration setting snowpark.connect.udf.imports. The configuration value should be an array of stage paths to the files, where the paths are separated by commas.

Code in the following example includes two files in the UDF's execution context. The UDF imports functions from these files and uses them in its logic.

# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/library.py, @other_lib.zip]")

@udf(returnType = StringType())
def import_example(input: str) -> str:
  from library import first_function
  from other_lib.custom import second_function

  return first_function(input) + second_function(input)

spark.range(1).select(import_read_example("example_string")).show()
Copy

You can use the snowpark.connect.udf.imports setting to include other kinds of files as well, such as those with data your code needs to read. Note that when you do this, your code should only read from the included files; any writes to such files will be lost after the function's execution ends.

# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/data.csv]")

@udf(returnType = StringType())
def import_read_example(file_name: str) -> str:
  with open(file_name) as f:
    return f.read()

spark.range(1).select(import_read_example("data.csv")).show()
Copy

Lambda 関数の制限

Snowpark Connect for Spark はLambda式と高次関数( transform 関数など)をサポートしますが、Lambda本文内からの外部列または式の参照をサポートしません。

この制限は、Snowflake <label-query_semistructured_data_limitations>` でのLambda式の :ref:` 制限によって引き起こされます。

data = [
    (1, [1, 2, 3]),
    (2, [4, 5]),
    (3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])

df.select(transform(df.numbers, lambda el: el + array_size(df.numbers))).show() # this fails in Spark Connect for Snowpark, because of a
reference to df.numbers which is outside of the lambda context

df.select(transform(df.numbers, lambda el: negative(el) + 1)).show() # this works
Copy

また、ユーザー定義関数(UDFs)はLambda式内ではサポートされないという制限もあります。これには、カスタム UDFs と、基になる実装が Snowflake UDFs に依存する特定の組み込み関数の両方が含まれます。Lambda式内で UDFを使用しようとするとエラーになります。

spark.range(1).select(split(lit('a,b,c'), ',')).show() # works
spark.range(1).select(transform(array(lit('a,b,c')), lambda x: split(x, ','))).show() # does not work, since `split` is implemented with UDF
Copy

Temporary views

By default, Snowpark Connect for Spark does not create a temporary view in Snowflake. You can specify that Snowpark Connect for Spark creates a temporary view by setting the configuration parameter snowpark.connect.temporary.views.create_in_snowflake to true.

If the parameter is set to false, Snowpark Connect for Spark stores views as DataFrames without creating a Snowflake view. This helps to prevent the issue that can occur when the view definition SQL created from Spark Connect request exceeds Snowflake view size limit (95KB).

Temporary views are normally visible when using Spark Connect Catalog API. However, they are not accessible when called from SQL statements with configuration snowpark.connect.sql.passthrough set to true. To create Snowflake temporary views, set configuration snowpark.connect.temporary.views.create_in_snowflake to true.

データソース

データソース

PySpark と比較した互換性の問題

Avro

ファイル型はサポートされていません。

CSV

以下では保存モードはサポートされていません:AppendIgnore

以下のオプションはサポートされていません:encodingquotequoteAllescapeescapeQuotescommentpreferDateenforceSchemaignoreLeadingWhiteSpaceignoreTrailingWhiteSpacenanValuepositiveInfnegativeInftimestampNTZFormatenableDateTimeParsingFallbackmaxColumnsmaxCharsPerColumnmodecolumnNameOfCorruptRecordcharToEscapeQuoteEscapingsamplingRatioemptyValuelocalelineSepunescapedQuoteHandlingcompression

JSON

以下では保存モードはサポートされていません:AppendIgnore

以下のオプションはサポートされていません:timeZoneprimitivesAsStringprefersDecimalallowCommentsallowUnquotedFieldNamesallowSingleQuotesallowNumericLeadingZerosallowBackslashEscapingAnyCharactermodecolumnNameOfCorruptRecordtimestampNTZFormatenableDateTimeParsingFallbackallowUnquotedControlCharsencodinglineSepsamplingRatiodropFieldIfAllNulllocaleallowNonNumericNumberscompressionignoreNullFields

Show の差:フィールドの値が文字列の場合、引用符で囲まれます。余分な「n」文字が結果に表示されます。

Orc

ファイル型はサポートされていません。

Parquet

以下では保存モードはサポートされていません:AppendIgnore

以下のオプションはサポートされていません:datetimeRebaseModeint96RebaseModemergeSchemacompression

サポートされていない設定:(ALL)

テキスト

以下では書き込みモードはサポートされていません:AppendIgnore

以下のオプションはサポートされていません:compression

lineSep パラメーターは書き込みではサポートされていません。

XML

ファイル型はサポートされていません。

Snowflakeテーブル

テーブルへの書き込みには、プロバイダー形式は必要ありません。

バケットとパーティション分割はサポートされていません。

ストレージの形式とバージョン管理はサポートされていません。

カタログ

Snowflake Horizon カタログ プロバイダーのサポート

  • カタログ プロバイダーとしてサポートされているのはSnowflakeのみです。

サポートされていないカタログ APIs

  • registerFunction

  • listFunctions

  • getFunction

  • functionExists

  • createExternalTable

部分的にサポートされているカタログ APIs

  • createTable (外部テーブルのサポートなし)

Iceberg

Snowflake 管理 Iceberg テーブル

Spark 用 Snowpark Connectは、外部で管理された Iceberg テーブルやカタログリンクのデータベースを含む Apache Iceberg™テーブルと動作します。

読む

履歴スナップショット、分岐、増分読み取りを含む Time Travel はサポートされていません。

書き込み

  • テーブルを作成するための Spark SQL の使用はサポートされていません。

  • スキーマのマージはサポートされていません。

  • テーブルを作成するには、以下が必要です。

    • 外部ボリュームを作成します。

    • 以下のいずれかの方法で、外部ボリュームニーズをテーブル作成にリンクします。

      • データベースに EXTERNAL_VOLUME を設定します。

      • :code:`snowpark.connect.iceberg.external_volume`を Sparkの構成に設定します。

外部管理 Iceberg テーブル

読む

  • Snowflakeの管理されていないテーブルエンティティを作成する必要があります。

  • 履歴スナップショット、分岐、増分読み取りを含む Time Travel はサポートされていません。

書き込み

  • テーブルの作成はサポートされていません。

  • 既存のIceberg テーブルへの書き込みはサポートされています。

列名の重複

Snowflake は、列名の重複をサポートしていません。

次のコードは、次の SQL コンパイルエラーのためビュー作成ステップで失敗します: duplicate column name 'foo'

df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])

df.show() # works

df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Copy

これを回避するには、snowpark.connect.views.duplicate_column_names_handling_mode 構成オプションを次の値のいずれかに設定します:

  • rename:_dedup_1, _dedup_2 などのようなサフィックスが、最初の列の後のすべての重複列名に追加されます。

  • drop:1つを除くすべての重複列は削除されます。列の値が異なる場合は、誤った結果になる可能性があります。