Snowpark Migration Accelerator: Issue Codes for Python

SPRKPY1000

Message: Source project spark-core version is xx.xx:xx.x.x, the spark-core version supported by snowpark is 2.12:3.1.2 so there may be functional differences between the existing mappings.

Category: Warning.

Description

This issue appears when the PySpark version in your source code is not supported. It means that there may be functional differences between the existing mappings.

Additional recommendations

  • The PySpark version scanned by the SMA for compatibility with Snowpark is 2.12 to 3.1.2. If you are using a version outside of this range, the tool may produce inconsistent results. You can also change the version of the source code being scanned; a quick way to check the installed version is sketched after this list.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.
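  • If you are unsure which PySpark version your project uses, the following generic check (not part of the SMA itself) prints the version installed in the environment that runs the source code:

import pyspark

# Print the installed PySpark version, e.g. "3.1.2"; run this in the same environment as the source project.
print(pyspark.__version__)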

SPRKPY1001

Message: This code section has parsing errors

Category: Parsing error.

Description

A parsing error is reported when the Snowpark Migration Accelerator (SMA) cannot correctly read or understand the code in a file (it could not correctly "parse" the file). This issue code appears when a file has one or more parsing errors.

Scenario

Input: The EWI message appears when the code has invalid syntax.

def foo():
    x = %%%%%%1###1

Output: The SMA finds the parsing error and adds the corresponding EWI message as a comment on the parsing error.

def foo():
    x
## EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(2, 7). Last valid token was 'x' @(2, 5), failed token '=' @(2, 7)
##      = %%%%%%1###1

Additional recommendations

  • Check that the file contains valid Python code. (You can use the issues.csv file to find all files with this EWI code and determine which files were not processed by the tool due to parsing errors; a small filtering sketch is shown after this list.) Many parsing errors occur because only part of the code is input into the tool, so it's best to ensure that the code runs in the source. If it is valid, report that you encountered a parsing error using the Report an Issue option in the SMA. Include the line of code that was causing the parsing error in the description when you file this issue.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.
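  • A minimal pandas sketch for filtering the report is shown below. The column names ISSUE_CODE and FILE_NAME are assumptions; adjust them to the actual headers of your issues.csv file.

import pandas as pd

# Hypothetical sketch: list the files reported with parsing errors (SPRKPY1001).
# The column names are assumptions; check the headers in your own issues.csv.
issues = pd.read_csv("issues.csv")
parsing_errors = issues[issues["ISSUE_CODE"] == "SPRKPY1001"]
print(parsing_errors["FILE_NAME"].unique())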

SPRKPY1002

Message: < element > is not supported, Spark element is not supported.

Category: Conversion error.

Description

This error appears when the tool detects the use of an element that is not supported in Snowpark and does not have its own associated error code. This is the generic error code used by the SMA for any unsupported element.

Additional recommendations

  • Even though the option or element in the message is not supported, this does not mean that a solution cannot be found. It only means that the tool itself cannot find the solution.

  • If you encounter an unsupported element from the pyspark.ml library, consider an alternative approach. There are additional guides available to walk through ml-related issues, such as this one from Snowflake.

  • Check that the source code has the correct syntax. (You can use the issues.csv file to determine where the conversion errors occur.) If the syntax is correct, report that you encountered a conversion error on that particular element using the Report an Issue option in the SMA. Include the line of code that was causing the error in the description when you file this issue.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1003

Message: An error occurred when loading the symbol table.

Category: Conversion error.

Description

This issue appears when there is an error processing a symbol in the symbol table. The symbol table is part of the underlying architecture of the SMA, allowing for more complex conversions. This error could be caused by an unexpected statement in the source code.

Additional recommendations

  • This is unlikely to be an error in the source code itself, but rather is an error in how the tool processes the source code. The best resolution would be to post an issue in the SMA.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1004

Message: The symbol table could not be loaded.

Category: Parsing error.

Description

This issue appears when there is an unexpected error in the tool's execution process. Because the symbol table cannot be loaded, the tool cannot start the assessment or conversion process.

Additional recommendations

SPRKPY1005

Warning

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message: pyspark.conf.SparkConf is not required

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.conf.SparkConf which is not required.

Scenario

Input

SparkConf can be called without parameters or with loadDefaults.

from pyspark import SparkConf

my_conf = SparkConf(loadDefaults=True)

Output

In both cases (with or without parameters), the SMA creates a Snowpark Session.builder object:

#EWI: SPRKPY1005 => pyspark.conf.SparkConf is not required
#from pyspark import SparkConf
pass

#EWI: SPRKPY1005 => pyspark.conf.SparkConf is not required
my_conf = Session.builder.configs({"user" : "my_user", "password" : "my_password", "account" : "my_account", "role" : "my_role", "warehouse" : "my_warehouse", "database" : "my_database", "schema" : "my_schema"}).create()

Additional recommendations

  • The unnecessary parameter is removed and a warning comment is inserted. No additional action is required from the user.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1006

Warning

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message: pyspark.context.SparkContext is not required

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.context.SparkContext, which is not required in Snowflake.

Scenario

Input

In this example there are two contexts used to create a connection to a Spark cluster.

from pyspark import SparkContext

sql_context1 = SparkContext(my_sc1)
sql_context2 = SparkContext(sparkContext=my_sc2)

Output

Because there are no clusters in Snowflake, the contexts are not required. Note that the variables my_sc1 and my_sc2, which contain the Spark properties, may not be needed or may have to be adapted to fix the code.

from snowflake.snowpark import Session
#EWI: SPRKPY1006 => pyspark.sql.context.SparkContext is not required
sql_context1 = my_sc1
#EWI: SPRKPY1006 => pyspark.sql.context.SparkContext is not required

sql_context2 = my_sc2

Additional recommendations

  • The unnecessary parameter is removed and a warning comment is inserted. No action is required from the user.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1007

Warning

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message: pyspark.sql.context.SQLContext is not required

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.context.SQLContext, which is not required.

Scenario

Input

Here is an example that uses different overloads of SQLContext.

from pyspark import SQLContext

my_sc1 = SQLContext(myMaster, myAppName, mySparkHome, myPyFiles, myEnvironment, myBatctSize, mySerializer, my_conf1)
my_sc2 = SQLContext(conf=my_conf2)
my_sc3 = SQLContext()

Output

In the output code, the pyspark.SQLContext line is commented out and the scenarios are replaced with a reference to a configuration. Note that the variables my_sc1 and my_sc2, which contain the Spark properties, may not be needed or may have to be adapted to fix the code.

#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
#from pyspark import SQLContext
pass

#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
sql_context1 = my_sc1
#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
sql_context2 = my_sc2

Additional recommendations

  • Because this is an unnecessary parameter, it is removed and a warning comment is inserted into the source code. No action is required from the user.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1008

Message: pyspark.sql.context.HiveContext is not required

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.context.HiveContext, which is not required.

Scenario

Input

This example shows how a connection to a Hive store is created.

from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
df = hive_context.table("myTable")
df.show()

Output

In Snowflake there are no Hive stores, so the Hive context is not required. You can still use Parquet files in Snowflake; please check this tutorial to learn how.

#EWI: SPRKPY1008 => pyspark.sql.context.HiveContext is not required
hive_context = sc
df = hive_context.table("myTable")
df.show()

The sc variable refers to a Snowpark Session object.

Recommended fix

For the output code in the example, you should add the Snowpark Session object, similar to this code:

## Here manually we can add the Snowpark Session object via a json config file called connection.json
import json
from snowflake.snowpark import Session
jsonFile = open("connection.json")
connection_parameter = json.load(jsonFile)
jsonFile.close()
sc = Session.builder.configs(connection_parameter).getOrCreate()

hive_context = sc
df = hive_context.table("myTable")
df.show()

Additional recommendations

SPRKPY1009

Message: pyspark.sql.dataframe.DataFrame.approxQuantile has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrame.approxQuantile which has a workaround.

Scenario

Input

It is important to understand that PySpark has two different approxQuantile functions; here we use the DataFrame.approxQuantile version.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [['Sun', 10],
        ['Mon', 64],
        ['Thr', 12],
        ['Wen', 15],
        ['Thu', 68],
        ['Fri', 14],
        ['Sat', 13]]

columns = ['Day', 'Ammount']
df = spark.createDataFrame(data, columns)
df.approxQuantile('Ammount', [0.25, 0.5, 0.75], 0)

Output

The SMA returns the EWI SPRKPY1009 on the line where approxQuantile is used, so you can use it to identify where to fix.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Sun', 10],
        ['Mon', 64],
        ['Thr', 12],
        ['Wen', 15],
        ['Thu', 68],
        ['Fri', 14],
        ['Sat', 13]]

columns = ['Day', 'Ammount']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1009 => pyspark.sql.dataframe.DataFrame.approxQuantile has a workaround, see documentation for more info
df.approxQuantile('Ammount', [0.25, 0.5, 0.75], 0)

Recommended fix

Use the Snowpark approx_quantile method. Some parameters don't match, so they require manual adjustments. For the output code in the example, a recommended fix could be:

from snowflake.snowpark import Session
...
df = spark.createDataFrame(data, columns)

df.stat.approx_quantile('Ammount', [0.25, 0.5, 0.75])

Snowpark does not have the relativeError parameter of DataFrame.approxQuantile.

Additional recommendations

SPRKPY1010

Message: pyspark.sql.dataframe.DataFrame.checkpoint has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrame.checkpoint which has a workaround.

Scenario

Input

In PySpark, checkpoints are used to truncate the logical plan of a DataFrame. This avoids the growth of the logical plan.

import tempfile
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
with tempfile.TemporaryDirectory() as d:
    spark.sparkContext.setCheckpointDir("/tmp/bb")
    df.checkpoint(False)

Output

The SMA returns the EWI SPRKPY1010 on the line where checkpoint is used, so you can use it to identify where to fix. Note that it also marks setCheckpointDir as unsupported, but a checkpoint directory is not required for the fix.

import tempfile
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
with tempfile.TemporaryDirectory() as d:
    #EWI: SPRKPY1002 => pyspark.context.SparkContext.setCheckpointDir is not supported
    spark.setCheckpointDir("/tmp/bb")
    #EWI: SPRKPY1010 => pyspark.sql.dataframe.DataFrame.checkpoint has a workaround, see documentation for more info
    df.checkpoint(False)

Recommended fix

Snowpark eliminates the need for explicit checkpoints: Snowpark works with SQL-based operations that are optimized by the Snowflake query optimization engine, removing the need for redundant computations or logical plans that grow out of control.

However, there could be scenarios where you need to persist the result of a computation on a DataFrame. In these scenarios you can materialize the results by writing the DataFrame to a permanent Snowflake table or to a temporary Snowflake table.

  • Using a permanent table, the computed result can be accessed at any time, even after the session ends.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
df.write.save_as_table("my_table") # Save the dataframe into the permanent Snowflake table "my_table".
df2 = spark.table("my_table") # Now the stored result can be accessed by querying the table "my_table".
  • Alternatively, you can use a temporary table; in that case the table is dropped when the session ends.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
df.write.save_as_table("my_temp_table", table_type="temporary") # Save the dataframe into Snowflake table "my_temp_table".
df2 = spark.table("my_temp_table") # Now the stored result can be accessed by querying the table "my_temp_table".

Additional recommendations

SPRKPY1011

Message: pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile which has a workaround.

Scenario

Input

It is important to understand that PySpark has two different approxQuantile functions; here we use the DataFrameStatFunctions.approxQuantile version.

import tempfile
from pyspark.sql import SparkSession, DataFrameStatFunctions
spark = SparkSession.builder.getOrCreate()
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Gain']
df = spark.createDataFrame(data, columns)
aprox_quantille = DataFrameStatFunctions(df).approxQuantile('Gain', [0.25, 0.5, 0.75], 0)
print(aprox_quantille)

Output

The SMA returns the EWI SPRKPY1011 on the line where approxQuantile is used, so you can use it to identify where to fix.

import tempfile
from snowflake.snowpark import Session, DataFrameStatFunctions
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Gain']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1011 => pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile has a workaround, see documentation for more info
aprox_quantille = DataFrameStatFunctions(df).approxQuantile('Gain', [0.25, 0.5, 0.75], 0)

Recommended fix

You can use the Snowpark approx_quantile method. Some parameters don't match, so they require manual adjustments. For the output code in the example, a recommended fix could be:

from snowflake.snowpark import Session # remove DataFrameStatFunctions because it is not required
...
df = spark.createDataFrame(data, columns)

aprox_quantille = df.stat.approx_quantile('Gain', [0.25, 0.5, 0.75])

Snowpark does not have the relativeError parameter of DataFrame.approxQuantile.

Additional recommendations

SPRKPY1012

Warning

This issue code has been deprecated.

Message: pyspark.sql.dataframe.DataFrameStatFunctions.writeTo has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrameStatFunctions.writeTo which has a workaround.

Scenario

Input

In this example, the DataFrame df is written to the Spark table "table".

writer = df.writeTo("table")

Output

The SMA returns the EWI SPRKPY1012 on the line where DataFrameStatFunctions.writeTo is used, so you can use it to identify where to fix.

#EWI: SPRKPY1012 => pyspark.sql.dataframe.DataFrameStatFunctions.writeTo has a workaround, see documentation for more info
writer = df.writeTo("table")

Recommended fix

Use df.write.save_as_table() instead.

writer = df.write.save_as_table("table")

Additional recommendations

SPRKPY1013

Message: pyspark.sql.functions.acosh has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.acosh which has a workaround.

Scenario

Input

In this example, PySpark calculates the acosh for a DataFrame by using pyspark.sql.functions.acosh.

from pyspark.sql import SparkSession
from pyspark.sql.functions import acosh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 30],
        ['V2', 60],
        ['V3', 50],
        ['V4', 13]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_with_acosh = df.withColumn("acosh_value", acosh(df["value"]))

Output

The SMA returns the EWI SPRKPY1013 on the line where acosh is used, so you can use it to identify where to fix.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 30],
        ['V2', 60],
        ['V3', 50],
        ['V4', 13]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1013 => pyspark.sql.functions.acosh has a workaround, see documentation for more info
df_with_acosh = df.withColumn("acosh_value", acosh(df["value"]))

Recommended fix

There is no direct acosh implementation, but call_function can be used instead, passing "acosh" as the first parameter and the column name as the second one.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 30],
        ['V2', 60],
        ['V3', 50],
        ['V4', 13]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_with_acosh = df.select(call_function('ACOSH', col('value')))

Additional recommendations

SPRKPY1014

Message: pyspark.sql.functions.asinh has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.asinh which has a workaround.

Scenario

Input

In this example, PySpark calculates the asinh for a DataFrame by using pyspark.sql.functions.asinh.

from pyspark.sql import SparkSession
from pyspark.sql.functions import asinh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 3.0],
        ['V2', 60.0],
        ['V3', 14.0],
        ['V4', 3.1]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.withColumn("asinh_value", asinh(df["value"]))

Output

The SMA returns the EWI SPRKPY1014 on the line where asinh is used, so you can use it to identify where to fix.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 3.0],
        ['V2', 60.0],
        ['V3', 14.0],
        ['V4', 3.1]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1014 => pyspark.sql.functions.asinh has a workaround, see documentation for more info
df_result = df.withColumn("asinh_value", asinh(df["value"]))

Recommended fix

There is no direct asinh implementation, but call_function can be used instead, passing "asinh" as the first parameter and the column name as the second one.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 3.0],
        ['V2', 60.0],
        ['V3', 14.0],
        ['V4', 3.1]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.select(call_function('asinh', col('value')))

Additional recommendations

SPRKPY1015

Message: pyspark.sql.functions.atanh has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.atanh which has a workaround.

Scenario

Input

In this example, PySpark calculates the atanh for a DataFrame by using pyspark.sql.functions.atanh.

from pyspark.sql import SparkSession
from pyspark.sql.functions import atanh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 0.14],
        ['V2', 0.32],
        ['V3', 0.4],
        ['V4', -0.36]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.withColumn("atanh_value", atanh(df["value"]))

Output

The SMA returns the EWI SPRKPY1015 on the line where atanh is used, so you can use it to identify where to fix.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 0.14],
        ['V2', 0.32],
        ['V3', 0.4],
        ['V4', -0.36]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1015 => pyspark.sql.functions.atanh has a workaround, see documentation for more info
df_result = df.withColumn("atanh_value", atanh(df["value"]))

Recommended fix

There is no direct atanh implementation, but call_function can be used instead, passing "atanh" as the first parameter and the column name as the second one.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 0.14],
        ['V2', 0.32],
        ['V3', 0.4],
        ['V4', -0.36]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.select(call_function('atanh', col('value')))

Additional recommendations

SPRKPY1016

Warning

This issue code has been deprecated since Spark Conversion Core Version 0.11.7

Message: pyspark.sql.functions.collect_set has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.collect_set which has a workaround.

Scenario

Input

Using collect_set to get the elements of colName without duplicates:

col = collect_set(colName)

Output

The SMA returns the EWI SPRKPY1016 on the line where collect_set is used, so you can use it to identify where to fix.

#EWI: SPRKPY1016 => pyspark.sql.functions.collect_set has a workaround, see documentation for more info
col = collect_set(colName)

Recommended fix

Use the array_agg function and add a second argument with the value True.

col = array_agg(col, True)
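For reference, a more complete sketch using the Snowpark array_agg function could look like the following; it assumes an existing Snowpark DataFrame df with a column named colName.

# Sketch assuming an existing Snowpark session and a DataFrame df with a column "colName".
from snowflake.snowpark.functions import array_agg, col

# is_distinct=True removes duplicates, mirroring the behavior of collect_set.
df_result = df.select(array_agg(col("colName"), is_distinct=True).alias("values"))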

Additional recommendations

SPRKPY1017

Warning

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message: pyspark.sql.functions.date_add has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.date_add which has a workaround.

Scenario

Input

In this example, date_add is used to calculate the date 5 days after the current date for the DataFrame df.

col = df.select(date_add(df.colName, 5))

Output

The SMA returns the EWI SPRKPY1017 on the line where date_add is used, so you can use it to identify where to fix.

#EWI: SPRKPY1017 => pyspark.sql.functions.date_add has a workaround, see documentation for more info
col = df.select(date_add(df.colName, 5))

Recommended fix

Import snowflake.snowpark.functions, which contains an implementation of the date_add function (and its alias dateAdd).

from snowflake.snowpark.functions import date_add

col = df.select(date_add(df.dt, 1))

Additional recommendations

SPRKPY1018

Warning

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message: pyspark.sql.functions.date_sub has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.date_sub which has a workaround.

Scenario

Input

In this example, date_sub is used to calculate the date 5 days before the current date for the DataFrame df.

col = df.select(date_sub(df.colName, 5))

Output

The SMA returns the EWI SPRKPY1018 on the line where date_sub is used, so you can use it to identify where to fix.

#EWI: SPRKPY1018 => pyspark.sql.functions.date_sub has a workaround, see documentation for more info
col = df.select(date_sub(df.colName, 5))

Recommended fix

Import snowflake.snowpark.functions, which contains an implementation of the date_sub function.

from snowflake.snowpark.functions import date_sub
df.withColumn("date", date_sub(df.colName, 5))

Additional recommendations

SPRKPY1019

Warning

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message: pyspark.sql.functions.datediff has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.datediff which has a workaround.

Scenario

Input

In this example, datediff is used to calculate the difference in days between 'today' and other dates.

contacts = (contacts
            #days since last event
            .withColumn('daysSinceLastEvent', datediff(lit(today),'lastEvent'))
            #days since deployment
            .withColumn('daysSinceLastDeployment', datediff(lit(today),'lastDeploymentEnd'))
            #days since online training
            .withColumn('daysSinceLastTraining', datediff(lit(today),'lastTraining'))
            #days since last RC login
            .withColumn('daysSinceLastRollCallLogin', datediff(lit(today),'adx_identity_lastsuccessfullogin'))
            #days since last EMS login
            .withColumn('daysSinceLastEMSLogin', datediff(lit(today),'vms_lastuserlogin'))
           )

Output

The SMA returns the EWI SPRKPY1019 on the line where datediff is used, so you can use it to identify where to fix.

from pyspark.sql.functions import datediff
#EWI: SPRKPY1019 => pyspark.sql.functions.datediff has a workaround, see documentation for more info
contacts = (contacts
            #days since last event
            .withColumn('daysSinceLastEvent', datediff(lit(today),'lastEvent'))
            #days since deployment
            .withColumn('daysSinceLastDeployment', datediff(lit(today),'lastDeploymentEnd'))
            #days since online training
            .withColumn('daysSinceLastTraining', datediff(lit(today),'lastTraining'))
            #days since last RC login
            .withColumn('daysSinceLastRollCallLogin', datediff(lit(today),'adx_identity_lastsuccessfullogin'))
            #days since last EMS login
            .withColumn('daysSinceLastEMSLogin', datediff(lit(today),'vms_lastuserlogin'))
           )

The SMA converts pyspark.sql.functions.datediff to snowflake.snowpark.functions.daydiff, which also calculates the difference in days between two dates.

Recommended fix

datediff(part: string, end: ColumnOrName, start: ColumnOrName)

Action: Import snowflake.snowpark.functions, which contains an implementation of datediff that requires an extra parameter for the date/time part and allows more versatility in calculating differences between dates.

from snowflake.snowpark import Session
from snowflake.snowpark.functions import datediff
contacts = (contacts
            #days since last event
            .withColumn('daysSinceLastEvent', datediff('day', lit(today),'lastEvent'))
            #days since deployment
            .withColumn('daysSinceLastDeployment', datediff('day',lit(today),'lastDeploymentEnd'))
            #days since online training
            .withColumn('daysSinceLastTraining', datediff('day', lit(today),'lastTraining'))
            #days since last RC login
            .withColumn('daysSinceLastRollCallLogin', datediff('day', lit(today),'adx_identity_lastsuccessfullogin'))
            #days since last EMS login
            .withColumn('daysSinceLastEMSLogin', datediff('day', lit(today),'vms_lastuserlogin'))
           )

Recommendations

SPRKPY1020

Message: pyspark.sql.functions.instr has a workaround

Category: Warning.

Description

This issue appears when the tool detects the usage of pyspark.sql.functions.instr which has a workaround.

Scenario

Input

Here is a basic example of the use of pyspark instr:

from pyspark.sql import SparkSession
from pyspark.sql.functions import instr
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('abcd',)], ['test',])
df.select(instr(df.test, 'cd').alias('result')).collect()

Output:

The SMA returns the EWI SPRKPY1020 on the line where instr is used, so you can use it to identify where to fix.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
df = spark.createDataFrame([('abcd',)], ['test',])
#EWI: SPRKPY1020 => pyspark.sql.functions.instr has a workaround, see documentation for more info
df.select(instr(df.test, 'cd').alias('result')).collect()

Recommended fix

This requires a manual change: use the charindex function and swap the order of the first two parameters.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import charindex, lit

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
df = spark.createDataFrame([('abcd',)], ['test',])
df.select(charindex(lit('cd'), df.test).as_('result')).show()

Additional recommendations

SPRKPY1021

Warning

This issue code has been deprecated.

Message: pyspark.sql.functions.last has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.last function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.functions.last function that generates this EWI. In this example, the last function is used to get the last value for each name.

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
df_grouped = df.groupBy("name").agg(last("value").alias("last_value"))

Output

The SMA adds the EWI SPRKPY1021 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
#EWI: SPRKPY1021 => pyspark.sql.functions.last has a workaround, see documentation for more info
df_grouped = df.groupBy("name").agg(last("value").alias("last_value"))

Recommended fix

As a workaround, you can use the Snowflake LAST_VALUE function. To invoke this function from Snowpark, use the snowflake.snowpark.functions.call_builtin function and pass the string last_value as the first argument and the corresponding column as the second argument. If you were using the name of the column in the last function, you should convert it into a column when calling the call_builtin function.

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
df_grouped = df.groupBy("name").agg(call_builtin("last_value", col("value")).alias("last_value"))
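For completeness, a sketch with the required imports (assuming a Snowpark session is already available as spark) could look like this:

# Sketch assuming a Snowpark session is already available as `spark`.
from snowflake.snowpark.functions import call_builtin, col

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
# LAST_VALUE is invoked through call_builtin; the column is passed as a column object.
df_grouped = df.groupBy("name").agg(call_builtin("last_value", col("value")).alias("last_value"))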

Additional recommendations


SPRKPY1022

Message: pyspark.sql.functions.log10 has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.log10 function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.functions.log10 function that generates this EWI. In this example, the log10 function is used to calculate the base-10 logarithm of the value column.

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
df_with_log10 = df.withColumn("log10_value", log10(df["value"]))

Output

The SMA adds the EWI SPRKPY1022 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
#EWI: SPRKPY1022 => pyspark.sql.functions.log10 has a workaround, see documentation for more info
df_with_log10 = df.withColumn("log10_value", log10(df["value"]))

Recommended fix

As a workaround, you can use the snowflake.snowpark.functions.log function by passing the literal value 10 as the base.

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
df_with_log10 = df.withColumn("log10_value", log(10, df["value"]))
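For completeness, a sketch with the corresponding import (assuming a Snowpark session is already available as spark) could be:

# Sketch assuming a Snowpark session is already available as `spark`.
from snowflake.snowpark.functions import log

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
# snowflake.snowpark.functions.log takes the base as its first argument.
df_with_log10 = df.withColumn("log10_value", log(10, df["value"]))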

Additional recommendations

SPRKPY1023

Message: pyspark.sql.functions.log1p has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.log1p function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.functions.log1p function that generates this EWI. In this example, the log1p function is used to calculate the natural logarithm of one plus the value column.

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
df_with_log1p = df.withColumn("log1p_value", log1p(df["value"]))

Output

The SMA adds the EWI SPRKPY1023 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
#EWI: SPRKPY1023 => pyspark.sql.functions.log1p has a workaround, see documentation for more info
df_with_log1p = df.withColumn("log1p_value", log1p(df["value"]))

Recommended fix

As a workaround, you can use the call_function function by passing the string ln as the first argument and by adding 1 to the second argument.

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
df_with_log1p = df.withColumn("log1p_value", call_function("ln", lit(1) + df["value"]))
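For completeness, a sketch with the corresponding imports (assuming a Snowpark session is already available as spark) could be:

# Sketch assuming a Snowpark session is already available as `spark`.
from snowflake.snowpark.functions import call_function, lit

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
# ln(1 + value) reproduces the behavior of log1p.
df_with_log1p = df.withColumn("log1p_value", call_function("ln", lit(1) + df["value"]))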

Additional recommendations

SPRKPY1024

Message: pyspark.sql.functions.log2 has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.log2 function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.functions.log2 function that generates this EWI. In this example, the log2 function is used to calculate the base-2 logarithm of the value column.

df = spark.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
df_with_log2 = df.withColumn("log2_value", log2(df["value"]))

Output

The SMA adds the EWI SPRKPY1024 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
#EWI: SPRKPY1024 => pyspark.sql.functions.log2 has a workaround, see documentation for more info
df_with_log2 = df.withColumn("log2_value", log2(df["value"]))

Recommended fix

As a workaround, you can use the snowflake.snowpark.functions.log function by passing the literal value 2 as the base.

df = session.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
df_with_log2 = df.withColumn("log2_value", log(2, df["value"]))

Additional recommendations

SPRKPY1025

Warning

This issue code has been deprecated.

Message: pyspark.sql.functions.ntile has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.ntile function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.functions.ntile function that generates this EWI. In this example, the ntile function is used to divide the rows into 3 buckets.

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
df_with_ntile = df.withColumn("bucket", ntile(3).over(windowSpec))

Output

The SMA adds the EWI SPRKPY1025 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
#EWI: SPRKPY1025 => pyspark.sql.functions.ntile has a workaround, see documentation for more info
df_with_ntile = df.withColumn("bucket", ntile(3).over(windowSpec))

Recommended fix

Snowpark has an equivalent ntile function; however, the argument passed to it should be a column. As a workaround, you can convert the literal argument into a column using the snowflake.snowpark.functions.lit function.

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
df_with_ntile = df.withColumn("bucket", ntile(lit(3)).over(windowSpec))
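For completeness, a sketch with the corresponding imports (assuming a Snowpark session is already available as spark) could be:

# Sketch assuming a Snowpark session is already available as `spark`.
from snowflake.snowpark import Window
from snowflake.snowpark.functions import ntile, lit

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
# Snowpark ntile expects a Column, so the literal bucket count is wrapped with lit().
df_with_ntile = df.withColumn("bucket", ntile(lit(3)).over(windowSpec))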

Additional recommendations

SPRKPY1026

Warning

This issue code has been deprecated since Spark Conversion Core 4.3.2

Message: pyspark.sql.readwriter.DataFrameReader.csv has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.csv function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.csv function that generates this EWI. In this example, the csv function is used to read multiple .csv files with a given schema and uses some extra options such as encoding, header and sep to fine-tune the behavior of reading the files.

file_paths = [
  "path/to/your/file1.csv",
  "path/to/your/file2.csv",
  "path/to/your/file3.csv",
]

df = session.read.csv(
  file_paths,
  schema=my_schema,
  encoding="UTF-8",
  header=True,
  sep=","
)

Output

The SMA adds the EWI SPRKPY1026 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

file_paths = [
  "path/to/your/file1.csv",
  "path/to/your/file2.csv",
  "path/to/your/file3.csv",
]

#EWI: SPRKPY1026 => pyspark.sql.readwriter.DataFrameReader.csv has a workaround, see documentation for more info
df = session.read.csv(
  file_paths,
  schema=my_schema,
  encoding="UTF-8",
  header=True,
  sep=","
)

Recommended fix

In this section, we explain how to configure the path parameter, the schema parameter and some options to make them work in Snowpark.

1. path parameter

Snowpark requires the path parameter to be a stage location so, as a workaround, you can create a temporary stage and add each .csv file to that stage using the prefix file://.

2. schema parameter

Snowpark does not allow defining the schema as a parameter of the csv function. As a workaround, you can use the snowflake.snowpark.DataFrameReader.schema function.

3. options parameter

Snowpark does not allow defining the extra options as parameters of the csv function. As a workaround, for many of them you can use the snowflake.snowpark.DataFrameReader.option function to specify those parameters as options of the DataFrameReader.

Note

The following options are not supported in Snowpark:

  • columnNameOfCorruptRecord

  • emptyValue

  • enforceSchema

  • header

  • ignoreLeadingWhiteSpace

  • ignoreTrailingWhiteSpace

  • inferSchema

  • locale

  • maxCharsPerColumn

  • maxColumns

  • mode

  • multiLine

  • nanValue

  • negativeInf

  • nullValue

  • positiveInf

  • quoteAll

  • samplingRatio

  • timestampNTZFormat

  • unescapedQuoteHandling

Below is the full example of the input code after applying the suggestions above so it works in Snowpark.

stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')

session.file.put(f"file:///path/to/your/file1.csv", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.csv", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.csv", f"@{stage}")

df = session.read.schema(my_schema).option("encoding", "UTF-8").option("sep", ",").csv(stage)

Additional recommendations

SPRKPY1027

Warning

This issue code has been deprecated since Spark Conversion Core 4.5.2

Message: pyspark.sql.readwriter.DataFrameReader.json has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.json function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.json function that generates this EWI. In this example, the json function is used to read multiple .json files with a given schema and uses some extra options such as primitiveAsString and dateFormat to fine-tune the behavior of reading the files.

file_paths = [
  "path/to/your/file1.json",
  "path/to/your/file2.json",
  "path/to/your/file3.json",
]

df = session.read.json(
  file_paths,
  schema=my_schema,
  primitiveAsString=True,
  dateFormat="2023-06-20"
)

Output

The SMA adds the EWI SPRKPY1027 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

file_paths = [
  "path/to/your/file1.json",
  "path/to/your/file2.json",
  "path/to/your/file3.json",
]

#EWI: SPRKPY1027 => pyspark.sql.readwriter.DataFrameReader.json has a workaround, see documentation for more info
df = session.read.json(
  file_paths,
  schema=my_schema,
  primitiveAsString=True,
  dateFormat="2023-06-20"
)

Recommended fix

In this section, we explain how to configure the path parameter, the schema parameter and some options to make them work in Snowpark.

1. path parameter

Snowpark requires the path parameter to be a stage location so, as a workaround, you can create a temporary stage and add each .json file to that stage using the prefix file://.

2. schema parameter

Snowpark does not allow defining the schema as a parameter of the json function. As a workaround, you can use the snowflake.snowpark.DataFrameReader.schema function.

3. options parameter

Snowpark does not allow defining the extra options as parameters of the json function. As a workaround, for many of them you can use the snowflake.snowpark.DataFrameReader.option function to specify those parameters as options of the DataFrameReader.

Note

The following options are not supported in Snowpark:

  • allowBackslashEscapingAnyCharacter

  • allowComments

  • allowNonNumericNumbers

  • allowNumericLeadingZero

  • allowSingleQuotes

  • allowUnquotedControlChars

  • allowUnquotedFieldNames

  • columnNameOfCorruptRecord

  • dropFieldIfAllNull

  • encoding

  • ignoreNullFields

  • lineSep

  • locale

  • mode

  • multiline

  • prefersDecimal

  • primitiveAsString

  • samplingRatio

  • timestampNTZFormat

  • timeZone

Below is the full example of the input code after applying the suggestions above so it works in Snowpark.

stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')

session.file.put(f"file:///path/to/your/file1.json", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.json", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.json", f"@{stage}")

df = session.read.schema(my_schema).option("dateFormat", "2023-06-20").json(stage)

Additional recommendations

SPRKPY1028

Message: pyspark.sql.readwriter.DataFrameReader.orc has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.orc function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.orc function that generates this EWI. In this example, the orc function is used to read multiple .orc files and uses some extra options such as mergeSchema and recursiveFileLookup to fine-tune the behavior of reading the files.

file_paths = [
  "path/to/your/file1.orc",
  "path/to/your/file2.orc",
  "path/to/your/file3.orc",
]

df = session.read.orc(
  file_paths,
  mergeSchema="True",
  recursiveFileLookup="True"
)

Output

The SMA adds the EWI SPRKPY1028 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

file_paths = [
  "path/to/your/file1.orc",
  "path/to/your/file2.orc",
  "path/to/your/file3.orc",
]

#EWI: SPRKPY1028 => pyspark.sql.readwriter.DataFrameReader.orc has a workaround, see documentation for more info
df = session.read.orc(
  file_paths,
  mergeSchema="True",
  recursiveFileLookup="True"
)

Recommended fix

In this section, we explain how to configure the path parameter and the extra options to make them work in Snowpark.

1. path parameter

Snowpark requires the path parameter to be a stage location so, as a workaround, you can create a temporary stage and add each .orc file to that stage using the prefix file://.

2. options parameter

Snowpark does not allow defining the extra options as parameters of the orc function. As a workaround, for many of them you can use the snowflake.snowpark.DataFrameReader.option function to specify those parameters as options of the DataFrameReader.

Note

The following options are not supported in Snowpark:

  • compression

  • mergeSchema

Below is the full example of the input code after applying the suggestions above so it works in Snowpark.

stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')

session.file.put(f"file:///path/to/your/file1.orc", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.orc", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.orc", f"@{stage}")

df = session.read.option("recursiveFileLookup", "True").orc(stage)

Additional recommendations

SPRKPY1029

Message: This issue appears when the tool detects the usage of pyspark.sql.readwriter.DataFrameReader.parquet. This function is supported, but some of the differences between Snowpark and the Spark API might require making some manual changes.

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.parquet function. This function is supported by Snowpark, however, there are some differences that would require some manual changes.

Scenario

Input

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.parquet function that generates this EWI.

file_paths = [
  "path/to/your/file1.parquet",
  "path/to/your/file2.parquet",
  "path/to/your/file3.parquet",
]

df = session.read.parquet(
  *file_paths,
  mergeSchema="true",
  pathGlobFilter="*file*",
  recursiveFileLookup="true",
  modifiedBefore="2024-12-31T00:00:00",
  modifiedAfter="2023-12-31T00:00:00"
)

Output

The SMA adds the EWI SPRKPY1029 to the output code to let you know that this function is supported by Snowpark, but it requires some manual adjustments. Please note that the options supported by Snowpark are transformed into option function calls and those that are not supported are removed. This is explained in more detail in the next sections.

file_paths = [
  "path/to/your/file1.parquet",
  "path/to/your/file2.parquet",
  "path/to/your/file3.parquet"
]

#EWI: SPRKPY1076 => Some of the included parameters are not supported in the parquet function, the supported ones will be added into a option method.
#EWI: SPRKPY1029 => This issue appears when the tool detects the usage of pyspark.sql.readwriter.DataFrameReader.parquet. This function is supported, but some of the differences between Snowpark and the Spark API might require making some manual changes.
df = session.read.option("PATTERN", "*file*").parquet(
  *file_paths
)

Recommended fix

In this section, we explain how to configure the paths and options parameters to make them work in Snowpark.

1. paths parameter

In Spark, this parameter can be a local or cloud location. Snowpark only accepts cloud locations using a Snowflake stage. So, you can create a temporary stage and add each file into it using the prefix file://.

2. options parameter

Snowpark does not allow defining the different options as parameters of the parquet function. As a workaround, you can use the option or options functions to specify those parameters as extra options of the DataFrameReader.

Snowpark options and PySpark options are not exactly the same, so some manual changes might be needed. Below is a more detailed explanation of how to configure the most common PySpark options in Snowpark.

2.1 mergeSchema option

Parquet supports schema evolution, allowing users to start with a simple schema and gradually add more columns as needed. This can result in multiple parquet files with different but compatible schemas. In Snowflake, thanks to the infer_schema capabilities you don't need to do that and therefore the mergeSchema option can just be removed.

2.2 pathGlobFilter option

If you want to load only a subset of files from the stage, you can use the pattern option to specify a regular expression that matches the files you want to load. The SMA already automates this as you can see in the output of this scenario.

2.3 recursiveFileLookup option

This option is not supported by Snowpark. The best recommendation is to use a regular expression like with the pathGlobFilter option to achieve something similar.

2.4 modifiedBefore / modifiedAfter options

You can achieve the same result in Snowflake by using the metadata columns.

Note

The following options are not supported in Snowpark:

  • compression

  • datetimeRebaseMode

  • int96RebaseMode

  • mergeSchema

Below is a comprehensive example of how the input code should be transformed so it works in Snowpark.

from snowflake.snowpark.column import METADATA_FILE_LAST_MODIFIED, METADATA_FILENAME

temp_stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}')

session.file.put(f"file:///path/to/your/file1.parquet", f"@{temp_stage}")
session.file.put(f"file:///path/to/your/file2.parquet", f"@{temp_stage}")
session.file.put(f"file:///path/to/your/file3.parquet", f"@{temp_stage}")

df = session.read \
  .option("PATTERN", ".*file.*") \
  .with_metadata(METADATA_FILENAME, METADATA_FILE_LAST_MODIFIED) \
  .parquet(temp_stage) \
  .where(METADATA_FILE_LAST_MODIFIED < '2024-12-31T00:00:00') \
  .where(METADATA_FILE_LAST_MODIFIED > '2023-12-31T00:00:00')

Additional recommendations

  • In Snowflake, you can leverage other approaches for ingesting parquet data.

  • When migrating, it is good practice to leverage the SMA reports to build an inventory of files and determine which stages/tables the data will be mapped to after modernization.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1030

Warning

This issue code has been deprecated.

Message: pyspark.sql.session.SparkSession.Builder.appName has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.session.SparkSession.Builder.appName function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.session.SparkSession.Builder.appName function that generates this EWI. In this example, the appName function is used to set MyApp as the name of the application.

session = SparkSession.builder.appName("MyApp").getOrCreate()

Output

The SMA adds the EWI SPRKPY1030 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1030 => pyspark.sql.session.SparkSession.Builder.appName has a workaround, see documentation for more info
session = Session.builder.appName("MyApp").getOrCreate()

Recommended fix

As a workaround, you can import the snowpark_extensions package which provides an extension for the appName function.

import snowpark_extensions
session = Session.builder.appName("MyApp").getOrCreate()

Additional recommendations

SPRKPY1031

Warning

This issue code has been deprecated since Spark Conversion Core 2.7.0

Message: pyspark.sql.column.Column.contains has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.column.Column.contains function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.column.Column.contains function that generates this EWI. In this example, the contains function is used to filter the rows where the 'City' column contains the substring 'New'.

df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
df_filtered = df.filter(col("City").contains("New"))

Output

The SMA adds the EWI SPRKPY1031 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
#EWI: SPRKPY1031 => pyspark.sql.column.Column.contains has a workaround, see documentation for more info
df_filtered = df.filter(col("City").contains("New"))

Recommended fix

As a workaround, you can use the snowflake.snowpark.functions.contains function by passing the column as the first argument and the element to search as the second argument. If the element to search is a literal value then it should be converted into a column expression using the lit function.

from snowflake.snowpark import functions as f
df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
df_filtered = df.filter(f.contains(col("City"), f.lit("New")))

Additional recommendations

SPRKPY1032

Message: < spark element > is not defined

Category: Conversion error

Description

This issue appears when the SMA could not determine an appropriate mapping status for the given element. This means that the SMA does not yet know whether the element is supported by Snowpark or not. This is the generic error code used by the SMA for any element that is not defined.

Scenario

Input

Below is an example of a function for which the SMA could not determine an appropriate mapping status. In this case, you should assume that not_defined_function() is a valid PySpark function and the code runs.

sc.parallelize(["a", "b", "c", "d", "e"], 3).not_defined_function().collect()

Output

The SMA adds the EWI SPRKPY1032 to the output code to let you know that this element is not defined.

#EWI: SPRKPY1032 => pyspark.rdd.RDD.not_defined_function is not defined
sc.parallelize(["a", "b", "c", "d", "e"], 3).not_defined_function().collect()

Recommended fix

To try to identify the problem, verify the following:

  • Check whether the source code has correct syntax and is spelled correctly.

  • Check if you are using a PySpark version supported by the SMA. To know which PySpark version is supported by the SMA at the moment of running the SMA, you can review the first page of the DetailedReport.docx file.

If this is a valid PySpark element, please report that you encountered a conversion error on that particular element using the Report an Issue option of the SMA and include any additional information that you think may be helpful.

Please note that if an element is not defined, it does not mean that it is not supported by Snowpark. You should check the Snowpark Documentation to verify if an equivalent element exist.

Additional recommendations

SPRKPY1033

Warning

This issue code has been deprecated.

Message: pyspark.sql.functions.asc has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.asc function, which has a workaround.

Scenarios

The pyspark.sql.functions.asc function takes either a column object or the name of the column as a string as its parameter. Neither scenario is directly supported by Snowpark, so this EWI is generated.

Scenario 1

Input

Below is an example of a use of the pyspark.sql.functions.asc function that takes a column object as parameter.

df.orderBy(asc(col))

Output

The SMA adds the EWI SPRKPY1033 to the output code to let you know that the asc function with a column object parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1033 => pyspark.sql.functions.asc has a workaround, see documentation for more info
df.orderBy(asc(col))

Recommended fix

As a workaround, you can call the snowflake.snowpark.Column.asc function from the column parameter.

df.orderBy(col.asc())
Scenario 2

Input

Below is an example of a use of the pyspark.sql.functions.asc function that takes the name of the column as parameter.

df.orderBy(asc("colName"))

Output

The SMA adds the EWI SPRKPY1033 to the output code to let you know that the asc function with a column name parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1033 => pyspark.sql.functions.asc has a workaround, see documentation for more info
df.orderBy(asc("colName"))

Recommended fix

As a workaround, you can convert the string parameter into a column object using the snowflake.snowpark.functions.col function and then call the snowflake.snowpark.Column.asc function.

df.orderBy(col("colName").asc())

Additional recommendations

SPRKPY1034

Warning

This issue code has been deprecated.

Message: pyspark.sql.functions.desc has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.desc function, which has a workaround.

Scenarios

The pyspark.sql.functions.desc function takes either a column object or the name of the column as a string as its parameter. Neither scenario is directly supported by Snowpark, so this EWI is generated.

Scenario 1

Input

Below is an example of a use of the pyspark.sql.functions.desc function that takes a column object as parameter.

df.orderBy(desc(col))

Output

The SMA adds the EWI SPRKPY1034 to the output code to let you know that the desc function with a column object parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1034 => pyspark.sql.functions.desc has a workaround, see documentation for more info
df.orderBy(desc(col))

Recommended fix

As a workaround, you can call the snowflake.snowpark.Column.desc function from the column parameter.

df.orderBy(col.desc())
Scenario 2

Input

Below is an example of a use of the pyspark.sql.functions.desc function that takes the name of the column as parameter.

df.orderBy(desc("colName"))

Output

The SMA adds the EWI SPRKPY1034 to the output code to let you know that the desc function with a column name parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1034 => pyspark.sql.functions.desc has a workaround, see documentation for more info
df.orderBy(desc("colName"))

Recommended fix

As a workaround, you can convert the string parameter into a column object using the snowflake.snowpark.functions.col function and then call the snowflake.snowpark.Column.desc function.

df.orderBy(col("colName").desc())

Additional recommendations

SPRKPY1035

Warning

This issue code has been deprecated.

Message: pyspark.sql.functions.reverse has a workaround, see documentation for more info

Category: Warning

Description

This issue appears when the SMA detects a use of the pyspark.sql.functions.reverse function, which has a workaround.

Scenario

Input

Below is an example of a use of the pyspark.sql.functions.reverse function that generates this EWI. In this example, the reverse function is used to reverse each string of the word column.

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
df_reversed = df.withColumn("reversed_word", reverse("word"))

出力

The SMA adds the EWI SPRKPY1035 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
#EWI: SPRKPY1035 => pyspark.sql.functions.reverse has a workaround, see documentation for more info
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
#EWI: SPRKPY1035 => pyspark.sql.functions.reverse has a workaround, see documentation for more info
df_reversed = df.withColumn("reversed_word", reverse("word"))

推奨される修正

As a workaround, you can import the snowpark_extensions package which provides an extension for the reverse function.

import snowpark_extensions

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
df_reversed = df.withColumn("reversed_word", reverse("word"))

その他の推奨事項

SPRKPY1036

警告

この問題コードは、 廃止 されました。

メッセージ: pyspark.sql.column.Column.getField has a workaround, see documentation for more info

カテゴリ: 警告

説明

This issue appears when the SMA detects a use of the pyspark.sql.column.Column.getField function, which has a workaround.

シナリオ

入力

Below is an example of a use of the pyspark.sql.column.Column.getField function that generates this EWI. In this example, the getField function is used to extract the name from the info column.

df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
df_with_name = df.withColumn("name", col("info").getField("name"))

出力

The SMA adds the EWI SPRKPY1036 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
#EWI: SPRKPY1036 => pyspark.sql.column.Column.getField has a workaround, see documentation for more info
df_with_name = df.withColumn("name", col("info").getField("name"))

推奨される修正

As a workaround, you can use the Snowpark column indexer operator with the name of the field as the index.

df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
df_with_name = df.withColumn("name", col("info")["name"])

その他の推奨事項

SPRKPY1037

警告

この問題コードは、 廃止 されました。

メッセージ: pyspark.sql.functions.sort_array has a workaround, see documentation for more info

カテゴリ: 警告

説明

This issue appears when the SMA detects a use of the pyspark.sql.functions.sort_array function, which has a workaround.

シナリオ

入力

Below is an example of a use of the pyspark.sql.functions.sort_array function that generates this EWI. In this example, the sort_array function is used to sort the numbers array in ascending and descending order.

df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))

出力

The SMA adds the EWI SPRKPY1037 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
#EWI: SPRKPY1037 => pyspark.sql.functions.sort_array has a workaround, see documentation for more info
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
#EWI: SPRKPY1037 => pyspark.sql.functions.sort_array has a workaround, see documentation for more info
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))

推奨される修正

As a workaround, you can import the snowpark_extensions package which provides an extension for the sort_array function.

import snowpark_extensions

df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))

その他の推奨事項

SPRKPY1038

Message: *spark element* is not yet recognized

カテゴリ: 変換エラー

説明

この問題は、 SMA によって認識されなかった PySpark 要素がソースコードにある場合に発生します。これは、以下のようなさまざまな理由で起こる可能性があります。

  • PySpark に存在しない要素。

  • SMA がまだサポートしていない PySpark バージョンで追加された要素。

  • 要素を処理する際の SMA の内部エラー。

これは、認識できない要素に対して SMA が使用する一般的なエラーコードです。

シナリオ

入力

以下は、 PySpark に存在しないため、 SMA で認識できなかった関数の使用例です。

from pyspark.sql import functions as F
F.unrecognized_function()

出力

The SMA adds the EWI SPRKPY1038 to the output code to let you know that this element could not be recognized.

from snowflake.snowpark import functions as F
#EWI: SPRKPY1038 => pyspark.sql.functions.unrecognized_function is not yet recognized
F.unrecognized_function()

推奨される修正

問題の識別を試みるために、以下を検証します。

  • PySpark に要素が存在するかどうかを確認します。

  • 要素のスペルが正しいかどうかを確認します。

  • Check if you are using a PySpark version supported by the SMA. To know which PySpark version is supported by the SMA at the moment of running the SMA, you can review the first page of the DetailedReport.docx file.

If it is a valid PySpark element, please report that you encountered a conversion error on that particular element using the Report an Issue option of the SMA and include any additional information that you think may be helpful.

Please note that even if an element could not be recognized by the SMA, it does not mean that it is not supported by Snowpark. You should check the Snowpark Documentation to verify whether an equivalent element exists.

その他の推奨事項

SPRKPY1039

警告

この問題コードは、 廃止 されました。

メッセージ: pyspark.sql.column.Column.getItem has a workaround, see documentation for more info

カテゴリ: 警告

説明

This issue appears when the SMA detects a use of the pyspark.sql.column.Column.getItem function, which has a workaround.

シナリオ

入力

Below is an example of a use of the pyspark.sql.column.Column.getItem function that generates this EWI. In this example, the getItem function is used to get an item by position and by key.

df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
df.withColumn("first_fruit", col("fruits").getItem(0))

df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
df.withColumn("apple_quantity", col("fruit_quantities").getItem("apple"))

出力

The SMA adds the EWI SPRKPY1039 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
#EWI: SPRKPY1039 => pyspark.sql.column.Column.getItem has a workaround, see documentation for more info
df.withColumn("first_fruit", col("fruits").getItem(0))

df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
#EWI: SPRKPY1039 => pyspark.sql.column.Column.getItem has a workaround, see documentation for more info
df.withColumn("apple_quantity", col("fruit_quantities").getItem("apple"))

推奨される修正

回避策として、フィールドの名前または位置をインデックスとして Snowpark列インデクサー演算子 を使用することができます。

df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
df.withColumn("first_fruit", col("fruits")[0])

df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
df.withColumn("apple_quantity", col("fruit_quantities")["apple"])

その他の推奨事項

SPRKPY1040

警告

この問題コードは、 廃止 されました。

メッセージ: pyspark.sql.functions.explode has a workaround, see documentation for more info

カテゴリ: 警告

説明

This issue appears when the SMA detects a use of the pyspark.sql.functions.explode function, which has a workaround.

シナリオ

入力

Below is an example of a use of the pyspark.sql.functions.explode function that generates this EWI. In this example, the explode function is used to generate one row per array item for the numbers column.

df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
exploded_df = df.select("name", explode(df.numbers).alias("number"))

出力

The SMA adds the EWI SPRKPY1040 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
#EWI: SPRKPY1040 => pyspark.sql.functions.explode has a workaround, see documentation for more info
exploded_df = df.select("name", explode(df.numbers).alias("number"))

推奨される修正

As a workaround, you can import the snowpark_extensions package which provides an extension for the explode function.

import snowpark_extensions

df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
exploded_df = df.select("name", explode(df.numbers).alias("number"))

その他の推奨事項

SPRKPY1041

警告

This issue code has been deprecated since Spark Conversion Core Version 2.9.0

メッセージ: pyspark.sql.functions.explode_outer has a workaround

カテゴリ: 警告

説明

This issue appears when the tool detects the usage of pyspark.sql.functions.explode_outer which has a workaround.

シナリオ

入力

この例では、 explode_outer というメソッドを選択呼出しで使用しています。

df = spark.createDataFrame(
    [(1, ["foo", "bar"], {"x": 1.0}),
     (2, [], {}),
     (3, None, None)],
    ("id", "an_array", "a_map")
)

df.select("id", "an_array", explode_outer("a_map")).show()

出力

The tool adds the EWI SPRKPY1041 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [(1, ["foo", "bar"], {"x": 1.0}),
     (2, [], {}),
     (3, None, None)],
    ("id", "an_array", "a_map")
)

#EWI: SPRKPY1041 => pyspark.sql.functions.explode_outer has a workaround, see documentation for more info
df.select("id", "an_array", explode_outer("a_map")).show()

推奨される修正

As a workaround, you can import the snowpark_extensions package, which contains a helper for the explode_outer function.

import snowpark_extensions

df = spark.createDataFrame(
    [(1, ["foo", "bar"], {"x": 1.0}),
     (2, [], {}),
     (3, None, None)],
    ("id", "an_array", "a_map")
)

df.select("id", "an_array", explode_outer("a_map")).show()

その他の推奨事項

SPRKPY1042

メッセージ: pyspark.sql.functions.posexplode has a workaround

カテゴリ: 警告

説明

This issue appears when the tool detects the usage of pyspark.sql.functions.posexplode which has a workaround.

シナリオ

There are a couple of scenarios that this method can handle, depending on the type of the column that is passed as a parameter: it can be a list of values or a map/dictionary (keys/values).

シナリオ1

入力

Below is an example of the usage of posexplode passing a list of values as a parameter.

df = spark.createDataFrame(
    [Row(a=1,
         intlist=[1, 2, 3])])

df.select(posexplode(df.intlist)).collect()

出力

The tool adds the EWI SPRKPY1042 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [Row(a=1,
         intlist=[1, 2, 3])])
#EWI: SPRKPY1042 => pyspark.sql.functions.posexplode has a workaround, see documentation for more info

df.select(posexplode(df.intlist)).collect()

推奨される修正

To get the same behavior, use the functions.flatten method, drop the extra columns, and rename the index and value column names.

df = spark.createDataFrame(
  [Row(a=1,
       intlist=[1, 2, 3])])

df.select(
    flatten(df.intlist))\
    .drop("DATA", "SEQ", "KEY", "PATH", "THIS")\
    .rename({"INDEX": "pos", "VALUE": "col"}).show()
シナリオ2

入力

Below is another example of the usage of posexplode passing as a parameter a map/dictionary (keys/values)

df = spark.createDataFrame([
    [1, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
    [2, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
schema=["idx", "lists", "maps", "strs"])

df.select(posexplode(df.maps)).show()

出力

The tool adds the EWI SPRKPY1042 indicating that a workaround can be implemented.

df = spark.createDataFrame([
    [1, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
    [2, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
schema=["idx", "lists", "maps", "strs"])
#EWI: SPRKPY1042 => pyspark.sql.functions.posexplode has a workaround, see documentation for more info

df.select(posexplode(df.maps)).show()

推奨される修正

As a workaround, you can use functions.row_number to get the position and functions.explode with the name of the field to get the key/value pairs for dictionaries.

df = spark.createDataFrame([
    [10, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
    [11, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
    schema=["idx", "lists", "maps", "strs"])

window = Window.orderBy(col("idx").asc())

df.select(
    row_number().over(window).alias("pos"),
    explode(df.maps).alias("key", "value")).show()

注意: row_numberを使用する場合は完全に等価ではありません。row_numberが1から始まるからです(Sparkメソッドのように0ではありません)。

その他の推奨事項

SPRKPY1043

メッセージ: pyspark.sql.functions.posexplode_outer has a workaround

カテゴリ: 警告

説明

This issue appears when the tool detects the usage of pyspark.sql.functions.posexplode_outer which has a workaround.

シナリオ

There are a couple of scenarios that this method can handle, depending on the type of the column that is passed as a parameter: it can be a list of values or a map/dictionary (keys/values).

シナリオ1

入力

Below is an example that shows the usage of posexplode_outer passing a list of values.

df = spark.createDataFrame(
    [
        (1, ["foo", "bar"]),
        (2, []),
        (3, None)],
    ("id", "an_array"))

df.select("id", "an_array", posexplode_outer("an_array")).show()

出力

The tool adds the EWI SPRKPY1043 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [
        (1, ["foo", "bar"]),
        (2, []),
        (3, None)],
    ("id", "an_array"))
#EWI: SPRKPY1043 => pyspark.sql.functions.posexplode_outer has a workaround, see documentation for more info

df.select("id", "an_array", posexplode_outer("an_array")).show()

推奨される修正

To get the same behavior, use the functions.flatten method passing the outer parameter as True, drop the extra columns, and rename the index and value column names.

df = spark.createDataFrame(
    [
        (1, ["foo", "bar"]),
        (2, []),
        (3, None)],
    ("id", "an_array"))

df.select(
    flatten(df.an_array, outer=True))\
    .drop("DATA", "SEQ", "KEY", "PATH", "THIS")\
    .rename({"INDEX": "pos", "VALUE": "col"}).show()
シナリオ2

入力

以下は、 マップ/ディクショナリ(キー/値) を渡すposexplode_outerの別の使用例です。

df = spark.createDataFrame(
    [
        (1, {"x": 1.0}),
        (2, {}),
        (3, None)],
    ("id", "a_map"))

df.select(posexplode_outer(df.a_map)).show()

出力

The tool adds the EWI SPRKPY1043 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [
        (1, {"x": "Ashi Garami"}),
        (2, {}),
        (3, None)],
    ("id", "a_map"))
#EWI: SPRKPY1043 => pyspark.sql.functions.posexplode_outer has a workaround, see documentation for more info

df.select(posexplode_outer(df.a_map)).show()

推奨される修正

As a workaround, you can use functions.row_number to get the position and functions.explode_outer with the name of the field to get the key/value pairs for dictionaries.

df = spark.createDataFrame(
    [
        (1, {"x": "Ashi Garami"}),
        (2,  {}),
        (3, None)],
    ("id", "a_map"))

window = Window.orderBy(col("id").asc())

df.select(
    row_number().over(window).alias("pos"),
          explode_outer(df.a_map)).show()

注意: row_numberを使用する場合は完全に等価ではありません。row_numberが1から始まるからです(Sparkメソッドのように0ではありません)。

その他の推奨事項

SPRKPY1044

警告

This issue code has been deprecated since Spark Conversion Core Version 2.4.0

メッセージ: pyspark.sql.functions.split has a workaround

カテゴリ: 警告。

説明

This issue appears when the tool detects the usage of pyspark.sql.functions.split which has a workaround.

シナリオ

メソッドに渡されるパラメーターの量によって、いくつかのシナリオがあります。

シナリオ1

入力

Below is an example when the function split has just the str and pattern parameters

F.split('col', '\\|')

出力

The tool shows the EWI SPRKPY1044 indicating there is a workaround.

#EWI: SPRKPY1044 => pyspark.sql.functions.split has a workaround, see the documentation for more info
F.split('col', '\\|')

推奨される修正

As a workaround, you can call the function snowflake.snowpark.functions.lit with the pattern parameter and send it into the split.

F.split('col', lit('\\|'))
## the result of lit will be sent to the split function

シナリオ2

入力

Below is another example when the function split has the str, pattern, and limit parameters.

F.split('col', '\\|', 2)

出力

The tool shows the EWI SPRKPY1044 indicating there is a workaround.

#EWI: SPRKPY1044 => pyspark.sql.functions.split has a workaround, see the documentation for more info
F.split('col', '\\|', 2)

推奨される修正

このシナリオはサポートされていません。

その他の推奨事項

SPRKPY1045

メッセージ: pyspark.sql.functions.map_values has a workaround

カテゴリ: 警告。

説明

この関数は、 マップ/ディクショナリ(キー/値) を含む列から値のリストを抽出するために使用されます。

The issue appears when the tool detects the usage of pyspark.sql.functions.map_values which has a workaround.

シナリオ

入力

Below is an example of the usage of the method map_values.

df = spark.createDataFrame(
    [(1, {'Apple': 'Fruit', 'Potato': 'Vegetable'})],
    ("id", "a_map"))

df.select(map_values("a_map")).show()

出力

The tool adds the EWI SPRKPY1045 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [(1, {'Apple': 'Fruit', 'Potato': 'Vegetable'})],
    ("id", "a_map"))
#EWI: SPRKPY1045 => pyspark.sql.functions.map_values has a workaround, see documentation for more info

df.select(map_values("a_map")).show()

推奨される修正

As a workaround, you can create a udf to get the values of a column. The example below shows how to create the udf, assign it to F.map_values, and then make use of it.

from snowflake.snowpark import functions as F
from snowflake.snowpark.types import ArrayType, MapType

map_values_udf=None

def map_values(map):
    global map_values_udf
    if not map_values_udf:
        def _map_values(map: dict)->list:
            return list(map.values())
        map_values_udf = F.udf(_map_values,return_type=ArrayType(),input_types=[MapType()],name="map_values",is_permanent=False,replace=True)
    return map_values_udf(map)

F.map_values = map_values

df.select(map_values(colDict))

その他の推奨事項

SPRKPY1046

警告

This issue code has been deprecated since Spark Conversion Core Version 2.1.22

メッセージ: pyspark.sql.functions.monotonically_increasing_id has a workaround

カテゴリ: 警告。

説明

This issue appears when the tool detects the usage of pyspark.sql.functions.monotonically_increasing_id which has a workaround.

シナリオ

入力

Below is an example of the usage of the method monotonically_increasing_id.

from pyspark.sql import functions as F

spark.range(0, 10, 1, 2).select(F.monotonically_increasing_id()).show()

出力

The tool adds the EWI SPRKPY1046 indicating that a workaround can be implemented.

from pyspark.sql import functions as F
#EWI: SPRKPY1046 => pyspark.sql.functions.monotonically_increasing_id has a workaround, see documentation for more info
spark.range(0, 10, 1, 2).select(F.monotonically_increasing_id()).show()

推奨される修正

ツールバージョンを更新します。

その他の推奨事項

SPRKPY1047

警告

This issue code has been deprecated since Spark Conversion Core Version 4.6.0

説明

This issue appears when the tool detects the usage of pyspark.context.SparkContext.setLogLevel which has a workaround.

シナリオ

入力

Below is an example of the usage of the method setLogLevel.

sparkSession.sparkContext.setLogLevel("WARN")

出力

The tool adds the EWI SPRKPY1047 indicating that a workaround can be implemented.

#EWI: SPRKPY1047 => pyspark.context.SparkContext.setLogLevel has a workaround, see documentation for more info
sparkSession.sparkContext.setLogLevel("WARN")

推奨される修正

Replace the setLogLevel function usage with logging.basicConfig that provides a set of convenience functions for simple logging usage. In order to use it, we need to import two modules, "logging" and "sys", and the level constant should be replaced using the "Level equivalent table":

import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.WARNING)
  • レベル等価表

| レベルソースパラメーター | レベルターゲットパラメーター |
| --- | --- |
| "ALL" | **This has no equivalent** |
| "DEBUG" | logging.DEBUG |
| "ERROR" | logging.ERROR |
| "FATAL" | logging.CRITICAL |
| "INFO" | logging.INFO |
| "OFF" | logging.NOTSET |
| "TRACE" | **This has no equivalent** |
| "WARN" | logging.WARNING |
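
As an illustration only, the table above can be captured in a small helper. This is a minimal sketch: the SPARK_TO_LOGGING_LEVEL dictionary and the set_log_level function are assumptions for this example (they are not produced by the SMA), and "ALL"/"TRACE", which have no direct equivalent, are mapped to logging.DEBUG here.

import logging
import sys

# Hypothetical mapping based on the equivalence table above.
# "ALL" and "TRACE" have no direct equivalent; logging.DEBUG is used as an approximation.
SPARK_TO_LOGGING_LEVEL = {
    "ALL": logging.DEBUG,
    "DEBUG": logging.DEBUG,
    "ERROR": logging.ERROR,
    "FATAL": logging.CRITICAL,
    "INFO": logging.INFO,
    "OFF": logging.NOTSET,
    "TRACE": logging.DEBUG,
    "WARN": logging.WARNING,
}

def set_log_level(spark_level: str) -> None:
    # Replaces sparkContext.setLogLevel(spark_level) after the migration.
    logging.basicConfig(stream=sys.stdout, level=SPARK_TO_LOGGING_LEVEL[spark_level.upper()])

set_log_level("WARN")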

その他の推奨事項

SPRKPY1048

警告

This issue code has been deprecated since Spark Conversion Core Version 2.4.0

メッセージ: pyspark.sql.session.SparkSession.conf has a workaround

カテゴリ: 警告。

説明

This issue appears when the tool detects the usage of pyspark.sql.session.SparkSession.conf which has a workaround.

シナリオ

入力

Below is an example of how to set a configuration into the property conf .

spark.conf.set("spark.sql.crossJoin.enabled", "true")

出力

The tool adds the EWI SPRKPY1048 indicating that a workaround can be implemented.

#EWI: SPRKPY1048 => pyspark.sql.session.SparkSession.conf has a workaround, see documentation for more info
spark.conf.set("spark.sql.crossJoin.enabled", "true")

推奨される修正

SparkSession.confはPysparkでのみ使用される特定の設定を渡すために使用され、Snowparkには適用されません。コードを削除したり、コメントしたりすることができます

#spark.conf.set("spark.sql.crossJoin.enabled", "true")

その他の推奨事項

SPRKPY1049

警告

This issue code has been deprecated since Spark Conversion Core Version 2.1.9

メッセージ: pyspark.sql.session.SparkSession.sparkContext has a workaround

カテゴリ: 警告。

説明

This issue appears when the tool detects the usage of pyspark.sql.session.SparkSession.sparkContext which has a workaround.

シナリオ

入力

Below is an example that uses the sparkContext property of a Spark session to print the appName.

print("APP Name :"+spark.sparkContext.appName())

出力

The tool adds the EWI SPRKPY1049 indicating that a workaround can be implemented.

#EWI: SPRKPY1049 => pyspark.sql.session.SparkSession.sparkContext has a workaround, see documentation for more info
print("APP Name :"+spark.sparkContext.appName())

推奨される修正

SparkContext は、 SnowPark ではサポートされていませんが、 SparkContext のメソッドやプロパティにセッションインスタンスから直接アクセスすることができます。

## Pyspark
print("APP Name :"+spark.sparkContext.appName())

## The same can be used in Snowpark by removing the sparkContext:
#Manual adjustment in SnowPark
print("APP Name :"+spark.appName())

その他の推奨事項

SPRKPY1050

メッセージ: pyspark.conf.SparkConf.set has a workaround

カテゴリ: 警告。

説明

This issue appears when the tool detects the usage of pyspark.conf.SparkConf.set which has a workaround.

シナリオ

入力

Below is an example that sets a variable using conf.set.

conf = SparkConf().setAppName('my_app')

conf.set("spark.storage.memoryFraction", "0.5")

出力

The tool adds the EWI SPRKPY1050 indicating that a workaround can be implemented.

conf = SparkConf().setAppName('my_app')

#EWI: SPRKPY1050 => pyspark.conf.SparkConf.set has a workaround, see documentation for more info
conf.set("spark.storage.memoryFraction", "0.5")

推奨される修正

SparkConf.setは、Pysparkでのみ使用される構成設定を設定するために使用され、Snowparkには適用されません。コードを削除したり、コメントしたりすることができます

#conf.set("spark.storage.memoryFraction", "0.5")

その他の推奨事項

SPRKPY1051

警告

This issue code has been deprecated since Spark Conversion Core Version 2.4.0

メッセージ: pyspark.sql.session.SparkSession.Builder.master has a workaround

カテゴリ: 警告。

説明

This issue appears when the tool detects pyspark.sql.session.SparkSession.Builder.master usage which has a workaround.

シナリオ

入力

Below is an example of the usage of the method builder.master to set the Spark Master URL to connect to local using 1 core.

spark = SparkSession.builder.master("local[1]")

出力

The tool adds the EWI SPRKPY1051 indicating that a workaround can be implemented.

#EWI: SPRKPY1051 => pyspark.sql.session.SparkSession.Builder.master has a workaround, see documentation for more info
spark = Session.builder.master("local[1]")

推奨される修正

pyspark.sql.session.SparkSession.Builder.master is used to set up a Spark Cluster. Snowpark doesn't use Spark Clusters so you can remove or comment the code.

## spark = Session.builder.master("local[1]")

その他の推奨事項

SPRKPY1052

警告

This issue code has been deprecated since Spark Conversion Core Version 2.8.0

メッセージ: pyspark.sql.session.SparkSession.Builder.enableHiveSupport has a workaround

カテゴリ: 警告。

説明

This issue appears when the tool detects the usage of pyspark.sql.session.SparkSession.Builder.enableHiveSupport which has a workaround.

シナリオ

入力

Below is an example that configures the SparkSession and enables the hive support using the method enableHiveSupport.

spark = Session.builder.appName("Merge_target_table")\
        .config("spark.port.maxRetries","100") \
        .enableHiveSupport().getOrCreate()

出力

The tool adds the EWI SPRKPY1052 indicating that a workaround can be implemented.

#EWI: SPRKPY1052 => pyspark.sql.session.SparkSession.Builder.enableHiveSupport has a workaround, see documentation for more info
spark = Session.builder.appName("Merge_target_table")\
        .config("spark.port.maxRetries","100") \
        .enableHiveSupport().getOrCreate()

推奨される修正

Remove the use of enableHiveSupport function because it is not needed in Snowpark.

spark = Session.builder.appName("Merge_target_table")\
        .config("spark.port.maxRetries","100") \
        .getOrCreate()

その他の推奨事項

SPRKPY1053

メッセージ:An error occurred when extracting the dbc files.

カテゴリ: 警告。

説明

この問題は、dbcファイルを抽出できない場合に発生します。この警告は、ファイルが大きすぎる、アクセスできない、読み取り専用であるなどの原因が考えられます。

その他の推奨事項

  • 回避策として、ファイルが重すぎて処理できない場合は、ファイルのサイズを確認することができます。また、アクセスの問題を避けるために、ツールがアクセスできるかどうかを分析します。

  • サポートの詳細については、 snowconvert-info@snowflake.com にメールでお問い合わせください。Snowflakeとサポート契約を結んでいる場合は、セールスエンジニアにご連絡ください。必要なサポートにご案内します。

SPRKPY1080

メッセージ:The value of SparkContext is replaced with 'session' variable.

カテゴリ: 警告

説明

SparkのコンテキストはSnowparkセッションを作成するsessionという変数に格納されます。

シナリオ

入力

このスニペットは SparkContext を説明します

## Input Code
from pyspark import SparkContext
from pyspark.sql import SparkSession

def example1():

    sc = SparkContext("local[*]", "TestApp")

    sc.setLogLevel("ALL")
    sc.setLogLevel("DEBUG")

出力

この出力コードでは、 SMA は、 PySpark.SparkContext を SparkSession で置き換えています。SMA は、「connection.json」ファイルの接続を置き換えるテンプレートも追加し、この構成をconnection_parameter変数にロードしていることに注意してください。

## Output Code
import logging
import sys
import json
from snowflake.snowpark import Session
from snowflake.snowpark import Session

def example1():
    jsonFile = open("connection.json")
    connection_parameter = json.load(jsonFile)
    jsonFile.close()
    #EWI: SPRKPY1080 => The value of SparkContext is replaced with 'session' variable.
    sc = Session.builder.configs(connection_parameter).getOrCreate()
    sc.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
    logging.basicConfig(stream = sys.stdout, level = logging.NOTSET)
    logging.basicConfig(stream = sys.stdout, level = logging.DEBUG)

推奨される修正

構成ファイル「connection.json」を必要な接続情報で更新する必要があります。

{
  "user": "my_user",
  "password": "my_password",
  "account": "my_account",
  "role": "my_role",
  "warehouse": "my_warehouse",
  "database": "my_database",
  "schema": "my_schema"
}

その他の推奨事項

SPRKPY1054

メッセージ: pyspark.sql.readwriter.DataFrameReader.format is not supported.

カテゴリ: 警告。

説明

This issue appears when the pyspark.sql.readwriter.DataFrameReader.format has an argument that is not supported by Snowpark.

シナリオ

There are several scenarios depending on the type of format you are trying to load; it can be a supported or an unsupported format.

シナリオ1

入力

このツールは、読み込む形式のタイプを分析します。

  • Csv

  • JSON

  • Parquet

  • Orc

The below example shows how the tool transforms the format method when passing a Csv value.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df1 = spark.read.format('csv').load('/path/to/file')

出力

The tool transforms the format method into a Csv method call.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

df1 = spark.read.csv('/path/to/file')

推奨される修正

この場合、ツールは EWI を表示しません。

シナリオ2

入力

The below example shows how the tool transforms the format method when passing a Jdbc value.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df2 = spark.read.format('jdbc') \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

出力

The tool shows the EWI SPRKPY1054 indicating that the value "jdbc" is not supported.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

#EWI: SPRKPY1054 => pyspark.sql.readwriter.DataFrameReader.format with argument value "jdbc" is not supported.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameReader.load is not supported

df2 = spark.read.format('jdbc') \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

推奨される修正

For the not supported scenarios, there is no specific fix since it depends on the files that are trying to be read.

シナリオ3

入力

The below example shows how the tool transforms the format method when passing a CSV, but using a variable instead.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

myFormat = 'csv'
df3 = spark.read.format(myFormat).load('/path/to/file')

出力

Since the tool cannot determine the value of the variable at runtime, it shows the EWI SPRKPY1054 indicating that the value "" is not supported.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

myFormat = 'csv'
#EWI: SPRKPY1054 => pyspark.sql.readwriter.DataFrameReader.format with argument value "" is not supported.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameReader.load is not supported
df3 = spark.read.format(myFormat).load('/path/to/file')

推奨される修正

As a workaround, you can check the value of the variable and add it as a string to the format call.
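
For example, if you can confirm that myFormat always holds 'csv', you could inline the literal so the call maps to the corresponding reader method. This is only a sketch based on the scenario above; adjust it to the actual value of your variable.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

# The format is known to be 'csv', so the reader method can be called directly.
df3 = spark.read.csv('/path/to/file')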

その他の推奨事項

SPRKPY1055

メッセージ: pyspark.sql.readwriter.DataFrameReader.option key value is not supported.

カテゴリ: 警告。

説明

This issue appears when the pyspark.sql.readwriter.DataFrameReader.option key value is not supported by Snowflake.

このツールは、オプションコールのパラメーターを分析し、メソッド(CSV または JSON または PARQUET)によって、キー値にSnowparkでの等価値があるかないかを判断します。すべてのパラメーターに等価値がある場合、ツールは EWI を追加せず、キー値を等価値に置き換えます。そうでない場合、ツールは EWI を追加します。

等価性のリスト:

  • CSV の等価性:

| Sparkオプションキー | Snowparkの等価性 |
| --- | --- |
| sep | FIELD_DELIMITER |
| header | PARSE_HEADER |
| lineSep | RECORD_DELIMITER |
| pathGlobFilter | PATTERN |
| quote | FIELD_OPTIONALLY_ENCLOSED_BY |
| nullValue | NULL_IF |
| dateFormat | DATE_FORMAT |
| timestampFormat | TIMESTAMP_FORMAT |
| inferSchema | INFER_SCHEMA |
| delimiter | FIELD_DELIMITER |

  • JSON の等価性:

| Sparkオプションキー | Snowparkの等価性 |
| --- | --- |
| dateFormat | DATE_FORMAT |
| timestampFormat | TIMESTAMP_FORMAT |
| pathGlobFilter | PATTERN |

  • PARQUET の等価性:

| Sparkオプションキー | Snowparkの等価性 |
| --- | --- |
| pathGlobFilter | PATTERN |

上記の表にないキーオプションは、Snowparkではサポートされていないか、同等のものがありません。この場合、ツールは EWI にパラメーター情報を追加し、チェーンから削除します。

シナリオ

以下のシナリオは、 CSV、 JSON、 PARQUET に適用されます。

There are a couple of scenarios depending on the value of the key used in the option method.

シナリオ1

入力

Below is an example of an option call using an equivalent key.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## CSV example:
spark.read.option("header", True).csv(csv_file_path)

## Json example:
spark.read.option("dateFormat", "dd-MM-yyyy").json(json_file_path)

## Parquet example:
spark.read.option("pathGlobFilter", "*.parquet").parquet(parquet_file_path)

出力

ツールは、キーを正しい等価値で変換します。

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()

## CSV example:
spark.read.option("PARSE_HEADER", True).csv(csv_file_path)

## Json example:
spark.read.option("DATE_FORMAT", "dd-MM-yyyy").json(json_file_path)

## Parquet example:
spark.read.option("PATTERN", "*.parquet").parquet(parquet_file_path)

推奨される修正

ツールはキーの値を変換するため、修正する必要はありません。

シナリオ2

入力

Below is an example of an option call using a non-equivalent key.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## CSV example:
spark.read.option("anotherKeyValue", "myVal").csv(csv_file_path)

## Json example:
spark.read.option("anotherKeyValue", "myVal").json(json_file_path)

## Parquet example:
spark.read.option("anotherKeyValue", "myVal").parquet(parquet_file_path)

出力

The tool adds the EWI SPRKPY1055 indicating the key is not supported and removes the option call.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()

## CSV example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.csv(csv_file_path)

## Json example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.json(json_file_path)

## Parquet example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.parquet(parquet_file_path)

推奨される修正

変換後の動作を確認することをお勧めします。

その他の推奨事項

  • 非等価パラメーターが存在する場合は、変換後の動作を確認することをお勧めします。

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1056

警告

この問題コードは、 廃止 されました。

メッセージ: pyspark.sql.readwriter.DataFrameReader.option argument _ <argument_name> _ is not a literal and can't be evaluated

カテゴリ: 警告

説明

This issue appears when the argument's key or value of the pyspark.sql.readwriter.DataFrameReader.option function is not a literal value (for example a variable). The SMA does a static analysis of your source code, and therefore it is not possible to evaluate the content of the argument.

シナリオ

入力

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.option function that generates this EWI.

my_value = ...
my_option = ...

df1 = spark.read.option("dateFormat", my_value).format("csv").load('filename.csv')
df2 = spark.read.option(my_option, "false").format("csv").load('filename.csv')

出力

The SMA adds the EWI SPRKPY1056 to the output code to let you know that the argument of this function is not a literal value, and therefore it could not be evaluated by the SMA.

my_value = ...
my_option = ...

#EWI: SPRKPY1056 => pyspark.sql.readwriter.DataFrameReader.option argument "dateFormat" is not a literal and can't be evaluated
df1 = spark.read.option("dateFormat", my_value).format("csv").load('filename.csv')
#EWI: SPRKPY1056 => pyspark.sql.readwriter.DataFrameReader.option argument key is not a literal and can't be evaluated
df2 = spark.read.option(my_option, "false").format("csv").load('filename.csv')

推奨される修正

Even though the SMA was unable to evaluate the argument, it does not mean that it is not supported by Snowpark. Please make sure that the value of the argument is valid and equivalent in Snowpark by checking the documentation.
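
For instance, if the variable is known to always hold a supported literal, you can inline that literal so the argument can be evaluated on a new run of the SMA. This is a sketch only, assuming my_value is always the date format "dd-MM-yyyy"; adapt it to your actual values.

df1 = spark.read.option("dateFormat", "dd-MM-yyyy").format("csv").load('filename.csv')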

その他の推奨事項

SPRKPY1057

警告

This Issue Code has been deprecated since Spark Conversion Core Version 4.8.0

Message: PySpark Dataframe Option argument contains a value that is not a literal, therefore cannot be evaluated

カテゴリ: 警告。

説明

この問題コードは、廃止されました。古いバージョンをお使いの場合は、最新のバージョンにアップグレードしてください。

その他の推奨事項

SPRKPY1058

メッセージ: < method > with < key > Platform specific key is not supported.

カテゴリ: ConversionError

説明

The get and set methods from pyspark.sql.conf.RuntimeConfig are not supported with a Platform specific key.

シナリオ

Not all usages of get or set methods are going to have an EWI in the output code. This EWI appears when the tool detects the usage of these methods with a Platform specific key which is not supported.

シナリオ1

入力

Below is an example of the get or set methods with supported keys in Snowpark.

session.conf.set("use_constant_subquery_alias", False)
spark.conf.set("sql_simplifier_enabled", True)

session.conf.get("use_constant_subquery_alias")
session.conf.get("use_constant_subquery_alias")

出力

Snowparkではキーがサポートされているため、ツールは出力コードに EWI を追加しません。

session.conf.set("use_constant_subquery_alias", True)
session.conf.set("sql_simplifier_enabled", False)

session.conf.get("use_constant_subquery_alias")
session.conf.get("sql_simplifier_enabled")

推奨される修正

このシナリオに対する推奨される修正はありません。

シナリオ2

入力

以下はサポートされていないキーを使用した例です。

data = [
      ("John", 30, "New York"),
      ("Jane", 25, "San Francisco")
    ]

spark.conf.set("spark.sql.shuffle.partitions", "50")
spark.conf.set("spark.yarn.am.memory", "1g")

spark.conf.get("spark.sql.shuffle.partitions")
spark.conf.get("spark.yarn.am.memory")

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

出力

The tool adds this EWI SPRKPY1058 on the output code to let you know that these methods are not supported with a Platform specific key.

data = [
      ("John", 30, "New York"),
      ("Jane", 25, "San Francisco")
    ]

#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.set method with this "spark.sql.shuffle.partitions" Platform specific key is not supported.
spark.conf.set("spark.sql.shuffle.partitions", "50")
#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.set method with this "spark.yarn.am.memory" Platform specific key is not supported.
spark.conf.set("spark.yarn.am.memory", "1g")

#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.get method with this "spark.sql.shuffle.partitions" Platform specific key is not supported.
spark.conf.get("spark.sql.shuffle.partitions")
#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.get method with this "spark.yarn.am.memory" Platform specific key is not supported.
spark.conf.get("spark.yarn.am.memory")

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

推奨される修正

推奨される修正は、これらのメソッドを削除することです。

data = [
      ("John", 30, "New York"),
      ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

その他の推奨事項

SPRKPY1059

警告

This issue code has been deprecated since Spark Conversion Core Version 2.45.1

Message: pyspark.storagelevel.StorageLevel has a workaround, see documentation.

カテゴリ: 警告

説明

Currently, the use of StorageLevel is not required in Snowpark since Snowflake controls the storage. For more information, you can refer to the EWI SPRKPY1072

その他の推奨事項

  • アプリケーションを最新バージョンにアップグレードします。

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1060

メッセージ:The authentication mechanism is connection.json (template provided).

カテゴリ: 警告。

説明

This issue appears when the tool detects the usage of pyspark.conf.SparkConf.

シナリオ

入力

Snowparkでは認証コードが異なるため、ツールは使用を削除し、代わりに 接続構成ファイル(connection.json) を作成します。

from pyspark import SparkConf

my_conf = SparkConf(loadDefaults=True)

出力

The tool adds the EWI SPRKPY1060 indicating that the authentication mechanism is different.

#EWI: SPRKPY1002 => pyspark.conf.SparkConf is not supported
#EWI: SPRKPY1060 => The authentication mechanism is connection.json (template provided).
#my_conf = Session.builder.configs(connection_parameter).getOrCreate()

my_conf = None

推奨される修正

To create a connection it is necessary that you fill in the information in the connection.json file.

{
  "user": "<USER>",
  "password": "<PASSWORD>",
  "account": "<ACCOUNT>",
  "role": "<ROLE>",
  "warehouse": "<WAREHOUSE>",
  "database": "<DATABASE>",
  "schema": "<SCHEMA>"
}
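
Once connection.json is filled in, the generated code loads it and builds the Snowpark session. Below is a minimal sketch of that pattern; the file name follows the template above, and the variable names are illustrative.

import json
from snowflake.snowpark import Session

# Load the connection parameters created from the SMA template.
with open("connection.json") as connection_file:
    connection_parameter = json.load(connection_file)

# Replaces the SparkConf-based authentication from the original code.
session = Session.builder.configs(connection_parameter).getOrCreate()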

その他の推奨事項

SPRKPY1061

メッセージ:Snowpark does not support unix_timestamp functions

カテゴリ: 警告

説明

In Snowpark, the first parameter is mandatory; the issue appears when the tool detects the usage of pyspark.sql.functions.unix_timestamp with no parameters.

シナリオ

入力

Below an example that calls the unix_timestamp method without parameters.

data = [["2015-04-08", "10"],["2015-04-10", "15"]]

df = spark.createDataFrame(data, ['dt', 'val'])
df.select(unix_timestamp()).show()

出力

The Snowpark signature for this function is unix_timestamp(e: ColumnOrName, fmt: Optional["Column"] = None); as you can see, the first parameter is required.

The tool adds the EWI SPRKPY1061 to let you know that the unix_timestamp function with no parameters is not supported in Snowpark.

data = [["2015-04-08", "10"],["2015-04-10", "15"]]

df = spark.createDataFrame(data, ['dt', 'val'])
#EWI: SPRKPY1061 => Snowpark does not support unix_timestamp functions with no parameters. See documentation for more info.
df.select(unix_timestamp()).show()

推奨される修正

回避策として、少なくともタイムスタンプ文字列の名前または列を追加することができます。

data = [["2015-04-08", "10"],["2015-04-10", "15"]]

df = spark.createDataFrame(data, ["dt", "val"])
df.select(unix_timestamp("dt")).show()

その他の推奨事項

SPRKPY1062

メッセージ:Snowpark does not support GroupedData.pivot without parameter "values".

カテゴリ: 警告

説明

This issue appears when the SMA detects the usage of the pyspark.sql.group.GroupedData.pivot function without the "values" parameter (the list of values to pivot on).

現時点では、Snowpark Pythonのピボット関数は、ピボットする値のリストを明示的に指定する必要があります。

シナリオ

シナリオ1

入力

The SMA detects an expression that matches the pattern dataFrame.groupBy("columnX").pivot("columnY") where the pivot does not have the values parameter.

df.groupBy("date").pivot("category").sum("amount")

出力

SMA は、「values」パラメーターのないピボット関数がサポートされていないことを示す EWI メッセージを追加します。

さらに、ピボット関数の2番目のパラメーターとして、列に変換される個別の値のリストを計算するリスト内包表記を追加します。この操作は大容量のデータでは効率が悪いため、値を明示的に指定することをお勧めします。

#EWI: SPRKPY1062 => pyspark.sql.group.GroupedData.pivot without parameter 'values' is not supported. See documentation for more info.
df.groupBy("date").pivot("category", [v[0] for v in df.select("category").distinct().limit(10000).collect()]]).sum("amount")

推奨される修正

このシナリオでは、ピボット関数の2番目のパラメーターとして、ピボット対象となる個別の値のリストを明示的に指定することをお勧めします。

df = spark.createDataFrame([
      Row(category="Client_ID", date=2012, amount=10000),
      Row(category="Client_name",   date=2012, amount=20000)
  ])

df.groupBy("date").pivot("category", ["Client_ID", "Client_name"]).sum("amount")
シナリオ2

入力

The SMA couldn't detect an expression that matches the pattern dataFrame.groupBy("columnX").pivot("columnY") and the pivot does not have the values parameter.

df1.union(df2).groupBy("date").pivot("category").sum("amount")

出力

SMA は、「values」パラメーターのないピボット関数がサポートされていないことを示す EWI メッセージを追加します。

#EWI: SPRKPY1062 => pyspark.sql.group.GroupedData.pivot without parameter 'values' is not supported. See documentation for more info.
df1.union(df2).groupBy("date").pivot("category").sum("amount")

推奨される修正

以下のように、ピボット対象となる個別の値のリストを追加します。

df = spark.createDataFrame([
      Row(course="dotNET", year=2012, earnings=10000),
      Row(course="Java",   year=2012, earnings=20000)
  ])

df.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").show()

その他の推奨事項

  • ピボットの対象となる明確な値のリストを計算することは、大容量のデータセットでは効率的な操作ではなく、ブロック呼び出しになる可能性があります。ピボットする明確な値のリストを明示的に指定することを検討してください。

  • ピボットする個別の値のリストを明示的に指定しない(非推奨)場合は、ピボット関数の第2引数に以下のコードを追加して、ランタイム時に値を推測することができます。

[v[0] for v in <df>.select(<column>).distinct().limit(<count>).collect()]

Replace <df> with the corresponding DataFrame, <column> with the column to pivot on, and <count> with the number of rows to select.

SPRKPY1063

メッセージ: pyspark.sql.pandas.functions.pandas_udf has workaround.

カテゴリ: 警告

説明

This issue appears when the tool detects the usage of pyspark.sql.pandas.functions.pandas_udf which has a workaround.

シナリオ

入力

pandas_udf関数は、大容量データを扱うユーザー定義関数を作成するために使用します。

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def modify_df(pdf):
    return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})
df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])
new_df = df.groupby().apply(modify_df)

出力

SMA は、pandas_udfに回避策があることを示す EWI メッセージを追加します。

#EWI: SPRKPY1063 => pyspark.sql.pandas.functions.pandas_udf has a workaround, see documentation for more info
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)

def modify_df(pdf):
    return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})

df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])

new_df = df.groupby().apply(modify_df)

推奨される修正

Explicitly specify the parameter types with the new input_types parameter, and remove the functionType parameter if it applies. The created function must be called inside a select statement.

@pandas_udf(
    return_type = schema,
    input_types = [PandasDataFrameType([IntegerType(), IntegerType()])]
)

def modify_df(pdf):
    return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})

df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])

new_df = df.groupby().apply(modify_df) # You must modify function call to be a select and not an apply

その他の推奨事項

SPRKPY1064

Message: The *Spark element* does not apply since snowflake uses snowpipe mechanism instead.

カテゴリ: 警告

説明

この問題は、pyspark.streamingライブラリのいずれかの要素の使用をツールが検出した場合に発生します。

シナリオ

入力

以下は、この EWI をトリガーする要素の1つを使った例です。

from pyspark.streaming.listener import StreamingListener

var = StreamingListener.Java
var.mro()

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()

出力

The SMA adds the EWI SPRKPY1064 on the output code to let you know that this function does not apply.

#EWI: SPRKPY1064 => The element does not apply since snowflake uses snowpipe mechanism instead.

var = StreamingListener.Java
var.mro()

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()

推奨される修正

The SMA removes the import statement and records the issue in the Issues.csv inventory; remove any usages of the Spark element.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()

その他の推奨事項

SPRKPY1065

メッセージ:The pyspark.context.SparkContext.broadcast does not apply since snowflake use data-clustering mechanism to compute the data.

カテゴリ: 警告

説明

This issue appears when the tool detects the usage of element pyspark.context.SparkContext.broadcast, which is not necessary due to the use of data-clustering of Snowflake.

入力コード

この例ではブロードキャスト変数が作成され、この変数によってすべてのノードでデータをより効率的に共有することができます。

sc = SparkContext(conf=conf_spark)

mapping = {1: 10001, 2: 10002}

bc = sc.broadcast(mapping)

出力コード

SMA は、ブロードキャストが不要であることを示す EWI メッセージを追加します。

sc = conf_spark

mapping = {1: 10001, 2: 10002}
#EWI: SPRKPY1065 => The element does not apply since snowflake use data-clustering mechanism to compute the data.

bc = sc.broadcast(mapping)

推奨される修正

使用されているすべてのpyspark.context.SparkContext.broadcastを削除します。

sc = conf_spark

mapping = {1: 10001, 2: 10002}
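
Since the dictionary is no longer broadcast, any code that previously read bc.value can reference mapping directly. The lookup below is illustrative only and is not part of the original example.

mapping = {1: 10001, 2: 10002}

# Previously: bc = sc.broadcast(mapping); value = bc.value[1]
value = mapping[1]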

その他の推奨事項

SPRKPY1066

メッセージ:The Spark element does not apply since snowflake use micro-partitioning mechanism are created automatically.

カテゴリ: 警告

説明

この問題は、パーティションに関連する要素の使用をツールが検出した場合に発生します。

Those elements do not apply due to the use of micro-partitions in Snowflake.

入力コード

In this example, sortWithinPartitions is used to create a partition in a DataFrame sorted by the specified column.

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.sortWithinPartitions("age", ascending=False)

出力コード

SMA は、Spark要素が不要であることを示す EWI メッセージを追加します。

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
#EWI: SPRKPY1066 => The element does not apply since snowflake use micro-partitioning mechanism are created automatically.
df.sortWithinPartitions("age", ascending=False)

推奨される修正

要素の使用を削除します。

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])

その他の推奨事項

SPRKPY1067

メッセージ:The pyspark.sql.functions.split has parameters that are not supported in Snowpark.

カテゴリ: 警告

説明

This issue appears when the tool detects the usage of pyspark.sql.functions.split with more than two parameters or with a regex pattern as a parameter; neither case is supported.

シナリオ

シナリオ1

入力コード

この例では、split関数に2つ以上のパラメーターがあります。

df.select(split(columnName, ",", 5))

出力コード

ツールは出力コードにこの EWI を追加して、2つ以上のパラメーターを持つ場合は、この関数がサポートされていないことを知らせます。

#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
df.select(split(columnName, ",", 5))

推奨される修正

2つのパラメーターのみを持つsplit関数を保持します。

df.select(split(columnName, ","))
シナリオ2

入力コード

この例では、split関数のパラメーターに正規表現パターンがあります。

df.select(split(columnName, "^([\d]+-[\d]+-[\d])"))

出力コード

ツールは出力コードにこの EWI を追加して、正規表現パターンをパラメーターとして持つ場合は、この関数がサポートされていないことを知らせます。

#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
df.select(split(columnName, "^([\d]+-[\d]+-[\d])"))

推奨される修正

The Spark signature for this method, functions.split(str: ColumnOrName, pattern: str, limit: int = -1), does not exactly match the Snowpark method functions.split(str: Union[Column, str], pattern: Union[Column, str]), so for now the scenario using a regular expression does not have a recommended fix.

その他の推奨事項

SPRKPY1068

メッセージ: toPandas contains columns of type ArrayType that is not supported and has a workaround.

カテゴリ: 警告

説明

pyspark.sql.DataFrame.toPandas doesn't work properly if there are columns of type ArrayType. The workaround for these cases is converting those columns into a Python dictionary by using the json.loads method.

シナリオ

入力

ToPandas は、元の DataFrame のデータをPandas DataFrame として返します。

sparkDF = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0))
])

pandasDF = sparkDF.toPandas()

出力

ツールはこの EWI を追加して、 ArrayType のタイプの列がある場合に、toPandasがサポートされていないことを知らせますが、回避策があります。

sparkDF = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0))
])
#EWI: SPRKPY1068 => toPandas doesn't work properly If there are columns of type ArrayType. The workaround for these cases is converting those columns into a Python Dictionary by using json.loads method. example: df[colName] = json.loads(df[colName]).
pandasDF = sparkDF.toPandas()

推奨される修正

import json
from snowflake.snowpark.types import ArrayType

pandas_df = sparkDF.toPandas()

## check/convert all resulting fields from calling toPandas when they are of
## type ArrayType; they will be reassigned by converting them into a Python
## dictionary using the json.loads method

for field in sparkDF.schema.fields:
    if isinstance(field.datatype, ArrayType):
        pandas_df[field.name] = pandas_df[field.name].apply(lambda x: json.loads(x) if x is not None else x)

その他の推奨事項

SPRKPY1069

メッセージ:If partitionBy parameter is a list, Snowpark will throw an error.

カテゴリ: 警告

説明

This issue appears when the tool detects a usage of the pyspark.sql.readwriter.DataFrameWriter.parquet method with the partitionBy parameter.

This is because in Snowpark the DataFrameWriter.parquet only supports a ColumnOrSqlExpr as the partitionBy parameter.

シナリオ

シナリオ1

入力コード:

このシナリオでは、partitionByパラメーターはリストではありません。

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

df.write.parquet(file_path, partitionBy="age")

出力コード:

The tool adds the EWI SPRKPY1069 to let you know that Snowpark throws an error if the parameter is a list.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

#EWI: SPRKPY1069 => If partitionBy parameter is a list, Snowpark will throw an error.
df.write.parquet(file_path, partition_by = "age", format_type_options = dict(compression = "None"))

推奨される修正

There is no recommended fix for this scenario because the tool always adds this EWI just in case the partitionBy parameter is a list. Remember that Snowpark only accepts cloud locations using a Snowflake stage.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
Session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}').show()
Session.file.put(f"file:///path/to/data/file.parquet", f"@{stage}")

df.write.parquet(stage, partition_by = "age", format_type_options = dict(compression = "None"))
シナリオ2

入力コード:

このシナリオでは、partitionByパラメーターはリストです。

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

df.write.parquet(file_path, partitionBy=["age", "name"])

出力コード:

The tool adds the EWI SPRKPY1069 to let you know that Snowpark throws an error if the parameter is a list.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

#EWI: SPRKPY1069 => If partitionBy parameter is a list, Snowpark will throw an error.
df.write.parquet(file_path, partition_by = ["age", "name"], format_type_options = dict(compression = "None"))

推奨される修正

If the value of the parameter is a list, then replace it with a ColumnOrSqlExpr.

df.write.parquet(file_path, partition_by = sql_expr("age || name"), format_type_options = dict(compression = "None"))

その他の推奨事項

SPRKPY1070

Message: The mode argument is transformed to overwrite, check the variable value and set the corresponding bool value.

カテゴリ: 警告

説明

この問題は、mode 引数が使用されている場合に発生します。

The tool analyzes the mode parameter to determine whether its value is overwrite.

シナリオ

シナリオ1

入力コード

このシナリオでは、ツールはmodeパラメーターが対応するbool値をセットできることを検出します。

df.write.csv(file_path, mode="overwrite")

出力コード:

The SMA tool analyzes the mode parameter, determines that its value is overwrite, and sets the corresponding bool value.

df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = True)

推奨される修正

このシナリオでは、ツールが対応する変換を実行したため、推奨される修正はありません。

シナリオ2:

入力コード

In this scenario, the tool cannot validate that the value is overwrite.

df.write.csv(file_path, mode=myVal)

出力コード:

SMA は、mode パラメーターが overwrite に変換されたことを示す EWI メッセージを追加します。これは、変数の値を確認して正しいブール値を設定した方が良いことを知らせるためでもあります。

#EWI: SPRKPY1070 => The 'mode' argument is transformed to 'overwrite', check the variable value and set the corresponding bool value.
df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = myVal)

推奨される修正

Check for the value of the parameter mode and add the correct value for the parameter overwrite.

df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = True)

その他の推奨事項

SPRKPY1071

メッセージ:The function pyspark.rdd.RDD.getNumPartitions is not required in Snowpark.そのため、参照はすべて削除する必要があります。

カテゴリ: 警告

説明

This issue appears when the tool finds the use of the pyspark.rdd.RDD.getNumPartitions function. Snowflake uses micro-partitioning mechanism, so the use of this function is not required.

シナリオ

入力

getNumPartitions は、 RDD 上のパーティションの数を返します。

df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])

print(df.getNumPartitions())

出力

ツールはこの EWI を追加して、 getNumPartitions が不要であることを知らせます。

df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])
#EWI: SPRKPY1071 => The getNumPartitions are not required in Snowpark. So, you should remove all references.

print(df.getNumPartitions())

推奨される修正

この関数の使用をすべて削除します。

df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])

その他の推奨事項

SPRKPY1072

メッセージ:The use of StorageLevel is not required in Snowpark.

カテゴリ: 警告。

説明

This issue appears when the tool finds the use of the StorageLevel class, which works like "flags" to set the storage level. Since Snowflake controls the storage, the use of this function is not required.
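
For illustration, a typical persist call and the Snowpark-side cleanup might look like the sketch below. The persist call is a hypothetical example (not taken from your code); if the original intent was caching an intermediate result, Snowpark's DataFrame.cache_result() may serve a similar purpose.

# PySpark (before migration)
# df.persist(StorageLevel.MEMORY_AND_DISK)

# Snowpark: Snowflake manages storage, so the call and the StorageLevel import can simply be removed.
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])

# Optional: cache an intermediate result explicitly if that was the original goal.
cached_df = df.cache_result()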

その他の推奨事項

SPRKPY1073

メッセージ: pyspark.sql.functions.udf without parameters or return type parameter are not supported

カテゴリ: 警告。

説明

This issue appears when the tool detects the usage of pyspark.sql.functions.udf as a function or decorator; it is not supported in two specific cases: when it has no parameters, or when it is missing the return type parameter.

シナリオ

シナリオ1

入力

Pysparkでは、ユーザー定義関数を入力や戻り値のタイプのパラメーターなしで作成できます。

from pyspark.sql import SparkSession, DataFrameStatFunctions
from pyspark.sql.functions import col, udf

spark = SparkSession.builder.getOrCreate()
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

my_udf = udf(lambda s: len(s))
df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

出力

Snowparkでは、Udf関数に入力や戻り値のタイプが必要です。これらは提供されておらず、 SMA は、これらのパラメーターを変換できません。

from snowflake.snowpark import Session, DataFrameStatFunctions
from snowflake.snowpark.functions import col, udf

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1073 => pyspark.sql.functions.udf function without the return type parameter is not supported. See documentation for more info.
my_udf = udf(lambda s: len(s))

df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

Recommended fix

To fix this scenario, add the import for the input and return types, and then add the return_type and input_types parameters to the udf function my_udf.

from snowflake.snowpark import Session, DataFrameStatFunctions
from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import IntegerType, StringType

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

my_udf = udf(lambda s: len(s), return_type=IntegerType(), input_types=[StringType()])

df.with_column("result", my_udf(df.Value)).show()
Scenario 2

In PySpark, you can use the @udf decorator without parameters.

Input

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf

spark = SparkSession.builder.getOrCreate()
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

@udf()
def my_udf(str):
    return len(str)


df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

Output

In Snowpark, all the parameters of a udf decorator are required.

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, udf

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

#EWI: SPRKPY1073 => pyspark.sql.functions.udf decorator without parameters is not supported. See documentation for more info.

@udf()
def my_udf(str):
    return len(str)

df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

Recommended fix

To fix this scenario, add the import for the input and return types, and then add the return_type and input_types parameters to the @udf decorator.

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import IntegerType, StringType

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

@udf(return_type=IntegerType(), input_types=[StringType()])
def my_udf(str):
    return len(str)

df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

Additional recommendations

SPRKPY1074

Message: File has mixed indentation (spaces and tabs).

Category: Parsing error.

Description

This issue appears when the tool detects that the file has mixed indentation, that is, the file contains lines of code indented with a combination of spaces and tabs.

Scenario

Input

In PySpark, it is possible to mix spaces and tabs at the indentation level.

def foo():
    x = 5 # spaces
    y = 6 # tab

Output

The SMA cannot handle mixed indentation markers. When this is detected in a Python code file, the SMA adds the EWI SPRKPY1074 to the first line.

## EWI: SPRKPY1074 => File has mixed indentation (spaces and tabs).
## This file was not converted, so it is expected to still have references to the Spark API
def foo():
    x = 5 # spaces
    y = 6 # tabs

Recommended fix

The solution is to make all the indentation symbols the same.

def foo():
  x = 5 # tab
  y = 6 # tab

Additional recommendations

SPRKPY1075

Category

Warning.

Description

This issue appears when the pyspark.sql.functions.from_json function is transformed to parse_json, which does not apply schema validation. If you need to filter or validate based on the schema, you might need to introduce some logic.

Input

df.select(from_json(df.value, Schema))
df.select(from_json(schema=Schema, col=df.value))
df.select(from_json(df.value, Schema, option))

Output

#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))
#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))
#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))

In the from_json function, the schema is not passed for inference; it is used for validation. See the following examples.

data = [
    ('{"name": "John", "age": 30, "city": "New York"}',),
    ('{"name": "Jane", "age": "25", "city": "San Francisco"}',)
]

df = spark.createDataFrame(data, ["json_str"])

Example 1: Enforce data types and change column names.

## Parse JSON column with schema
parsed_df = df.withColumn("parsed_json", from_json(col("json_str"), schema))

parsed_df.show(truncate=False)

## +------------------------------------------------------+---------------------------+
## |json_str                                              |parsed_json                |
## +------------------------------------------------------+---------------------------+
## |{"name": "John", "age": 30, "city": "New York"}       |{John, 30, New York}       |
## |{"name": "Jane", "age": "25", "city": "San Francisco"}|{Jane, null, San Francisco}|
## +------------------------------------------------------+---------------------------+
## notice that values outside of the schema were dropped and columns not matched are returned as null

Example 2: Select specific columns.

## Define a schema with only the columns we want to use
partial_schema = StructType([
    StructField("name", StringType(), True),
    StructField("city", StringType(), True)
])

## Parse JSON column with partial schema
partial_df = df.withColumn("parsed_json", from_json(col("json_str"), partial_schema))

partial_df.show(truncate=False)

## +------------------------------------------------------+---------------------+
## |json_str                                              |parsed_json          |
## +------------------------------------------------------+---------------------+
## |{"name": "John", "age": 30, "city": "New York"}       |{John, New York}     |
## |{"name": "Jane", "age": "25", "city": "San Francisco"}|{Jane, San Francisco}|
## +------------------------------------------------------+---------------------+
## there is also an automatic filtering
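
If you need to reproduce this behavior after the SMA rewrites from_json to parse_json, a minimal hedged sketch (assuming df is the equivalent Snowpark DataFrame, and that the field names and target types follow the example above) is to select and cast only the fields you care about; values that cannot be cast become NULL and everything else is dropped:

from snowflake.snowpark.functions import parse_json, col
from snowflake.snowpark.types import StringType, IntegerType

## Parse the raw JSON string into a VARIANT column
parsed_df = df.with_column("parsed_json", parse_json(col("json_str")))

## Emulate the schema "validation" by selecting and casting specific fields
validated_df = parsed_df.select(
    col("parsed_json")["name"].cast(StringType()).alias("name"),
    col("parsed_json")["age"].cast(IntegerType()).alias("age")
)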

Recommendations

  • For more support, you can email us at sma-support@snowflake.com. If you have a contract for support with Snowflake, reach out to your sales engineer and they can direct your support needs.

  • Useful tools: PEP-8 and Reindent.

SPRKPY1076

Message: Parameters in pyspark.sql.readwriter.DataFrameReader methods are not supported. This applies to CSV, JSON and PARQUET methods.

Category: Warning.

Description

For the CSV, JSON and PARQUET methods on the pyspark.sql.readwriter.DataFrameReader object, the tool analyzes the parameters and adds a transformation according to each case:

  • If all the parameters match their equivalent names in Snowpark, the tool converts the parameters into .option() calls. In this case, the parameter does not add this EWI.

  • If a parameter does not have an equivalent in Snowpark, the tool adds this EWI with the parameter information and removes it from the method call.

List of equivalences:

  • CSV equivalences:

| Spark key       | Snowpark equivalence         |
|-----------------|------------------------------|
| sep             | FIELD_DELIMITER              |
| header          | PARSE_HEADER                 |
| lineSep         | RECORD_DELIMITER             |
| pathGlobFilter  | PATTERN                      |
| quote           | FIELD_OPTIONALLY_ENCLOSED_BY |
| nullValue       | NULL_IF                      |
| dateFormat      | DATE_FORMAT                  |
| timestampFormat | TIMESTAMP_FORMAT             |
| inferSchema     | INFER_SCHEMA                 |
| delimiter       | FIELD_DELIMITER              |

  • JSON equivalences:

| Spark key       | Snowpark equivalence |
|-----------------|----------------------|
| dateFormat      | DATE_FORMAT          |
| timestampFormat | TIMESTAMP_FORMAT     |
| pathGlobFilter  | PATTERN              |

  • PARQUET equivalences:

| Spark key      | Snowpark equivalence |
|----------------|----------------------|
| pathGlobFilter | PATTERN              |

Scenarios

Scenario 1

Input

Below are some examples for CSV.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('myapp').getOrCreate()

spark.read.csv("path3", None,None,None,None,None,None,True).show()

Output

In the converted code, the parameters are added as individual options to the csv function.

from snowflake.snowpark import Session

spark = Session.builder.app_name('myapp', True).getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})

#EWI: SPRKPY1076 => Some of the included parameters are not supported in the csv function, the supported ones will be added into a option method.
spark.read.option("FIELD_DELIMITER", None).option("PARSE_HEADER", True).option("FIELD_OPTIONALLY_ENCLOSED_BY", None).csv("path3").show()

Scenario 2

Input

Below are some examples for JSON.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myapp').getOrCreate()
spark.read.json("/myPath/jsonFile/", dateFormat='YYYY/MM/DD').show()

Output

In the converted code, the parameters are added as individual options to the json function.

from snowflake.snowpark import Session
spark = Session.builder.app_name('myapp', True).getOrCreate()
#EWI: SPRKPY1076 => Some of the included parameters are not supported in the json function, the supported ones will be added into a option method.

spark.read.option("DATE_FORMAT", 'YYYY/MM/DD').json("/myPath/jsonFile/").show()
Scenario 3

Input

Below are some examples for PARQUET.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myapp').getOrCreate()

spark.read.parquet("/path/to/my/file.parquet", pathGlobFilter="*.parquet").show()

Output

In the converted code, the parameters are added as individual options to the parquet function.

from snowflake.snowpark import Session

spark = Session.builder.app_name('myapp', True).getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})

#EWI: SPRKPY1076 => Some of the included parameters are not supported in the parquet function, the supported ones will be added into a option method.
#EWI: SPRKPY1029 => The parquet function require adjustments, in Snowpark the parquet files needs to be located in an stage. See the documentation for more info.

spark.read.option("PATTERN", "*.parquet").parquet("/path/to/my/file.parquet")

Additional recommendations

SPRKPY1077

Message: SQL embedded code cannot be processed.

Category: Warning.

Description

This issue appears when the tool detects SQL embedded code that cannot be converted to Snowpark.

See the SQL embedded code section for more information.

Scenario

Input

In this example, the SQL code is embedded in a variable named query, which is then used as a parameter of the PySpark.sql method.

query = f"SELECT * from myTable"
spark.sql(query)

Output

The SMA detects that the PySpark.sql parameter is a variable and not SQL code, so the EWI SPRKPY1077 message is added to the PySpark.sql line.

query = f"SELECT * myTable"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
spark.sql(query)

Additional recommendations

  • For SQL to be converted, the code must appear directly inside the method call as a string value only, without interpolation. You can check the SQL sent to the PySpark.SQL function and validate its functionality in Snowflake (a minimal sketch of this change follows this list).

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.
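
Below is the kind of change the first recommendation describes (the table name is taken from the example above): passing the SQL text directly as a string literal lets the SMA process it, whereas passing it through a variable does not.

# The SQL statement is passed directly as a string literal, so the SMA can process it
spark.sql("SELECT * from myTable")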

SPRKPY1078

Message: The argument of the pyspark.context.SparkContext.setLogLevel function is not a literal value and therefore could not be evaluated

Category: Warning

Description

This issue appears when the SMA detects the use of the pyspark.context.SparkContext.setLogLevel function with an argument that is not a literal value, for example, when the argument is a variable.

Since the SMA performs static analysis of the source code, it cannot evaluate the content of the argument and determine an equivalent in Snowpark.

Scenario

Input

In this example, the log level is defined in the variable my_log_level, and my_log_level is then used as the parameter of the setLogLevel method.

my_log_level = "WARN"
sparkSession.sparkContext.setLogLevel(my_log_level)

Output

The SMA cannot evaluate the argument of the log level parameter, so the EWI SPRKPY1078 is added above the line with the converted logging call.

my_log_level = "WARN"
#EWI: SPRKPY1078 => my_log_level is not a literal value and therefore could not be evaluated. Make sure the value of my_log_level is a valid level in Snowpark. Valid log levels are: logging.CRITICAL, logging.DEBUG, logging.ERROR, logging.INFO, logging.NOTSET, logging.WARNING
logging.basicConfig(stream = sys.stdout, level = my_log_level)

Recommended fix

Even though the SMA was unable to evaluate the argument, it will transform the pyspark.context.SparkContext.setLogLevel function into the Snowpark equivalent. Please make sure the value of the level argument in the generated output code is a valid and equivalent log level in Snowpark according to the table below:

| PySpark log level | Snowpark log level equivalent |
|-------------------|-------------------------------|
| ALL               | logging.NOTSET                |
| DEBUG             | logging.DEBUG                 |
| ERROR             | logging.ERROR                 |
| FATAL             | logging.CRITICAL              |
| INFO              | logging.INFO                  |
| OFF               | logging.WARNING               |
| TRACE             | logging.NOTSET                |
| WARN              | logging.WARNING               |

So the recommended fix would look like this:

my_log_level = logging.WARNING
logging.basicConfig(stream = sys.stdout, level = my_log_level)

Additional recommendations

SPRKPY1079

Message: The argument of the pyspark.context.SparkContext.setLogLevel function is not a valid PySpark log level

Category: Warning

Description

This issue appears when the SMA detects the use of the pyspark.context.SparkContext.setLogLevel function with an argument that is not a valid log level in PySpark, and therefore an equivalent could not be determined in Snowpark.

Scenario

Input

Here, the log level "INVALID_LOG_LEVEL" is used, which is not a valid PySpark log level.

sparkSession.sparkContext.setLogLevel("INVALID_LOG_LEVEL")

Output

The SMA cannot recognize the log level "INVALID_LOG_LEVEL". Even though the SMA performs the conversion, the EWI SPRKPY1079 is added to indicate a possible problem.

#EWI: SPRKPY1079 => INVALID_LOG_LEVEL is not a valid PySpark log level, therefore an equivalent could not be determined in Snowpark. Valid PySpark log levels are: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
logging.basicConfig(stream = sys.stdout, level = logging.INVALID_LOG_LEVEL)

Recommended fix

Make sure that the log level used in the pyspark.context.SparkContext.setLogLevel function is a valid log level in PySpark or in Snowpark and try again.

logging.basicConfig(stream = sys.stdout, level = logging.DEBUG)

Additional recommendations

SPRKPY1081

This issue code has been deprecated since Spark Conversion Core 4.12.0

Message: pyspark.sql.readwriter.DataFrameWriter.partitionBy has a workaround.

Category: Warning

Description

The Pyspark.sql.readwriter.DataFrameWriter.partitionBy function is not supported. The workaround is to use Snowpark's copy_into_location instead. See the documentation for more info.

Scenario

Input

This code will create a separate directory for each unique value in the FIRST_NAME column. The data is the same, but it is stored in a different directory based on the column value.

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.partitionBy("FIRST_NAME").csv("/home/data")

Output code

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
#EWI: SPRKPY1081 => The partitionBy function is not supported, but you can instead use copy_into_location as workaround. See the documentation for more info.
df.write.partitionBy("FIRST_NAME").csv("/home/data", format_type_options = dict(compression = "None"))

Recommended fix

In Snowpark, copy_into_location has a partition_by parameter that you can use instead of the partitionBy function, but it's going to require some manual adjustments, as shown in the following example:

Spark code:

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.partitionBy("FIRST_NAME").csv("/home/data")

Manually adjusted Snowpark code:

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.copy_into_location(location=temp_stage, partition_by=col("FIRST_NAME"), file_format_type="csv", format_type_options={"COMPRESSION": "NONE"}, header=True)

copy_into_location has the following parameters (a hedged sketch of their use follows this list):

  • location: The Snowpark location only accepts cloud locations using a Snowflake stage.

  • partition_by: This can be a column name or a SQL expression, so you need to convert it to a column or to SQL using col or sql_expr.
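
Below is a hedged sketch (the stage name and the session variable are assumptions) of how the temp_stage location used in the manually adjusted code could be created before calling copy_into_location:

from snowflake.snowpark.functions import col

# Hypothetical temporary stage used as the target location
temp_stage = "my_temp_stage"
session.sql(f"CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}").show()

df.write.copy_into_location(location=f"@{temp_stage}/data/", partition_by=col("FIRST_NAME"), file_format_type="csv", format_type_options={"COMPRESSION": "NONE"}, header=True)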

Additional recommendations

SPRKPY1082

Message: The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use a Snowpark DataFrameReader format-specific method instead (avro, csv, json, orc, parquet). The path parameter should be a stage location.

Category: Warning

Description

The pyspark.sql.readwriter.DataFrameReader.load function is not supported. The workaround is to use Snowpark DataFrameReader methods instead.

Scenarios

The Spark signature for this method, DataFrameReader.load(path, format, schema, **options), does not exist in Snowpark. Therefore, any usage of the load function will have an EWI in the output code.

Scenario 1

Input

Below is an example that tries to load data from a CSV source.

path_csv_file = "/path/to/file.csv"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.load(path_csv_file, "csv").show()
my_session.read.load(path_csv_file, "csv", schema=schemaParam).show()
my_session.read.load(path_csv_file, "csv", schema=schemaParam, lineSep="\r\n", dateFormat="YYYY/MM/DD").show()

Output

The SMA adds the EWI SPRKPY1082 to let you know that this function is not supported by Snowpark, but it has a workaround.

path_csv_file = "/path/to/file.csv"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

my_session.read.load(path_csv_file, "csv").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_csv_file, "csv", schema=schemaParam).show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_csv_file, "csv", schema=schemaParam, lineSep="\r\n", dateFormat="YYYY/MM/DD").show()

Recommended fix

As a workaround, you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with csv method.

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Below is an example that creates a temporary stage and puts the file into it, then calls the CSV method.

path_csv_file = "/path/to/file.csv"

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.csv", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.csv"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.csv(stage_file_path).show()
  • Fixing schema parameter:

    • The schema can be set by using the schema function as follows:

schemaParam = StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True)
    ])

df = my_session.read.schema(schemaParam).csv(temp_stage)
  • Fixing options parameter:

The options in Spark and Snowpark are not the same; in this case, lineSep and dateFormat are replaced with RECORD_DELIMITER and DATE_FORMAT. The Additional recommendations section has a table with all the equivalences.

Below is an example that creates a dictionary with RECORD_DELIMITER and DATE_FORMAT, and calls the options method with that dictionary.

optionsParam = {"RECORD_DELIMITER": "\r\n", "DATE_FORMAT": "YYYY/MM/DD"}
df = my_session.read.options(optionsParam).csv(stage)

Scenario 2

Input

Below is an example that tries to load data from a JSON source.

path_json_file = "/path/to/file.json"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.load(path_json_file, "json").show()
my_session.read.load(path_json_file, "json", schema=schemaParam).show()
my_session.read.load(path_json_file, "json", schema=schemaParam, dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3").show()

Output

The SMA adds the EWI SPRKPY1082 to let you know that this function is not supported by Snowpark, but it has a workaround.

path_json_file = "/path/to/file.json"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

my_session.read.load(path_json_file, "json").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_json_file, "json", schema=schemaParam).show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_json_file, "json", schema=schemaParam, dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3").show()

Recommended fix

As a workaround, you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with json method

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Below is an example that creates a temporary stage and puts the file into it, then calls the JSON method.

path_json_file = "/path/to/file.json"

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.json", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.json"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.json(stage_file_path).show()
  • Fixing schema parameter:

    • The schema can be set by using the schema function as follows:

schemaParam = StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True)
    ])

df = my_session.read.schema(schemaParam).json(temp_stage)
  • Fixing options parameter:

The options in Spark and Snowpark are not the same; in this case, dateFormat and timestampFormat are replaced with DATE_FORMAT and TIMESTAMP_FORMAT. The Additional recommendations section has a table with all the equivalences.

Below is an example that creates a dictionary with DATE_FORMAT and TIMESTAMP_FORMAT, and calls the options method with that dictionary.

optionsParam = {"DATE_FORMAT": "YYYY/MM/DD", "TIMESTAMP_FORMAT": "YYYY-MM-DD HH24:MI:SS.FF3"}
df = Session.read.options(optionsParam).json(stage)

Scenario 3

Input

Below is an example that tries to load data from a PARQUET source.

path_parquet_file = "/path/to/file.parquet"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.load(path_parquet_file, "parquet").show()
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam).show()
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam, pathGlobFilter="*.parquet").show()

Output

The SMA adds the EWI SPRKPY1082 to let you know that this function is not supported by Snowpark, but it has a workaround.

path_parquet_file = "/path/to/file.parquet"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

my_session.read.load(path_parquet_file, "parquet").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam).show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam, pathGlobFilter="*.parquet").show()

Recommended fix

As a workaround, you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with parquet method

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Below is an example that creates a temporary stage and puts the file into it, then calls the PARQUET method.

path_parquet_file = "/path/to/file.parquet"

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.parquet", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.parquet"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.parquet(stage_file_path).show()
  • Fixing schema parameter:

    • The schema can be set by using the schema function as follows:

schemaParam = StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True)
    ])

df = my_session.read.schema(schemaParam).parquet(temp_stage)
  • Fixing options parameter:

The options in Spark and Snowpark are not the same; in this case, pathGlobFilter is replaced with PATTERN. The Additional recommendations section has a table with all the equivalences.

Below is an example that creates a dictionary with PATTERN, and calls the options method with that dictionary.

optionsParam = {"PATTERN": "*.parquet"}
df = Session.read.options(optionsParam).parquet(stage)

Additional recommendations

  • Note that the options in Spark and Snowpark are not the same, but mappings are possible:

| Spark option        | Possible values                             | Snowpark equivalent               | Description |
|---------------------|---------------------------------------------|-----------------------------------|-------------|
| header              | True or False                               | SKIP_HEADER = 1 / SKIP_HEADER = 0 | Use the first line of the file as column names. |
| delimiter           | Any single/multi-character field separator  | FIELD_DELIMITER                   | Specifies one or more characters as the separator for each column/field. |
| sep                 | Any single-character field separator        | FIELD_DELIMITER                   | Specifies a single character as the separator for each column/field. |
| encoding            | UTF-8, UTF-16, etc.                         | ENCODING                          | Decodes the CSV files with the given encoding type. The default encoding is UTF-8. |
| lineSep             | Any single-character line separator         | RECORD_DELIMITER                  | Defines the line separator used to parse the file. |
| pathGlobFilter      | File pattern                                | PATTERN                           | Defines a pattern so that only files with names matching the pattern are read. |
| recursiveFileLookup | True or False                               | N/A                               | Recursively scans a directory to read files. The default value of this option is False. |
| quote               | Single character to quote with              | FIELD_OPTIONALLY_ENCLOSED_BY      | Quotes fields/columns containing fields where the delimiter/separator can be part of the value. This character, used together with the quoteAll option, quotes all fields. The default value of this option is double quote ("). |
| nullValue           | String to replace null                      | NULL_IF                           | Replaces null values with the given string while reading and writing the dataframe. |
| dateFormat          | Valid date format                           | DATE_FORMAT                       | Defines a string that indicates the date format. The default format is yyyy-MM-dd. |
| timestampFormat     | Valid timestamp format                      | TIMESTAMP_FORMAT                  | Defines a string that indicates the timestamp format. The default format is yyyy-MM-dd 'T'HH:mm:ss. |
| escape              | Any single character                        | ESCAPE                            | Sets a single character as the escape character, overriding the default escape character (\). |
| inferSchema         | True or False                               | INFER_SCHEMA                      | Automatically detects the file schema. |
| mergeSchema         | True or False                               | N/A                               | Not needed in Snowflake, because this happens whenever infer_schema determines the Parquet file structure. |

  • For the modifiedBefore / modifiedAfter options you can achieve the same result in Snowflake by using the metadata columns and then adding a filter like: df.filter(METADATA_FILE_LAST_MODIFIED > ‘some_date’) (see the hedged sketch after this list).

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.
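
Below is a hedged sketch of the metadata-column approach mentioned above. It assumes your Snowpark version provides DataFrameReader.with_metadata; the stage path, schema, and date are illustrative only.

from snowflake.snowpark.functions import col, lit, to_timestamp
from snowflake.snowpark.types import StructType, StructField, StringType

## Illustrative schema; reading CSV in Snowpark requires a schema (or INFER_SCHEMA)
schemaParam = StructType([StructField("c1", StringType(), True)])

df = (
    my_session.read
    .schema(schemaParam)
    .with_metadata("METADATA$FILE_LAST_MODIFIED")
    .csv("@my_stage/data/")
)

## Keep only rows coming from files modified after a given date
filtered_df = df.filter(col("METADATA$FILE_LAST_MODIFIED") > to_timestamp(lit("2023-01-01")))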

SPRKPY1083

Message: The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use the Snowpark DataFrameWriter copy_into_location method instead.

Category: Warning

Description

The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. The workaround is to use Snowpark DataFrameWriter methods instead.

Scenarios

The Spark signature for this method, DataFrameWriter.save(path, format, mode, partitionBy, **options), does not exist in Snowpark. Therefore, any usage of the save function will have an EWI in the output code.

Scenario 1

Input code

Below is an example that tries to save data with CSV format.

path_csv_file = "/path/to/file.csv"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = my_session.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_csv_file, format="csv")
df.write.save(path_csv_file, format="csv", mode="overwrite")
df.write.save(path_csv_file, format="csv", mode="overwrite", lineSep="\r\n", dateFormat="YYYY/MM/DD")
df.write.save(path_csv_file, format="csv", mode="overwrite", partitionBy="City", lineSep="\r\n", dateFormat="YYYY/MM/DD")

Output code

The tool adds this EWI SPRKPY1083 on the output code to let you know that this function is not supported by Snowpark, but it has a workaround.

path_csv_file = "/path/to/file.csv"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = my_session.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite", lineSep="\r\n", dateFormat="YYYY/MM/DD")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite", partitionBy="City", lineSep="\r\n", dateFormat="YYYY/MM/DD")

Recommended fix

As a workaround you can use Snowpark DataFrameWriter methods instead.

  • Fixing path and format parameters:

    • Replace the save method with the csv or copy_into_location method.

    • If you are using copy_into_location method, you need to specify the format with the file_format_type parameter.

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Below is an example that creates a temporary stage, puts the file into it, and then calls one of the methods mentioned above.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.csv", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.csv"

## Using csv method
df.write.csv(stage_file_path)

## Using copy_into_location method
df.write.copy_into_location(stage_file_path, file_format_type="csv")

Below is an example that adds the mode method with overwrite as a parameter to the method chain.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using csv method
df.write.mode("overwrite").csv(temp_stage)

## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(temp_stage, file_format_type="csv")
  • Fixing partitionBy parameter:

    • Use the partition_by parameter from the CSV method, as follows:

Below is an example that uses the partition_by parameter of the csv method.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using csv method
df.write.csv(temp_stage, partition_by="City")

## Using copy_into_location method
df.write.copy_into_location(temp_stage, file_format_type="csv", partition_by="City")
  • Fixing options parameter:

The options in Spark and Snowpark are not the same; in this case, lineSep and dateFormat are replaced with RECORD_DELIMITER and DATE_FORMAT. The Additional recommendations section has a table with all the equivalences.

Below is an example that creates a dictionary with RECORD_DELIMITER and DATE_FORMAT, and calls the options method with that dictionary.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"RECORD_DELIMITER": "\r\n", "DATE_FORMAT": "YYYY/MM/DD"}

## Using csv method
df.write.csv(stage, format_type_options=optionsParam)

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="csv", format_type_options=optionsParam)

Scenario 2

Input code

Below is an example that tries to save data with JSON format.

path_json_file = "/path/to/file.json"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_json_file, format="json")
df.write.save(path_json_file, format="json", mode="overwrite")
df.write.save(path_json_file, format="json", mode="overwrite", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
df.write.save(path_json_file, format="json", mode="overwrite", partitionBy="City", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")

Output code

The tool adds this EWI SPRKPY1083 on the output code to let you know that this function is not supported by Snowpark, but it has a workaround.

path_json_file = "/path/to/file.json"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite", partitionBy="City", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")

Recommended fix

As a workaround, you can use Snowpark DataFrameWriter methods instead.

  • Fixing path and format parameters:

    • Replace the save method with the json or copy_into_location method.

    • If you are using copy_into_location method, you need to specify the format with the file_format_type parameter.

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Below is an example that creates a temporary stage, puts the file into it, and then calls one of the methods mentioned above.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.json", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.json"

## Using json method
df.write.json(stage_file_path)

## Using copy_into_location method
df.write.copy_into_location(stage_file_path, file_format_type="json")

Below is an example that adds the mode method with overwrite as a parameter to the method chain.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using json method
df.write.mode("overwrite").json(temp_stage)

## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(temp_stage, file_format_type="json")
  • Fixing partitionBy parameter:

    • Use the partition_by parameter from the json method, as follows:

Below is an example that uses the partition_by parameter of the json method.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using json method
df.write.json(temp_stage, partition_by="City")

## Using copy_into_location method
df.write.copy_into_location(temp_stage, file_format_type="json", partition_by="City")
  • Fixing options parameter:

The options in Spark and Snowpark are not the same; in this case, dateFormat and timestampFormat are replaced with DATE_FORMAT and TIMESTAMP_FORMAT. The Additional recommendations section has a table with all the equivalences.

Below is an example that creates a dictionary with DATE_FORMAT and TIMESTAMP_FORMAT, and calls the options method with that dictionary.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"DATE_FORMAT": "YYYY/MM/DD", "TIMESTAMP_FORMAT": "YYYY-MM-DD HH24:MI:SS.FF3"}

## Using json method
df.write.json(stage, format_type_options=optionsParam)

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="json", format_type_options=optionsParam)

Scenario 3

Input code

Below is an example that tries to save data with PARQUET format.

path_parquet_file = "/path/to/file.parquet"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_parquet_file, format="parquet")
df.write.save(path_parquet_file, format="parquet", mode="overwrite")
df.write.save(path_parquet_file, format="parquet", mode="overwrite", pathGlobFilter="*.parquet")
df.write.save(path_parquet_file, format="parquet", mode="overwrite", partitionBy="City", pathGlobFilter="*.parquet")

Output code

The tool adds this EWI SPRKPY1083 on the output code to let you know that this function is not supported by Snowpark, but it has a workaround.

path_parquet_file = "/path/to/file.parquet"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite", pathGlobFilter="*.parquet")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite", partitionBy="City", pathGlobFilter="*.parquet")

Recommended fix

As a workaround, you can use Snowpark DataFrameWriter methods instead.

  • Fixing path and format parameters:

    • Replace the save method with the parquet or copy_into_location method.

    • If you are using copy_into_location method, you need to specify the format with the file_format_type parameter.

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Below is an example that creates a temporary stage, puts the file into it, and then calls one of the methods mentioned above.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.parquet", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.parquet"

## Using parquet method
df.write.parquet(stage_file_path)

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="parquet")

Below is an example that adds the mode method with overwrite as a parameter to the method chain.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using parquet method
df.write.mode("overwrite").parquet(temp_stage)

## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(stage, file_format_type="parquet")
  • Fixing partitionBy parameter:

    • Use the partition_by parameter from the parquet method, as follows:

Below is an example that uses the partition_by parameter of the parquet method.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using parquet method
df.write.parquet(temp_stage, partition_by="City")

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="parquet", partition_by="City")
  • Fixing options parameter:

The options in Spark and Snowpark are not the same; in this case, pathGlobFilter is replaced with PATTERN. The Additional recommendations section has a table with all the equivalences.

Below is an example that creates a dictionary with PATTERN, and calls the options method with that dictionary.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"PATTERN": "*.parquet"}

## Using parquet method
df.write.parquet(stage, format_type_options=optionsParam)

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="parquet", format_type_options=optionsParam)

Additional recommendations

  • Note that the options in Spark and Snowpark are not the same, but mappings are possible:

| Spark option        | Possible values                             | Snowpark equivalent               | Description |
|---------------------|---------------------------------------------|-----------------------------------|-------------|
| header              | True or False                               | SKIP_HEADER = 1 / SKIP_HEADER = 0 | Use the first line of the file as column names. |
| delimiter           | Any single/multi-character field separator  | FIELD_DELIMITER                   | Specifies one or more characters as the separator for each column/field. |
| sep                 | Any single-character field separator        | FIELD_DELIMITER                   | Specifies a single character as the separator for each column/field. |
| encoding            | UTF-8, UTF-16, etc.                         | ENCODING                          | Decodes the CSV files with the given encoding type. The default encoding is UTF-8. |
| lineSep             | Any single-character line separator         | RECORD_DELIMITER                  | Defines the line separator used to parse the file. |
| pathGlobFilter      | File pattern                                | PATTERN                           | Defines a pattern so that only files with names matching the pattern are read. |
| recursiveFileLookup | True or False                               | N/A                               | Recursively scans a directory to read files. The default value of this option is False. |
| quote               | Single character to quote with              | FIELD_OPTIONALLY_ENCLOSED_BY      | Quotes fields/columns containing fields where the delimiter/separator can be part of the value. This character, used together with the quoteAll option, quotes all fields. The default value of this option is double quote ("). |
| nullValue           | String to replace null                      | NULL_IF                           | Replaces null values with the given string while reading and writing the dataframe. |
| dateFormat          | Valid date format                           | DATE_FORMAT                       | Defines a string that indicates the date format. The default format is yyyy-MM-dd. |
| timestampFormat     | Valid timestamp format                      | TIMESTAMP_FORMAT                  | Defines a string that indicates the timestamp format. The default format is yyyy-MM-dd 'T'HH:mm:ss. |
| escape              | Any single character                        | ESCAPE                            | Sets a single character as the escape character, overriding the default escape character (\). |
| inferSchema         | True or False                               | INFER_SCHEMA                      | Automatically detects the file schema. |
| mergeSchema         | True or False                               | N/A                               | Not needed in Snowflake, because this happens whenever infer_schema determines the Parquet file structure. |

  • For the modifiedBefore / modifiedAfter options you can achieve the same result in Snowflake by using the metadata columns and then adding a filter like: df.filter(METADATA_FILE_LAST_MODIFIED > ‘some_date’).

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1084

This issue code has been deprecated since Spark Conversion Core 4.12.0

Message: pyspark.sql.readwriter.DataFrameWriter.option is not supported.

Category: Warning

Description

The pyspark.sql.readwriter.DataFrameWriter.option function is not supported.

Scenario

Input code

Below is an example using the option method; this method is used to add additional configurations when writing the data of a DataFrame.

path_csv_file = "/path/to/file.csv"
data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.option("header", True).csv(csv_file_path)
df.write.option("sep", ";").option("lineSep","-").csv(csv_file_path)

Output code

The tool adds this EWI SPRKPY1084 on the output code to let you know that this function is not supported by Snowpark.

path_csv_file = "/path/to/file.csv"
data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1084 => The pyspark.sql.readwriter.DataFrameWriter.option function is not supported.

df.write.option("header", True).csv(csv_file_path)
#EWI: SPRKPY1084 => The pyspark.sql.readwriter.DataFrameWriter.option function is not supported.
df.write.option("sep", ";").option("lineSep","-").csv(csv_file_path)

Recommended fix

There is no recommended fix for the pyspark.sql.readwriter.DataFrameWriter.option method.

Additional recommendations

SPRKPY1085

Message: pyspark.ml.feature.VectorAssembler is not supported.

Category: Warning

Description

The pyspark.ml.feature.VectorAssembler is not supported.

Scenario

Input code

VectorAssembler is used to combine multiple columns into a single vector.

data = [
        (1, 10.0, 20.0),
        (2, 25.0, 30.0),
        (3, 50.0, 60.0)
    ]

df = SparkSession.createDataFrame(data, schema=["Id", "col1", "col2"])
vector = VectorAssembler(inputCols=["col1", "col2"], output="cols")

Output code

The tool adds this EWI SPRKPY1085 on the output code to let you know that this class is not supported by Snowpark.

data = [
        (1, 10.0, 20.0),
        (2, 25.0, 30.0),
        (3, 50.0, 60.0)
    ]

df = spark.createDataFrame(data, schema=["Id", "col1", "col2"])
#EWI: SPRKPY1085 => The pyspark.ml.feature.VectorAssembler function is not supported.

vector = VectorAssembler(inputCols=["col1", "col2"], outputCol="cols")

Recommended fix

There is no recommended fix for pyspark.ml.feature.VectorAssembler.

Additional recommendations

SPRKPY1086

Message: pyspark.ml.linalg.VectorUDT is not supported.

Category: Warning

Description

The pyspark.ml.linalg.VectorUDT is not supported.

Scenario

Input code

VectorUDT is a data type used to represent vector columns in a DataFrame.

data = [
        (1, Vectors.dense([10.0, 20.0])),
        (2, Vectors.dense([25.0, 30.0])),
        (3, Vectors.dense([50.0, 60.0]))
    ]

schema = StructType([
        StructField("Id", IntegerType(), True),
        StructField("VectorCol", VectorUDT(), True),
    ])

df = spark.createDataFrame(data, schema=schema)

Output code

The tool adds this EWI SPRKPY1086 on the output code to let you know that this function is not supported by Snowpark.

data = [
        (1, Vectors.dense([10.0, 20.0])),
        (2, Vectors.dense([25.0, 30.0])),
        (3, Vectors.dense([50.0, 60.0]))
    ]

#EWI: SPRKPY1086 => The pyspark.ml.linalg.VectorUDT function is not supported.
schema = StructType([
        StructField("Id", IntegerType(), True),
        StructField("VectorCol", VectorUDT(), True),
    ])

df = spark.createDataFrame(data, schema=schema)

Recommended fix

There is no recommended fix for pyspark.ml.linalg.VectorUDT.

Additional recommendations

SPRKPY1087

Message: The pyspark.sql.dataframe.DataFrame.writeTo function is not supported, but it has a workaround.

Category: Warning.

Description

The pyspark.sql.dataframe.DataFrame.writeTo function is not supported. The workaround is to use Snowpark DataFrameWriter SaveAsTable method instead.

Scenario

Input

Below is an example of the pyspark.sql.dataframe.DataFrame.writeTo function; the dataframe df is written into a table named Personal_info.

df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
                                 schema=["FIRST_NAME", "LAST_NAME"])

df.writeTo("Personal_info")

Output

The SMA adds the EWI SPRKPY1087 to the output code to let you know that this function is not supported, but has a workaround.

df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
                                 schema=["FIRST_NAME", "LAST_NAME"])

#EWI: SPRKPY1087 => pyspark.sql.dataframe.DataFrame.writeTo is not supported, but it has a workaround.
df.writeTo("Personal_info")

Recommended fix

As a workaround, use the Snowpark DataFrameWriter saveAsTable method instead.

df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
                                 schema=["FIRST_NAME", "LAST_NAME"])

df.write.saveAsTable("Personal_info")

Additional recommendations

SPRKPY1088

Message: The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.

Category: Warning

Description

The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so validation might be needed to ensure that the behavior is correct.

Scenarios

There are several scenarios, depending on whether the option is supported and on the format used to write the file.

Scenario 1

Input

Below is an example of the usage of the option method, adding a sep option, which is currently supported.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])

df.write.option("sep", ",").csv("some_path")

Output

The tool adds the EWI SPRKPY1088 indicating that validation may be required.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").csv("some_path")

Recommended fix

The Snowpark API supports this parameter, so the only action is to check the behavior after the migration. Please refer to the equivalences table for the supported parameters.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").csv("some_path")
Scenario 2

Input

Here the scenario shows the usage of option, but adds a header option, which is not supported.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])

df.write.option("header", True).csv("some_path")

Output

The tool adds the EWI SPRKPY1088 indicating that validation may be needed.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("header", True).csv("some_path")

Recommended fix

For this scenario it is recommended to evaluate the Snowpark format type options to see if it is possible to change it according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.csv("some_path")
Scenario 3

Input

This scenario adds a sep option, which is supported and uses the JSON method.

  • Note: this scenario also applies for PARQUET.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])

df.write.option("sep", ",").json("some_path")

Output

The tool adds the EWI SPRKPY1088 indicating that validation may be needed.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").json("some_path")

Recommended fix

The JSON file format does not support the parameter sep, so it is recommended to evaluate the Snowpark format type options to see whether they can be changed according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.json("some_path")

Additional recommendations

  • Since some parameters are not supported, it is recommended to check the table of equivalences and to verify the behavior after the transformation.

  • Equivalences table:

| PySpark option  | Snowflake option             | Supported file formats | Description |
|-----------------|------------------------------|------------------------|-------------|
| SEP             | FIELD_DELIMITER              | CSV                    | One or more single-byte or multibyte characters that separate fields in the input file. |
| LINESEP         | RECORD_DELIMITER             | CSV                    | One or more characters that separate records in the input file. |
| QUOTE           | FIELD_OPTIONALLY_ENCLOSED_BY | CSV                    | Character used to enclose strings. |
| NULLVALUE       | NULL_IF                      | CSV                    | String used to convert to and from SQL NULL. |
| DATEFORMAT      | DATE_FORMAT                  | CSV                    | String that defines the format of date values in the data files to be loaded. |
| TIMESTAMPFORMAT | TIMESTAMP_FORMAT             | CSV                    | String that defines the format of timestamp values in the data files to be loaded. |

If the parameter used is not in the list, the API throws an error.

SPRKPY1089

Message: The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.

Category: Warning

Description

The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so validation might be needed to ensure that the behavior is correct.

Scenarios

There are several scenarios, depending on whether the options are supported and on the format used to write the file.

Scenario 1

Input

Below is an example of the usage of the options method, adding the options sep and nullValue, which are currently supported.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])

df.write.options(nullValue="myVal", sep=",").csv("some_path")

Output

The tool adds the EWI SPRKPY1089 indicating that validation may be required.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").csv("some_path")

Recommended fix

The Snowpark API supports these parameters, so the only action is to check the behavior after the migration. Please refer to the equivalences table for the supported parameters.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").csv("some_path")
Scenario 2

Input

Here the scenario shows the usage of options, but adds a header option, which is not supported.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])

df.write.options(header=True, sep=",").csv("some_path")

Output

The tool adds the EWI SPRKPY1089 indicating that validation may be needed.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(header=True, sep=",").csv("some_path")

Recommended fix

For this scenario it is recommended to evaluate the Snowpark format type options to see if it is possible to change it according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.csv("some_path")
Scenario 3

Input

This scenario adds a sep option, which is supported, and uses the JSON method.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])

df.write.options(nullValue="myVal", sep=",").json("some_path")

Output

The tool adds the EWI SPRKPY1089 indicating that validation may be needed.

  • Note: this scenario also applies for PARQUET.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").json("some_path")

Recommended fix

The file format JSON does not support the parameter sep, so it is recommended to evaluate the snowpark format type options to see if it is possible to change it according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.json("some_path")

Additional recommendations

  • Since some parameters are not supported, it is recommended to check the table of equivalences and to verify the behavior after the transformation.

  • Equivalences table:

Snowpark supports the following list of equivalences for some parameters:

| PySpark option  | Snowflake option             | Supported file formats | Description |
|-----------------|------------------------------|------------------------|-------------|
| SEP             | FIELD_DELIMITER              | CSV                    | One or more single-byte or multibyte characters that separate fields in the input file. |
| LINESEP         | RECORD_DELIMITER             | CSV                    | One or more characters that separate records in the input file. |
| QUOTE           | FIELD_OPTIONALLY_ENCLOSED_BY | CSV                    | Character used to enclose strings. |
| NULLVALUE       | NULL_IF                      | CSV                    | String used to convert to and from SQL NULL. |
| DATEFORMAT      | DATE_FORMAT                  | CSV                    | String that defines the format of date values in the data files to be loaded. |
| TIMESTAMPFORMAT | TIMESTAMP_FORMAT             | CSV                    | String that defines the format of timestamp values in the data files to be loaded. |

If the parameter used is not in the list, the API throws an error.

SPRKPY1101

Category

Parsing error.

Description

When the tool recognizes a parsing error, it tries to recover from it and continues the process on the next line. In those cases, the line with the error is shown as a comment together with the error message.

This example shows how a mismatch between spaces and tabs is handled.

Input code

def foo():
    x = 5 # Spaces
     y = 6 # Tab

def foo2():
    x=6
    y=7

Output code

def foo():
    x = 5 # Spaces
## EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(3, 2). Last valid token was '5' @(2, 9), failed token 'y' @(3, 2)
## y = 6 # Tab

def foo2():
    x=6
    y=7

Recommendations

  • Try to fix the commented line.

  • For more support, email us at sma-support@snowflake.com. If you have a support contract with Snowflake, reach out to your sales engineer, who can direct your support needs.