Snowpark Connect for Spark プロパティ

Snowpark Connect for Spark は標準のSparkと同様の方法でカスタム設定をサポートしています。構成プロパティは、キーと値のペアを使用してセッションの set メソッドを通じてのみ変更できます。Snowpark Connect for Spark は、実行に影響を与える限定されたプロパティセットのみを認識することに注意してください。サポートされていないプロパティは、例外を発生させずに無視されます。

サポートされているSparkプロパティ

Snowpark Connect for Spark は、Sparkプロパティのサブセットをサポートしています。

プロパティ名

デフォルト

意味

以来

spark.app.name

(なし)

クエリ追跡用のアプリケーション名をSnowflake query_tag`(:code:`Spark-Connect-App-Name={name})に設定します。

1.0.0

spark.Catalog.databaseFilterInformationSchema

false

true の場合、カタログ操作のデータベースリストから INFORMATION_SCHEMA を除外します。

1.0.0

spark.hadoop.fs.s3a.access.key

(なし)

S3の場所を読み書きするときにS3認証に使用する AWS アクセスキー ID。

1.0.0

spark.hadoop.fs.s3a.assumed.role.arn

(なし)

ロールベース認証を使用する場合のS3アクセスを備えた AWS IAM ロール ARN。

1.0.0

spark.hadoop.fs.s3a.secret.key

(なし)

S3の場所を読み書きするときに使用するS3認証用の AWS シークレットアクセスキー。

1.0.0

spark.hadoop.fs.s3a.server-side-encryption.key

(なし)

AWS_SSE_KMS 暗号化タイプを使用する場合のサーバー側暗号化用の AWS KMS キー ID。

1.0.0

spark.hadoop.fs.s3a.session.token

(なし)

STS を使用する場合の一時的なS3認証情報の AWS セッショントークン。

1.0.0

spark.sql.ansi.enabled

false

より厳密な型チェックとエラー処理のために ANSISQL モードを有効にします。true の場合、算術オーバーフローと無効なキャストでは、 NULL を返す代わりにエラーが発生します。

1.0.0

spark.sql.caseSensitive

false

識別子の大文字と小文字の区別を制御します。false の場合、列名とテーブル名は大文字と小文字を区別しません(Snowflakeで自動的に大文字になります)

1.0.0

spark.sql.crossJoin.enabled

true

暗黙的なクロス結合を有効または無効にします。false の場合、または結合条件が欠落している場合や単純である場合は、エラーが発生します。

1.0.0

spark.sql.execution.pythonUDTF.arrow.enabled

false

true の場合、Python UDTF のシリアライズ/デシリアライズに対してApache Arrow最適化が有効になります。

1.0.0

spark.sql.globalTempDatabase

global_temp

グローバル仮ビューのスキーマ名。存在しない場合は自動的に作成されます。

1.0.0

spark.sql.legacy.allowHashOnMapType

false

true の場合、 MAP 型列ハッシュを許可します。デフォルトでは、Sparkの動作との一貫性を保つために、 MAP 型をハッシュすることはできません。

1.0.0

spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue

false

データセットグループ化キーの命名時のレガシー動作。

1.6.0

spark.sql.mapKeyDedupPolicy

EXCEPTION

マップの作成で重複したキーが見つかった場合の動作を制御します。値: :code:`EXCEPTION`(エラーを発生させる)または :code:`LAST_WIN`(最後の値を保持)

1.0.0

spark.sql.parser.quotedRegexColumnNames

false

true の場合、 SQL クエリ内の引用符で囲まれた列名の正規表現パターンマッチングが有効になります(例: SELECT '(col1|col2)' FROM table

1.0.0

spark.sql.parquet.outputTimestampType

TIMESTAMP_MILLIS

Parquetの出力タイムスタンプ型を制御します。TIMESTAMP_MILLIS または TIMESTAMP_MICROS をサポートします。

1.7.0

spark.sql.pyspark.inferNestedDictAsStruct.enabled

false

true の場合、DataFrame の作成中に、ネストされたPythonディクショナリを MapType ではなく StructType として推測します。

1.0.0

spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled

false

true の場合、すべての要素をサンプリングする代わりに、最初の要素からのみ配列要素型を推測します。

1.0.0

spark.sql.repl.eagerEval.enabled

false

true の場合、 show() を呼び出さずに DataFrame の結果を自動的に表示する REPL での先行評価が有効になります。

1.0.0

spark.sql.repl.eagerEval.maxNumRows

20

REPL 先行評価モードに表示する行の最大数。

1.0.0

spark.sql.repl.eagerEval.truncate

20

切り捨て前の REPL 先行評価表示における列値の最大幅。

1.0.0

spark.sql.session.localRelationCacheThreshold

2147483647

ローカル関係をキャッシュするためのバイトしきい値。これより大きな関係は、パフォーマンスを改善させるためにキャッシュされます。

1.0.0

spark.sql.session.timeZone

<system_local_timezone>

タイムスタンプ操作に使用されるセッションタイムゾーン。ALTER SESSION SET TIMEZONE 経由でSnowflakeセッションと同期しました。

1.0.0

spark.sql.sources.default

parquet

形式が明示的に指定されていない場合の、読み取り/書き込み操作のデフォルトデータソース形式。

1.0.0

spark.sql.timestampType

TIMESTAMP_LTZ

タイムスタンプ操作のデフォルトのタイムスタンプ型。値: :code:`TIMESTAMP_LTZ`(現地時間帯)または :code:`TIMESTAMP_NTZ`(タイムゾーンなし)

1.0.0

spark.sql.tvf.allowMultipleTableArguments.enabled

true

true の場合、テーブル値の関数が複数のテーブル引数を受け入れることを可能にします。

1.0.0

サポートされた Snowpark Connect for Spark プロパティ

Snowpark Connect for Spark 固有のカスタム構成プロパティ

プロパティ名

デフォルト

意味

以来

fs.azure.sas.<container>.<account>.blob.core.windows.net

(なし)

BLOBストレージ認証用のAzure SAS トークン。Azure Blob Storageの場所への読み取りまたは書き込み時に使用されます。

1.0.0

fs.azure.sas.fixed.token.<account>.dfs.core.windows.net

(なし)

ADLS Gen2(データレイクストレージ)認証用Azure SAS トークン。Azure Data Lake Storage Gen2の場所への読み取りまたは書き込み時に使用されます。

1.0.0

mapreduce.fileoutputcommitter.marksuccessfuljobs

false

true の場合、 Hadoop/Sparkワークフローとの互換性のために、書き込み操作の成功後 _SUCCESS ファイルが生成されます。

1.0.0

parquet.enable.summary-metadata

false

Parquet要約メタデータファイルを生成するための代替構成。これまたは spark.sql.parquet.enable.summary-metadata により機能が有効になります。

1.4.0

snowflake.repartition.for.writes

false

true の場合、書き込み時に DataFrame.repartition(n) が出力を n ファイルに分割するように強制します。Sparkの動作と一致しますが、オーバーヘッドが追加されます。

1.0.0

snowpark.connect.cte.optimization_enabled

false

true の場合、Snowparkセッションで共通テーブル式(CTE)の最適化が有効になりクエリパフォーマンスが向上します。

1.0.0

snowpark.connect.describe_cache_ttl_seconds

300

クエリキャッシュエントリの有効時間(秒)繰り返しのスキーマ検索を減らします。

1.0.0

snowpark.connect.enable_snowflake_extension_behavior

false

true の場合、Sparkの動作とは異なる可能性のあるSnowflake固有の拡張機能(MAP 型のハッシュや MD5 戻り値の型など)が有効になります。

1.0.0

snowpark.connect.handleIntegralOverflow

false

true の場合、統合的なオーバーフローの動作は、Sparkのアプローチに合わせられています。

1.7.0

snowpark.connect.iceberg.external_volume

(なし)

Icebergテーブル操作用のSnowflake外部ボリューム名。

1.0.0

snowpark.connect.integralTypesEmulation

client_default

小数から整数型への変換を制御します。値: client_defaultenableddisabled

1.7.0

snowpark.connect.scala.version

2.12

使用されるScalaバージョンを制御します(2.12 または 2.13 をサポート)

1.7.0

snowpark.connect.sql.partition.external_table_location

(なし)

パーティション書き込み用の外部テーブルの場所パス。

1.4.0

snowpark.connect.temporary.views.create_in_snowflake

false

true の場合、ローカルで管理するのではなく、Snowflakeで直接仮ビューを作成します。

1.0.1

snowpark.connect.udf.imports [DEPRECATED 1.7.0]

(なし)

UDF 実行のためにインポートするファイルまたはモジュールのコンマ区切りリスト。変更時に UDF の再作成をトリガーします。

1.0.0

snowpark.connect.udf.python.imports

(なし)

Python UDF 実行用にインポートするファイル/モジュールのコンマ区切りリスト。変更時に UDF の再作成をトリガーします。

1.7.0

snowpark.connect.udf.java.imports

(なし)

Java UDF 実行用にインポートするファイルまたはモジュールのコンマ区切りリスト。変更時に UDF の再作成をトリガーします。

1.7.0

snowpark.connect.udf.packages

(なし)

UDFs 登録時に含めるPythonパッケージのコンマ区切りリスト。

1.0.0

snowpark.connect.udtf.compatibility_mode

false

true の場合、Spark UDTF セマンティクスとの互換性を向上させるために、Spark互換の UDTF 動作が有効になります。

1.0.0

snowpark.connect.version

<current_version>

読み取り専用現在の Snowpark Connect for Spark バージョンを返します。

1.0.0

snowpark.connect.views.duplicate_column_names_handling_mode

rename

ビューで重複する列名の処理方法。値: :code:`rename`(サフィックスを追加):code:`fail`(エラーを発生させる)または :code:`drop`(重複を削除)

1.0.0

spark.sql.parquet.enable.summary-metadata

false

true の場合、Parquetの書き込み中にParquetの概要メタデータファイル(_metadata_common_metadata)が生成されます。

1.4.0

fs.azure.sas.<container>.<account>.blob.core.windows.net

BLOBストレージ認証用Azure SAS トークンを指定します。Azure Blob Storageの場所への読み取りまたは書き込み時に使用されます。

デフォルト: (なし)

以来:1.0.0

fs.azure.sas.fixed.token.<account>.dfs.core.windows.net

ADLS Gen2(データレイクストレージ)認証用のAzure SAS トークンを指定します。Azure Data Lake Storage Gen2の場所への読み取りまたは書き込み時に使用されます。

デフォルト: (なし)

以来:1.0.0

mapreduce.fileoutputcommitter.marksuccessfuljobs

HadoopまたはSparkワークフローとの互換性のための書き込み操作が成功した後に _SUCCESS ファイルを生成するには、 true を指定します。

デフォルト: false

以来:1.0.0

parquet.enable.summary-metadata

Parquet概要メタデータファイルを生成するための代替構成を指定します。このプロパティまたは spark.sql.parquet.enable.summary-metadata でその機能を有効にします。

デフォルト: false

以来:1.4.0

snowflake.repartition.for.writes

true を指定すると、書き込み時に DataFrame.repartition(n) が出力を n 個のファイルに分割するように強制します。Sparkの動作と一致しますが、オーバーヘッドが追加されます。

デフォルト: false

以来:1.0.0

snowpark.connect.cte.optimization_enabled

クエリパフォーマンスを向上させるために、Snowparkセッションで共通テーブル式(CTE)の最適化を有効にするには、 true を指定します。

デフォルト: false

以来:1.0.0

コメント

Snowflake共通テーブル式(CTEs) を有効にする設定。この設定により、多くの反復コードブロックが存在するSnowflakeクエリが最適化されます。この変更により、クエリのコンパイルと実行の両方のパフォーマンスが向上します。

snowpark.connect.describe_cache_ttl_seconds

クエリキャッシュエントリの有効時間を秒単位で指定します。繰り返しのスキーマ検索を減らします。

デフォルト: 300

以来:1.0.0

snowpark.connect.enable_snowflake_extension_behavior

Sparkの動作とは異なる可能性のあるSnowflake固有の拡張機能(MAP 型の MD5 戻り値の型のハッシュなど)を有効にするには、 true を指定します。

デフォルト: false

以来:1.0.0

コメント

true に設定した場合、特定の操作の動作を変更します。

snowpark.connect.handleIntegralOverflow

統合的なオーバーフローの動作をSparkのアプローチに合わせるには、 true を指定します。

デフォルト: false

以来:1.7.0

snowpark.connect.iceberg.external_volume

Icebergテーブル操作用のSnowflake外部ボリューム名を指定します。

デフォルト: (なし)

以来:1.0.0

snowpark.connect.integralTypesEmulation

小数を整数型に変換する方法を指定します。値: client_defaultenableddisabled

デフォルト: client_default

以来:1.7.0

コメント

デフォルトでは、 Snowpark Connect for Spark は、すべての整数型を Long 型として扱います。これは、 Snowflakeで数値が表現される 方法に起因します。統合型のエミュレートにより、データソースからの読み取り時に、Snowpark型とSpark型間の正確なマッピングが可能になります。

デフォルトのオプション client_default はScalaクライアントからスクリプトが実行されたときにのみエミュレートをアクティブ化します。統合型は、次の精度に基づいてマップされます。

精度

Spark型

19

LongType

10

IntegerType

5

ShortType

3

ByteType

その他

DecimalType(precision, 0)

他の精度が見つかった場合、最終的な型は DecimalType にマッピングされます。

snowpark.connect.scala.version

使用するScalaバージョンを指定します(2.12 または 2.13 をサポートします)

デフォルト: 2.12

以来:1.7.0

snowpark.connect.sql.partition.external_table_location

パーティション化された書き込みの外部テーブルの場所パスを指定します。

デフォルト: (なし)

以来:1.4.0

コメント

提供されたディレクトリからパーティション化されたファイルの正確なサブセットのみを読み取るには、追加の設定が必要です。この機能は、 外部ステージ に保存されたファイルでのみ使用できます。読み取りファイルを整理するために、 Snowpark Connect for Spark は 外部テーブル を使用します。

この機能は、構成 snowpark.connect.sql.partition.external_table_location が設定されている場合に有効化されます。外部テーブルが作成される既存のデータベース名とスキーマ名が含まれている必要があります。

外部ステージに保存されているParquetファイルを読み取ると、外部テーブルが作成されます。内部ステージ上のファイルの場合は作成されません。スキーマを提供すると、実行時間が短縮され、ソースからスキーマを推論するコストが削減されます。

最高のパフォーマンスを得るには、 Snowflake外部テーブルフィルタリングの制限 に従ってフィルターしてください。

spark.conf.set("snowpark.connect.sql.partition.external_table_location", "<database-name>.<schema-name>")

spark.read.parquet("@external-stage/example").filter(col("x") > lit(1)).show()

schema = StructType([StructField("x",IntegerType()),StructField("y",DoubleType())])

spark.read.schema(schema).parquet("@external-stage/example").filter(col("x") > lit(1)).show()
Copy

snowpark.connect.temporary.views.create_in_snowflake

仮ビューをローカルで管理するのではなく、Snowflakeで直接作成するには、 true を指定します。

デフォルト: false

以来:1.0.1

snowpark.connect.udf.imports [DEPRECATED 1.7.0]

UDF 実行のためにインポートするファイルとモジュールのコンマ区切りリストを指定します。この値が変更されると、 UDF 再作成がトリガーされます。

デフォルト: (なし)

以来:1.0.0

snowpark.connect.udf.python.imports

Python UDF 実行用にインポートするファイルとモジュールのコンマ区切りリストを指定します。この値が変更されると、 UDF 再作成がトリガーされます。

デフォルト: (なし)

以来:1.7.0

snowpark.connect.udf.java.imports

Java UDF 実行用にインポートするファイルとモジュールのコンマ区切りリストを指定します。変更時に UDF の再作成をトリガーします。

デフォルト: (なし)

以来:1.7.0

コメント

この構成は、 snowpark.connect.udf.python.imports と非常によく似ています。これを使用すると、 `registerJavaFunction<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.UDFRegistration.registerJavaFunction.html>`_ を使用して作成されたJava UDFs の外部ライブラリやファイルを指定できます。構成は、不要な依存関係の混在を防ぐために、相互に排他的です。

外部ライブラリとファイルを含めるには、 snowpark.connect.udf.java.imports 構成設定の値としてファイルへのステージパスを指定します。構成値は、ファイルへのステージパスの配列である必要があります。パスはコンマで区切られます。

次の例のコードには、 UDF の実行コンテキストの2つのファイルが含まれています。UDF はこれらのファイルから関数をインポートし、ロジックで使用します。

# Files need to be previously staged

spark.conf.set("snowpark.connect.udf.java.imports", "[@stage/library.jar]")

spark.registerJavaFunction("javaFunction", "com.example.ExampleFunction")

spark.sql("SELECT javaFunction('arg')").show()
Copy

snowpark.connect.udf.java.imports 設定を使用して、コードが読み取る必要のあるデータなど、他の種類のファイルも含めます。これを行う場合、コードは含まれているファイルからのみ読み取る必要があることに注意してください。関数の実行が終了すると、そのようなファイルへの書き込みは失われます。

snowpark.connect.udf.packages

UDFs 登録時に含めるPythonパッケージのコンマ区切りリストを指定します。

デフォルト: (なし)

以来:1.0.0

コメント

これを使って、Python UDFs で利用可能な追加パッケージを定義できます。値は依存関係のコンマ区切りリストです。

Snowflakeで次の SQL を実行すると、サポートされているシステムパッケージのリストを見つけることができます。

SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
Copy
spark.conf.set("snowpark.connect.udf.packages", "[numpy]")

@udtf(returnType="val: int")

class Powers:

  def eval(self, x: int):
      import numpy as np

      for v in np.power(np.array([x, x, x]), [0, 1, 2]):
          yield (int(v),)

spark.udtf.register(name="powers", f=Powers)

spark.sql("SELECT * FROM powers(10)").show()
Copy

詳細については、 Python をご参照ください。

snowpark.connect.udtf.compatibility_mode

true を指定すると、Spark互換の UDTF 動作が有効になり、Spark UDTF セマンティクスとの互換性が向上します。

デフォルト: false

以来:1.0.0

コメント

このプロパティは、 UDTFs がSpark互換の動作を使用するか、またはデフォルトのSnowpark動作を使用するかを決定します。true に設定した場合、Sparkの出力型の強制とエラー処理パターンを模倣する互換性ラッパーを適用します。

有効にすると、 UDTFs はSparkスタイルの自動型強制(例: 文字列「true」をブール値に、ブール値を整数に)とエラー処理を適用する互換性ラッパーを使用します。ラッパーは、位置アクセスと名前アクセスの両方においてテーブル引数を行のようなオブジェクトに変換し、Sparkの動作パターンに合わせて SQL null値を適切に処理します。

snowpark.connect.version

現在の Snowpark Connect for Spark バージョンを返します。読み取り専用

デフォルト: <current_version>

以来:1.0.0

snowpark.connect.views.duplicate_column_names_handling_mode

ビューで重複する列名の処理方法を指定します。許可された値には :code:`rename`(サフィックスを追加):code:`fail`(エラーを発生させる)または :code:`drop`(重複を削除)などがあります。

デフォルト: rename

以来:1.0.0

コメント

Snowflake は、列名の重複をサポートしていません。

次のコードは、次の SQL コンパイルエラーのためビュー作成ステップで失敗します:「重複した列名『foo』」

df = spark.createDataFrame([
(1, 1),
(2, 2)
], ["foo", "foo"])

df.show() # works

df.createTempView("df_view") # Fails with SQL compilation error: duplicate column name 'foo'
Copy

これを回避するには、 snowpark.connect.views.duplicate_column_names_handling_mode 構成オプションを次の値のいずれかに設定します:

  • rename:_dedup_1, _dedup_2 などのようなサフィックスが、最初の列の後のすべての重複列名に追加されます。

  • drop:1つを除くすべての重複列は削除されます。列の値が異なる場合は、誤った結果になる可能性があります。

snowpark.connect.udf.java.imports

Java UDF 実行用にインポートするファイルとモジュールのコンマ区切りリストを指定します。変更時に UDF の再作成をトリガーします。

デフォルト: (なし)

以来:1.7.0

コメント

この構成は、 snowpark.connect.udf.python.imports と非常によく似ています。これを使用して、 `registerJavaFunction<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.UDFRegistration.registerJavaFunction.html>`_ を使用して作成されたJava UDFs の外部ライブラリやファイルを指定できます。構成は、不要な依存関係の混在を防ぐために、相互に排他的です。

外部ライブラリとファイルを含めるには、 snowpark.connect.udf.java.imports 構成設定の値としてファイルへのステージパスを指定します。値は、ファイルへのステージパスの配列であり、パスはコンマで区切られます。

次の例のコードには、 UDF の実行コンテキストの2つのファイルが含まれています。UDF はこれらのファイルから関数をインポートし、ロジックで使用します。

# Files need to be previously staged

spark.conf.set("snowpark.connect.udf.java.imports", "[@stage/library.jar]")

spark.registerJavaFunction("javaFunction", "com.example.ExampleFunction")

spark.sql("SELECT javaFunction('arg')").show()
Copy

snowpark.connect.udf.java.imports 設定を使用して、コードが読み取る必要のあるデータなど、他の種類のファイルも含めます。これを行う場合、コードは含まれているファイルからのみ読み取る必要があることに注意してください。関数の実行が終了すると、そのようなファイルへの書き込みは失われます。

snowpark.connect.udf.packages

UDFs 登録時に含めるPythonパッケージのコンマ区切りリストを指定します。

デフォルト: (なし)

以来:1.0.0

コメント

設定により、Python UDFs で利用可能な追加パッケージを定義できます。値は依存関係のコンマ区切りリストです。

Snowflakeで次の SQL を実行すると、サポートされているシステムパッケージのリストを見つけることができます。

SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
Copy
spark.conf.set("snowpark.connect.udf.packages", "[numpy]")

@udtf(returnType="val: int")

class Powers:

  def eval(self, x: int):
      import numpy as np

      for v in np.power(np.array([x, x, x]), [0, 1, 2]):
          yield (int(v),)

spark.udtf.register(name="powers", f=Powers)

spark.sql("SELECT * FROM powers(10)").show()
Copy

参照情報:パッケージ参照情報

snowpark.connect.udtf.compatibility_mode

true を指定すると、Spark互換の UDTF 動作が有効になり、Spark UDTF セマンティクスとの互換性が向上します。

デフォルト: false

以来:1.0.0

コメント

この設定は、 UDTFs がSpark互換の動作を使用するか、またはデフォルトのSnowpark動作を使用するかを決定します。有効にすると(true)、Sparkの出力型の強制とエラー処理パターンを模倣する互換性ラッパーを適用します。

有効にすると、 UDTFs はSparkスタイルの自動型強制(例: 文字列「true」をブール値に、ブール値を整数に)とエラー処理を適用する互換性ラッパーを使用します。ラッパーは、位置アクセスと名前アクセスの両方においてテーブル引数を行のようなオブジェクトに変換し、Sparkの動作パターンに合わせて SQL null値を適切に処理します。