Snowpark Connect for Spark 互換ガイド¶
このガイドでは、 Snowpark Connect for Spark Spark の実装 DataFrameAPIs とネイティブApache Sparkの互換性を文書化しています。Sparkワークロードを Snowpark Connect for Spark に移動する際に、主な違い、サポートされていない機能、および移行の考慮事項をユーザーが理解できるようにすることを目的としています。
Snowpark Connect for Spark は Snowflake実行エンジンの上での、なじみのある Spark DataFrameAPI エクスペリエンスの提供を目標としています。ただし、このトピックで説明する互換性のギャップがあります。このガイドは、これらの違いを説明し、移行を計画して適応させるのに役立ちます。これらは将来のリリースで対処される可能性があります。
DataTypes¶
サポートされていないデータ型¶
暗黙的なデータ型変換¶
Snowpark Connect for Spark を使用する場合は、データ型の処理方法に注意してください。 Snowpark Connect for Spark は暗黙的に LongType
として ByteType
、 ShortType`および :code:`IntegerType
を表します。これは、 ByteType
、 ShortType
または IntegerType`で列やデータを定義することができる場合、データは :code:`LongType
として Snowpark Connect for Spark によって表示され、返されることを意味します。 同様に、特定の操作とコンテキストに応じた暗黙的な変換が FloatType
および DoubleType
で起こることがあります。Snowflake 実行エンジンは内部的にデータ型の圧縮を処理し、実際にデータを Byte
または :code:`Short`として保存することができます。ただし、これらは実装の詳細とみなされ、エンドユーザーに公開されません。
意味的には、この表現は Spark クエリの正確さに影響を与えません。
ネイティブ PySpark からのデータ型 |
Snowpark Connect for Spark からのデータ型 |
---|---|
|
|
|
|
|
|
|
|
次の例は、Sparkと Snowpark Connect for Spark のクエリ結果でのデータ型処理方法の違いを示しています。
クエリ¶
query = """
SELECT * FROM VALUES
(float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
(float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
(float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
AS tab(a, b, c, d, e, f)
"""
Spark¶
spark.sql(query).printSchema()
root
|-- a: float (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal(2,1) (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = false)
|-- f: void (nullable = true)
Snowpark Connect for Spark¶
snowpark_connect_spark.sql(query).printSchema()
root
|-- a: double (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = true)
|-- f: string (nullable = true)
NullType
ニュアンス¶
Snowpark Connect for Spark は、Sparkでサポートされている NullType データ型をサポートしていません。これにより、データフレームの Null
や None
を使用すると、動作が変更されます。
Sparkでは、リテラル NULL
(例: lit(None)
)は、 NullType
として自動的に推測されます。|spconnect|内では、スキーマ推論中は StringType
として推測されます。
df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]
# Spark: StructField('null_col', :code:`NullType`(), True)
# |spconnect|: StructField('null_col', :code:`StringType`(), True)
ArrayType
MapType
および ObjectType
の構造化データ型。¶
構造化型のサポートは Snowpark Connect for Spark ではデフォルトでは利用できませんが、 ARRAY
、 MAP
および Object
データ型は一般的な型付けされていないコレクションとして扱われます。これは、構造化型のサポートによって提供されるものとは異なり、要素型、フィールド名、スキーマ、またはNULL値の許容が強制されないことを意味します。
このサポートと依存関係がある場合は、アカウントチームと協力して、アカウントでこの機能を有効にしてください。
サポートされていない Spark APIs¶
以下は、従来の Spark および Spark Connect ではサポートされていますが、Snowpark Connect for Spark ではサポートされていない APIsです。
Dataframe.hint: Snowpark Connect for Spark は、データフレームに設定されたヒントを無視します。Snowflake クエリオプティマイザーは、最も効率的な実行戦略を自動的に決定します。
DataFrame.repartition:これは Snowpark Connect for Spark の無処理です。Snowflake は、分散コンピューティングインフラストラクチャ全体でのデータ配分とパーティション分割を自動的に管理します。
`pyspark RDD<https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html>`_: RDDAPIs は Spark Connect ではサポートされていません( Snowpark Connect for Spark を含む)。
UDF の相違¶
StructType
の相違¶
Sparkがユーザー定義関数(UDF)に使用するため StructType
を変換するとき、Python の tuple
型に変換します。 Snowpark Connect for Spark は StructType
を Pythonの:code:dict
型に変換します。これには、要素のアクセスと出力に基本的な違いがあります。
Sparkは0、1、2、3などのインデックスにアクセスします。
Snowpark Connect for Spark は「_1」、「_2」などを使用してインデックスにアクセスします。
def f(e):
return e[0]
df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
result = df.select("*", udf(f, DoubleType())("c1"))
# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:
def f(e):
return e['_1']
row = (
self.spark.range(1)
.selectExpr("struct(1, 2) as struct")
.select(
udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
)
.first()
)
self.assertEquals(row[0], Row(col1=1, col2=2))
# Spark: Row(col1=1, col2=2)
# |spconnect|: {'col1': 1, 'col2': 2}
反復子型の UDFs¶
反復子は、戻り型または入力型としてサポートされていません。
# This will not work
def func(iterator):
for _ in iterator:
...
df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Lambda 関数の制限¶
Snowpark Connect for Spark はLambda式と高次関数( transform
関数など)をサポートしますが、Lambda本文内からの外部列または式の参照をサポートしません。
この制限は、Snowflake <label-query_semistructured_data_limitations>` でのLambda式の :ref:` 制限によって引き起こされます。
data = [
(1, [1, 2, 3]),
(2, [4, 5]),
(3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])
df.select(transform(df.numbers, lambda el: el + array_size(df.numbers))).show() # this fails in Spark Connect for Snowpark, because of a
reference to df.numbers which is outside of the lambda context
df.select(transform(df.numbers, lambda el: negative(el) + 1)).show() # this works
また、ユーザー定義関数(UDFs)はLambda式内ではサポートされないという制限もあります。これには、カスタム UDFs と、基になる実装が Snowflake UDFs に依存する特定の組み込み関数の両方が含まれます。Lambda式内で UDFを使用しようとするとエラーになります。
spark.range(1).select(split(lit('a,b,c'), ',')).show() # works
spark.range(1).select(transform(array(lit('a,b,c')), lambda x: split(x, ','))).show() # does not work, since `split` is implemented with UDF
データソース¶
データソース |
PySpark と比較した互換性の問題 |
---|---|
Avro |
ファイル型はサポートされていません。 |
CSV |
以下では保存モードはサポートされていません: 以下のオプションはサポートされていません: |
JSON |
以下では保存モードはサポートされていません: 以下のオプションはサポートされていません:
|
Orc |
ファイル型はサポートされていません。 |
Parquet |
以下では保存モードはサポートされていません: 以下のオプションはサポートされていません: サポートされていない設定:(ALL) |
テキスト |
以下では書き込みモードはサポートされていません: 以下のオプションはサポートされていません:
|
XML |
ファイル型はサポートされていません。 |
Snowflakeテーブル |
テーブルへの書き込みには、プロバイダー形式は必要ありません。 バケットとパーティション分割はサポートされていません。 ストレージの形式とバージョン管理はサポートされていません。 |
カタログ¶
Snowflake Horizon カタログ プロバイダーのサポート¶
カタログ プロバイダーとしてサポートされているのはSnowflakeのみです。
サポートされていないカタログ APIs¶
registerFunction
listFunctions
getFunction
functionExists
createExternalTable
部分的にサポートされているカタログ APIs¶
createTable
(外部テーブルのサポートなし)
Iceberg¶
Snowflake 管理 Iceberg テーブル¶
Spark 用 Snowpark Connectは、外部で管理された Iceberg テーブルやカタログリンクのデータベースを含む Apache Iceberg™テーブルと動作します。
読む¶
履歴スナップショット、分岐、増分読み取りを含む Time Travel はサポートされていません。
書き込み¶
テーブルを作成するための Spark SQL の使用はサポートされていません。
スキーマのマージはサポートされていません。
テーブルを作成するには、以下が必要です。
外部ボリュームを作成します。
以下のいずれかの方法で、外部ボリュームニーズをテーブル作成にリンクします。
データベースに EXTERNAL_VOLUME を設定します。
:code:`snowpark.connect.iceberg.external_volume`を Sparkの構成に設定します。
外部管理 Iceberg テーブル¶
読む¶
Snowflakeの管理されていないテーブルエンティティを作成する必要があります。
履歴スナップショット、分岐、増分読み取りを含む Time Travel はサポートされていません。
書き込み¶
テーブルの作成はサポートされていません。
既存のIceberg テーブルへの書き込みはサポートされています。
列名の重複¶
Snowflake は、列名の重複をサポートしていません。
次のコードは、次の SQL コンパイルエラーのためビュー作成ステップで失敗します: duplicate column name 'foo'
df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])
df.show() # works
df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
これを回避するには、snowpark.connect.views.duplicate_column_names_handling_mode
構成オプションを次の値のいずれかに設定します:
rename
:_dedup_1
,_dedup_2
などのようなサフィックスが、最初の列の後のすべての重複列名に追加されます。drop
:1つを除くすべての重複列は削除されます。列の値が異なる場合は、誤った結果になる可能性があります。