Snowpark Connect for Spark 互換ガイド¶
このガイドでは、 Snowpark Connect for Spark Spark の実装 DataFrameAPIs とネイティブApache Sparkの互換性を文書化しています。Sparkワークロードを Snowpark Connect for Spark に移動する際に、主な違い、サポートされていない機能、および移行の考慮事項をユーザーが理解できるようにすることを目的としています。
Snowpark Connect for Spark は Snowflake実行エンジンの上での、なじみのある Spark DataFrameAPI エクスペリエンスの提供を目標としています。ただし、このトピックで説明する互換性のギャップがあります。このガイドは、これらの違いを説明し、移行を計画して適応させるのに役立ちます。これらは将来のリリースで対処される可能性があります。
DataTypes¶
サポートされていないデータ型¶
暗黙的なデータ型変換¶
Snowpark Connect for Spark を使用する場合は、データ型の処理方法に注意してください。Snowpark Connect for Spark は暗黙的に LongType として ByteType、 ShortType`および :code:`IntegerType を表します。 これは、 ByteType、 ShortType または IntegerType`で列やデータを定義することができる場合、データは :code:`LongType として Snowpark Connect for Spark によって表示され、返されることを意味します。同様に、特定の操作とコンテキストに応じた暗黙的な変換が FloatType および DoubleType で起こることがあります。Snowflake 実行エンジンは内部的にデータ型の圧縮を処理し、実際にデータを Byte または :code:`Short`として保存することができます。ただし、これらは実装の詳細とみなされ、エンドユーザーに公開されません。
意味的には、この表現は Spark クエリの正確さに影響を与えません。
ネイティブ PySpark からのデータ型 |
Snowpark Connect for Spark からのデータ型 |
|---|---|
|
|
|
|
|
|
|
|
次の例は、Sparkと Snowpark Connect for Spark のクエリ結果でのデータ型処理方法の違いを示しています。
クエリ¶
query = """
SELECT * FROM VALUES
(float(1.0), double(1.0), 1.0, "1", true, :code:`NULL`),
(float(2.0), double(2.0), 2.0, "2", false, :code:`NULL`),
(float(3.0), double(3.0), :code:`NULL`, "3", false, :code:`NULL`)
AS tab(a, b, c, d, e, f)
"""
Spark¶
spark.sql(query).printSchema()
root
|-- a: float (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal(2,1) (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = false)
|-- f: void (nullable = true)
Snowpark Connect for Spark¶
snowpark_connect_spark.sql(query).printSchema()
root
|-- a: double (nullable = false)
|-- b: double (nullable = false)
|-- c: decimal (nullable = true)
|-- d: string (nullable = false)
|-- e: boolean (nullable = true)
|-- f: string (nullable = true)
NullType ニュアンス¶
Snowpark Connect for Spark は、Sparkでサポートされている NullType データ型をサポートしていません。これにより、データフレームの Null や None を使用すると、動作が変更されます。
Sparkでは、リテラル NULL (例: lit(None))は、 NullType として自動的に推測されます。|spconnect|内では、スキーマ推論中は StringType として推測されます。
df = self.spark.range(1).select(lit(None).alias("null_col"))
field = df.schema["null_col"]
# Spark: StructField('null_col', :code:`NullType`(), True)
# Snowpark Connect for Spark: StructField('null_col', :code:`StringType`(), True)
ArrayType MapType および ObjectType の構造化データ型。¶
構造化型のサポートは Snowpark Connect for Spark ではデフォルトでは利用できませんが、 ARRAY、 MAP および Object データ型は一般的な型付けされていないコレクションとして扱われます。これは、構造化型のサポートによって提供されるものとは異なり、要素型、フィールド名、スキーマ、またはNULL値の許容が強制されないことを意味します。
このサポートと依存関係がある場合は、アカウントチームと協力して、アカウントでこの機能を有効にしてください。
サポートされていない Spark APIs¶
以下は、従来の Spark および Spark Connect ではサポートされていますが、Snowpark Connect for Spark ではサポートされていない APIsです。
Dataframe.hint: Snowpark Connect for Spark は、データフレームに設定されたヒントを無視します。Snowflake クエリオプティマイザーは、最も効率的な実行戦略を自動的に決定します。
DataFrame.repartition:これは Snowpark Connect for Spark の無処理です。Snowflake は、分散コンピューティングインフラストラクチャ全体でのデータ配分とパーティション分割を自動的に管理します。
`pyspark RDD<https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html>`_: RDDAPIs は Spark Connect ではサポートされていません( Snowpark Connect for Spark を含む)。
UDF の相違¶
StructType の相違¶
Sparkがユーザー定義関数(UDF)に使用するため StructType を変換するとき、Python の tuple 型に変換します。Snowpark Connect for Spark は StructType を Pythonの:code:dict 型に変換します。これには、要素のアクセスと出力に基本的な違いがあります。
Sparkは0、1、2、3などのインデックスにアクセスします。
Snowpark Connect for Spark は「_1」、「_2」などを使用してインデックスにアクセスします。
def f(e):
return e[0]
df = self.spark.createDataFrame([((1.0, 1.0), (1, 1))], ["c1", "c2"])
result = df.select("*", udf(f, DoubleType())("c1"))
# This results in an index access issue. Workaround is to use _1, _2 as indicies.
# Workaround:
def f(e):
return e['_1']
row = (
self.spark.range(1)
.selectExpr("struct(1, 2) as struct")
.select(
udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
)
.first()
)
self.assertEquals(row[0], Row(col1=1, col2=2))
# Spark: Row(col1=1, col2=2)
# Snowpark Connect for Spark: {'col1': 1, 'col2': 2}
反復子型の UDFs¶
反復子は、戻り型または入力型としてサポートされていません。
# This will not work
def func(iterator):
for _ in iterator:
...
df = self.spark.range(10)
actual = df.repartition(1).mapInArrow(func, "a long").collect()
Importing files to a Python UDF¶
With Snowpark Connect for Spark, you can specify external libraries and files in Python UDFs. Snowflake includes Python files and archives in your code's execution context. You can import functions from these included files in a UDF without additional steps. This dependency-handling behavior works as described in ステージからアップロードされたコードを使用したPython UDF の作成.
To include external libraries and files, you provide stage paths to the files as the value of the configuration setting
snowpark.connect.udf.imports. The configuration value should be an array of stage paths to the files, where the paths are
separated by commas.
Code in the following example includes two files in the UDF's execution context. The UDF imports functions from these files and uses them in its logic.
# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/library.py, @other_lib.zip]")
@udf(returnType = StringType())
def import_example(input: str) -> str:
from library import first_function
from other_lib.custom import second_function
return first_function(input) + second_function(input)
spark.range(1).select(import_read_example("example_string")).show()
You can use the snowpark.connect.udf.imports setting to include other kinds of files as well, such as those with data your code
needs to read. Note that when you do this, your code should only read from the included files; any writes to such files will be lost after
the function's execution ends.
# Files need to be previously staged
spark.conf.set("snowpark.connect.udf.imports", "[@stage/data.csv]")
@udf(returnType = StringType())
def import_read_example(file_name: str) -> str:
with open(file_name) as f:
return f.read()
spark.range(1).select(import_read_example("data.csv")).show()
Lambda 関数の制限¶
Snowpark Connect for Spark はLambda式と高次関数( transform 関数など)をサポートしますが、Lambda本文内からの外部列または式の参照をサポートしません。
この制限は、Snowflake <label-query_semistructured_data_limitations>` でのLambda式の :ref:` 制限によって引き起こされます。
data = [
(1, [1, 2, 3]),
(2, [4, 5]),
(3, [])
]
df = spark.createDataFrame(data, ["id", "numbers"])
df.select(transform(df.numbers, lambda el: el + array_size(df.numbers))).show() # this fails in Spark Connect for Snowpark, because of a
reference to df.numbers which is outside of the lambda context
df.select(transform(df.numbers, lambda el: negative(el) + 1)).show() # this works
また、ユーザー定義関数(UDFs)はLambda式内ではサポートされないという制限もあります。これには、カスタム UDFs と、基になる実装が Snowflake UDFs に依存する特定の組み込み関数の両方が含まれます。Lambda式内で UDFを使用しようとするとエラーになります。
spark.range(1).select(split(lit('a,b,c'), ',')).show() # works
spark.range(1).select(transform(array(lit('a,b,c')), lambda x: split(x, ','))).show() # does not work, since `split` is implemented with UDF
Temporary views¶
By default, Snowpark Connect for Spark does not create a temporary view in Snowflake. You can specify that Snowpark Connect for Spark creates a temporary view by
setting the configuration parameter snowpark.connect.temporary.views.create_in_snowflake to true.
If the parameter is set to false, Snowpark Connect for Spark stores views as DataFrames without creating a Snowflake view. This helps to prevent
the issue that can occur when the view definition SQL created from Spark Connect request exceeds Snowflake view size limit (95KB).
Temporary views are normally visible when using Spark Connect Catalog API. However, they are not accessible when called from SQL statements
with configuration snowpark.connect.sql.passthrough set to true. To create Snowflake temporary views, set configuration
snowpark.connect.temporary.views.create_in_snowflake to true.
データソース¶
データソース |
PySpark と比較した互換性の問題 |
|---|---|
Avro |
ファイル型はサポートされていません。 |
CSV |
以下では保存モードはサポートされていません: 以下のオプションはサポートされていません: |
JSON |
以下では保存モードはサポートされていません: 以下のオプションはサポートされていません:
|
Orc |
ファイル型はサポートされていません。 |
Parquet |
以下では保存モードはサポートされていません: 以下のオプションはサポートされていません: サポートされていない設定:(ALL) |
テキスト |
以下では書き込みモードはサポートされていません: 以下のオプションはサポートされていません:
|
XML |
ファイル型はサポートされていません。 |
Snowflakeテーブル |
テーブルへの書き込みには、プロバイダー形式は必要ありません。 バケットとパーティション分割はサポートされていません。 ストレージの形式とバージョン管理はサポートされていません。 |
カタログ¶
Snowflake Horizon カタログ プロバイダーのサポート¶
カタログ プロバイダーとしてサポートされているのはSnowflakeのみです。
サポートされていないカタログ APIs¶
registerFunctionlistFunctionsgetFunctionfunctionExistscreateExternalTable
部分的にサポートされているカタログ APIs¶
createTable(外部テーブルのサポートなし)
Iceberg¶
Snowflake 管理 Iceberg テーブル¶
Spark 用 Snowpark Connectは、外部で管理された Iceberg テーブルやカタログリンクのデータベースを含む Apache Iceberg™テーブルと動作します。
読む¶
履歴スナップショット、分岐、増分読み取りを含む Time Travel はサポートされていません。
書き込み¶
テーブルを作成するための Spark SQL の使用はサポートされていません。
スキーマのマージはサポートされていません。
テーブルを作成するには、以下が必要です。
外部ボリュームを作成します。
以下のいずれかの方法で、外部ボリュームニーズをテーブル作成にリンクします。
データベースに EXTERNAL_VOLUME を設定します。
:code:`snowpark.connect.iceberg.external_volume`を Sparkの構成に設定します。
外部管理 Iceberg テーブル¶
読む¶
Snowflakeの管理されていないテーブルエンティティを作成する必要があります。
履歴スナップショット、分岐、増分読み取りを含む Time Travel はサポートされていません。
書き込み¶
テーブルの作成はサポートされていません。
既存のIceberg テーブルへの書き込みはサポートされています。
列名の重複¶
Snowflake は、列名の重複をサポートしていません。
次のコードは、次の SQL コンパイルエラーのためビュー作成ステップで失敗します: duplicate column name 'foo'
df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])
df.show() # works
df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
これを回避するには、snowpark.connect.views.duplicate_column_names_handling_mode 構成オプションを次の値のいずれかに設定します:
rename:_dedup_1,_dedup_2などのようなサフィックスが、最初の列の後のすべての重複列名に追加されます。drop:1つを除くすべての重複列は削除されます。列の値が異なる場合は、誤った結果になる可能性があります。