Snowpark Migration Accelerator: Problemcodes für Python

SPRKPY1000

Meldung: Die Spark-Core-Version des Quellprojekts ist xx.xx:xx.x.x, die von Snowpark unterstützte Spark-Core-Version ist 2.12:3.1.2, so dass es funktionale Unterschiede zwischen den vorhandenen Zuordnungen geben kann.

Kategorie: Warnung.

Beschreibung

Dieses Problem tritt auf, wenn die Pyspark-Version Ihres Quellcodes nicht unterstützt wird. Das bedeutet, dass es funktionale Unterschiede zwischen den bestehenden Zuordnungen geben kann.

Zusätzliche Empfehlungen

  • Die pyspark-Version, die von SMA auf Kompatibilität mit Snowpark geprüft wird, reicht von 2.12 bis 3.1.2. Wenn Sie eine Version außerhalb dieses Bereichs verwenden, kann das Tool inkonsistente Ergebnisse liefern. Sie können die Version des Quellcodes, den Sie scannen, ändern.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1001

Message**:** This code section has parsing errors

Category**:** Parsing error.

Beschreibung

Ein Parsing-Fehler wird vom Snowpark Migration Accelerator (SMA) gemeldet, wenn er den Code in einer Datei nicht richtig lesen oder verstehen kann (er kann die Datei nicht richtig „parsen“). Dieser Problemcode wird angezeigt, wenn eine Datei einen oder mehrere Parsing-Fehler aufweist.

Szenario

Eingabe: Die EWI-Meldung wird angezeigt, wenn der Code eine ungültige Syntax hat, zum Beispiel:

def foo():
    x = %%%%%%1###1

Ausgabe: SMA Finden Sie einen Parsing-Fehler und kommentieren Sie den Parsing-Fehler, indem Sie die entsprechende EWI-Meldung hinzufügen:

def foo():
    x
## EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(2, 7). Last valid token was 'x' @(2, 5), failed token '=' @(2, 7)
##      = %%%%%%1###1

Zusätzliche Empfehlungen

  • Check that the file contains valid Python code. (You can use the issues.csv file to find all files with this EWI code to determine which file(s) were not processed by the tool due to parsing error(s).) Many parsing errors occur because only part of the code is input into the tool, so it’s bets to ensure that the code will run in the source. If it is valid, report that you encountered a parsing error using the Report an Issue option in the SMA. Include the line of code that was causing a parsing error in the description when you file this issue.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1002

Message**:** < element > is not supported,Spark element is not supported.

Category**:** Conversion error.

Beschreibung

Dieses Problem tritt auf, wenn das Tool die Verwendung eines Elements erkennt, das in Snowpark nicht unterstützt wird und für das kein eigener Fehlercode existiert. Dies ist der allgemeine Fehlercode, der von SMA für ein nicht unterstütztes Element verwendet wird.

Zusätzliche Empfehlungen

  • Auch wenn die Option oder das Element in der Nachricht nicht unterstützt wird, bedeutet dies nicht, dass keine Lösung gefunden werden kann. Es bedeutet nur, dass das Tool selbst keine Lösung finden kann.

  • Wenn Sie auf ein nicht unterstütztes Element aus einer pyspark.ml Bibliothek gestoßen sind, sollten Sie eine Alternative in Erwägung ziehen. Es gibt weitere Leitfäden, die sich mit Problemen im Zusammenhang mit ml befassen, wie z. B. dieser von Snowflake.

  • Prüfen Sie, ob der Quellcode die richtige Syntax hat. (Sie können die Datei issues.csv verwenden, um festzustellen, wo der/die Konvertierungsfehler auftritt/auftreten) Wenn die Syntax korrekt ist, melden Sie einen Konvertierungsfehler für ein bestimmtes Element mit der Option „Problem melden“ auf SMA. Geben Sie die Codezeile, die den Fehler verursacht hat, in der Beschreibung an, wenn Sie dieses Problem melden.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1003

Message**:** An error occurred when loading the symbol table.

Category**:** Conversion error.

Beschreibung

Dieses Problem tritt auf, wenn ein Fehler bei der Verarbeitung der Symbole in der Symboltabelle auftritt. Die Symboltabelle ist Teil der zugrunde liegenden Architektur von SMA und ermöglicht komplexere Konvertierungen. Dieser Fehler könnte auf eine unerwartete Anweisung im Quellcode zurückzuführen sein.

Zusätzliche Empfehlungen

  • This is unlikely to be an error in the source code itself, but rather is an error in how the tool processes the source code. The best resolution would be to post an issue in the SMA.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1004

Message**:** The symbol table could not be loaded.

Category**:** Parsing error.

Beschreibung

Dieses Problem tritt auf, wenn ein unerwarteter Fehler bei der Ausführung des Tools auftritt. Da die Symboltabelle nicht geladen werden kann, kann das Tool den Bewertungs- oder Konvertierungsprozess nicht starten.

Zusätzliche Empfehlungen

SPRKPY1005

Warnung

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message**:** pyspark.conf.SparkConf is not required

Category**:** Warning.

Beschreibung

This issue appears when the tool detects the usage of pyspark.conf.SparkConf which is not required.

Szenario

Eingabe

SparkConf kann ohne Parameter oder mit loadDefaults aufgerufen werden.

from pyspark import SparkConf

my_conf = SparkConf(loadDefaults=True)

Ausgabe

For both cases (with or without parameters) SMA creates a Snowpark Session.builder object:

#EWI: SPRKPY1005 => pyspark.conf.SparkConf is not required
#from pyspark import SparkConf
pass

#EWI: SPRKPY1005 => pyspark.conf.SparkConf is not required
my_conf = Session.builder.configs({"user" : "my_user", "password" : "my_password", "account" : "my_account", "role" : "my_role", "warehouse" : "my_warehouse", "database" : "my_database", "schema" : "my_schema"}).create()

Zusätzliche Empfehlungen

  • Hier wird ein unnötiger Parameter entfernt und ein Warnkommentar eingefügt. Der Benutzer muss nichts weiter tun.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1006

Warnung

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message**:** pyspark.context.SparkContext is not required

Category**:** Warning.

Beschreibung

This issue appears when the tool detects the usage of pyspark.context.SparkContext, which is not required in Snowflake.

Szenario

Eingabe

In diesem Beispiel gibt es zwei Kontexte, eine Verbindung zu einem Spark-Cluster herzustellen

from pyspark import SparkContext

sql_context1 = SparkContext(my_sc1)
sql_context2 = SparkContext(sparkContext=my_sc2)

Ausgabe

Da es auf Snowflake keine Cluster gibt, ist der Kontext nicht erforderlich. Beachten Sie, dass die Variablen my_sc1 und my_sc2, die Spark-Eigenschaften enthalten, möglicherweise nicht erforderlich sind oder angepasst werden müssen, um den Code zu korrigieren.

from snowflake.snowpark import Session
#EWI: SPRKPY1006 => pyspark.sql.context.SparkContext is not required
sql_context1 = my_sc1
#EWI: SPRKPY1006 => pyspark.sql.context.SparkContext is not required

sql_context2 = my_sc2

Zusätzliche Empfehlungen

  • Hier wird ein unnötiger Parameter entfernt und ein Warnkommentar eingefügt. Der Benutzer muss nichts unternehmen.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1007

Warnung

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Message**:** pyspark.sql.context.SQLContext is not required

Category**:** Warning.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.context.SQLContext, which is not required.

Szenario

Eingabe

Hier haben wir ein Beispiel mit verschiedenen SparkContext-Überlastungen.

from pyspark import SQLContext

my_sc1 = SQLContext(myMaster, myAppName, mySparkHome, myPyFiles, myEnvironment, myBatctSize, mySerializer, my_conf1)
my_sc2 = SQLContext(conf=my_conf2)
my_sc3 = SQLContext()

Ausgabe

Der Ausgabecode hat die Zeile für pyspark.SQLContext auskommentiert und ersetzt die Szenarien durch eine Referenz auf eine Konfiguration. Beachten Sie, dass die Variablen my_sc1 und my_sc2, die Spark-Eigenschaften enthalten, möglicherweise nicht benötigt werden oder angepasst werden müssen, um den Code zu korrigieren.

#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
#from pyspark import SQLContext
pass

#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
sql_context1 = my_sc1
#EWI: SPRKPY1007 => pyspark.sql.context.SQLContext is not required
sql_context2 = my_sc2

Zusätzliche Empfehlungen

  • Dies ist ein unnötiger Parameter und wird mit einem Warnkommentar im Quellcode entfernt. Der Benutzer muss nichts unternehmen.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1008

Meldung: pyspark.sql.context.HiveContext ist nicht erforderlich

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.context.HiveContext, which is not required.

Szenario

Eingabe

In diesem Beispiel wird eine Verbindung zu einem Hive-Speicher erstellt.

from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
df = hive_context.table("myTable")
df.show()

Ausgabe

In Snowflake there are not Hive stores, so the Hive Context is not required, You can still use parquet files on Snowflake please check this tutorial to learn how.

#EWI: SPRKPY1008 => pyspark.sql.context.HiveContext is not required
hive_context = sc
df = hive_context.table("myTable")
df.show()

the sc variable refers to a Snow Park Session Object

Empfohlene Korrektur

For the output code in the example you should add the Snow Park Session Object similar to this code:

## Here manually we can add the Snowpark Session object via a json config file called connection.json
import json
from snowflake.snowpark import Session
jsonFile = open("connection.json")
connection_parameter = json.load(jsonFile)
jsonFile.close()
sc = Session.builder.configs(connection_parameter).getOrCreate()

hive_context = sc
df = hive_context.table("myTable")
df.show()

Zusätzliche Empfehlungen

SPRKPY1009

Message**:** pyspark.sql.dataframe.DataFrame.approxQuantile has a workaround

Category**:** Warning.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrame.approxQuantile which has a workaround.

Szenario

Eingabe

It’s important understand that Pyspark uses two different approxQuantile functions, here we use the DataFrame approxQuantile version

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [['Sun', 10],
        ['Mon', 64],
        ['Thr', 12],
        ['Wen', 15],
        ['Thu', 68],
        ['Fri', 14],
        ['Sat', 13]]

columns = ['Day', 'Ammount']
df = spark.createDataFrame(data, columns)
df.approxQuantile('Ammount', [0.25, 0.5, 0.75], 0)

Ausgabe

SMA gibt die EWI SPRKPY1009 über die Zeile zurück, in der approxQuantile verwendet wird, so dass Sie damit feststellen können, wo Sie korrigieren müssen.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Sun', 10],
        ['Mon', 64],
        ['Thr', 12],
        ['Wen', 15],
        ['Thu', 68],
        ['Fri', 14],
        ['Sat', 13]]

columns = ['Day', 'Ammount']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1009 => pyspark.sql.dataframe.DataFrame.approxQuantile has a workaround, see documentation for more info
df.approxQuantile('Ammount', [0.25, 0.5, 0.75], 0)

Empfohlene Korrektur

Use Snowpark approxQuantile method. Some parameters don’t match so they require some manual adjustments. for the output code’s example a recommended fix could be:

from snowflake.snowpark import Session
...
df = spark.createDataFrame(data, columns)

df.stat.approx_quantile('Ammount', [0.25, 0.5, 0.75])

pyspark.sql.dataframe.DataFrame.approxQuantile’s relativeError-Parameter existiert nicht in SnowPark.

Zusätzliche Empfehlungen

SPRKPY1010

Meldung: pyspark.sql.dataframe.DataFrame.checkpoint hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrame.checkpoint which has a workaround.

Szenario

Eingabe

In PySpark werden Checkpoints dazu verwendet, den logischen Plan eines Datenframes abzuschneiden, um das Anwachsen eines logischen Plans zu vermeiden.

import tempfile
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
with tempfile.TemporaryDirectory() as d:
    spark.sparkContext.setCheckpointDir("/tmp/bb")
    df.checkpoint(False)

Ausgabe

SMA returns the EWI SPRKPY1010 over the line where approxQuantile is used, so you can use to identify where to fix. Note that also marks the setCheckpointDir as unsupported, but a checpointed directory is not required for the fix.

import tempfile
from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
with tempfile.TemporaryDirectory() as d:
    #EWI: SPRKPY1002 => pyspark.context.SparkContext.setCheckpointDir is not supported
    spark.setCheckpointDir("/tmp/bb")
    #EWI: SPRKPY1010 => pyspark.sql.dataframe.DataFrame.checkpoint has a workaround, see documentation for more info
    df.checkpoint(False)

Empfohlene Korrektur

Snowpark macht explizite Checkpoints überflüssig: Dies liegt daran, dass Snowpark mit SQL-basierten Operationen arbeitet, die von der Snowflake Abfrageoptimierungs-Engine optimiert werden, so dass keine unerfüllten Berechnungen oder außer Kontrolle geratene logische Pläne erforderlich sind.

However there could be scenarios where you would require persist the result of a computation on a dataframe. In this scenarios you can save materialize the results by writing the dataframe on a Snowflake Table or in a Snowflake Temporary Table.

  • Durch die Verwendung einer permanenten Tabelle oder des berechneten Ergebnisses kann jederzeit, auch nach dem Ende der Sitzung, darauf zugegriffen werden.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
df.write.save_as_table("my_table", table_type="temporary") # Save the dataframe into Snowflake table "my_table".
df2 = Session.table("my_table") # Now I can access the stored result quering the table "my_table"
  • Eine alternative Lösung, die Verwendung einer temporären Tabelle, hat den Vorteil, dass die Tabelle nach Beendigung der Sitzung gelöscht wird:

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Score']
df = spark.createDataFrame(data, columns)
df.write.save_as_table("my_temp_table", table_type="temporary") # Save the dataframe into Snowflake table "my_temp_table".
df2 = Session.table("my_temp_table") # Now I can access the stored result quering the table "my_temp_table"

Zusätzliche Empfehlungen

SPRKPY1011

Meldung: pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile which has a workaround.

Szenario

Eingabe

It’s important understand that Pyspark uses two different approxQuantile functions, here we use the DataFrameStatFunctions approxQuantile version.

import tempfile
from pyspark.sql import SparkSession, DataFrameStatFunctions
spark = SparkSession.builder.getOrCreate()
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Gain']
df = spark.createDataFrame(data, columns)
aprox_quantille = DataFrameStatFunctions(df).approxQuantile('Gain', [0.25, 0.5, 0.75], 0)
print(aprox_quantille)

Ausgabe

SMA gibt die EWI SPRKPY1011 über die Zeile zurück, in der approxQuantile verwendet wird, so dass Sie damit feststellen können, wo Sie korrigieren müssen.

import tempfile
from snowflake.snowpark import Session, DataFrameStatFunctions
spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 300000],
        ['Q2', 60000],
        ['Q3', 500002],
        ['Q4', 130000]]

columns = ['Quarter', 'Gain']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1011 => pyspark.sql.dataframe.DataFrameStatFunctions.approxQuantile has a workaround, see documentation for more info
aprox_quantille = DataFrameStatFunctions(df).approxQuantile('Gain', [0.25, 0.5, 0.75], 0)

Empfohlene Korrektur

You can use Snowpark approxQuantile method. Some parameters don’t match so they require some manual adjustments. for the output code’s example a recommended fix could be:

from snowflake.snowpark import Session # remove DataFrameStatFunctions because is not required
...
df = spark.createDataFrame(data, columns)

aprox_quantille = df.stat.approx_quantile('Ammount', [0.25, 0.5, 0.75])

pyspark.sql.dataframe.DataFrame.approxQuantile’s relativeError-Parameter existiert nicht in SnowPark.

Zusätzliche Empfehlungen

SPRKPY1012

Warnung

Dieser Problemcode ist jetzt veraltet

Meldung: pyspark.sql.dataframe.DataFrameStatFunctions.writeTo hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.dataframe.DataFrameStatFunctions.writeTo which has a workaround.

Szenario

Eingabe

In diesem Beispiel wird der Datenframe df in eine Spark-Tabelle „table“ geschrieben.

writer = df.writeTo("table")

Ausgabe

SMA gibt die EWI SPRKPY1012 über die Zeile zurück, in der DataFrameStatFunctions.writeTo verwendet wird, so dass Sie damit feststellen können, wo Sie korrigieren müssen.

#EWI: SPRKPY1012 => pyspark.sql.dataframe.DataFrameStatFunctions.writeTo has a workaround, see documentation for more info
writer = df.writeTo("table")

Empfohlene Korrektur

Verwenden Sie stattdessen df.write.SaveAsTable().

import df.write as wt
writer = df.write.save_as_table(table)

Zusätzliche Empfehlungen

SPRKPY1013

Meldung: pyspark.sql.functions.acosh hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.functions.acosh which has a workaround.

Szenario

Eingabe

On this example pyspark calculates the acosh for a dataframe by using pyspark.sql.functions.acosh

from pyspark.sql import SparkSession
from pyspark.sql.functions import acosh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 30],
        ['V2', 60],
        ['V3', 50],
        ['V4', 13]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_with_acosh = df.withColumn("acosh_value", acosh(df["value"]))

Ausgabe

SMA gibt die EWI SPRKPY1013 über die Zeile zurück, in der acosh verwendet wird, so dass Sie damit feststellen können, wo Sie korrigieren müssen.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 30],
        ['V2', 60],
        ['V3', 50],
        ['V4', 13]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1013 => pyspark.sql.functions.acosh has a workaround, see documentation for more info
df_with_acosh = df.withColumn("acosh_value", acosh(df["value"]))

Empfohlene Korrektur

There is no direct „acosh“ implementation but „call_function“ can be used instead, using „acosh“ as the first parameter, and colName as the second one.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 30],
        ['V2', 60],
        ['V3', 50],
        ['V4', 13]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_with_acosh = df.select(call_function('ACOSH', col('value')))

Zusätzliche Empfehlungen

SPRKPY1014

Meldung: pyspark.sql.functions.asinh hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.functions.asinh which has a workaround.

Szenario

Eingabe

On this example pyspark calculates the asinh for a dataframe by using pyspark.sql.functions.asinh.

from pyspark.sql import SparkSession
from pyspark.sql.functions import asinh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 3.0],
        ['V2', 60.0],
        ['V3', 14.0],
        ['V4', 3.1]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.withColumn("asinh_value", asinh(df["value"]))

Ausgabe

SMA gibt die EWI SPRKPY1014 über die Zeile zurück, in der asinh verwendet wird, so dass Sie damit feststellen können, wo Sie korrigieren müssen.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 3.0],
        ['V2', 60.0],
        ['V3', 14.0],
        ['V4', 3.1]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1014 => pyspark.sql.functions.asinh has a workaround, see documentation for more info
df_result = df.withColumn("asinh_value", asinh(df["value"]))

Empfohlene Korrektur

There is no direct „asinh“ implementation but „call_function“ can be used instead, using „asinh“ as the first parameter, and colName as the second one.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 3.0],
        ['V2', 60.0],
        ['V3', 14.0],
        ['V4', 3.1]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.select(call_function('asinh', col('value')))

Zusätzliche Empfehlungen

SPRKPY1015

Meldung: pyspark.sql.functions.atanh hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.functions.atanh which has a workaround.

Szenario

Eingabe

On this example pyspark calculates the atanh for a dataframe by using pyspark.sql.functions.atanh.

from pyspark.sql import SparkSession
from pyspark.sql.functions import atanh
spark = SparkSession.builder.getOrCreate()
data = [['V1', 0.14],
        ['V2', 0.32],
        ['V3', 0.4],
        ['V4', -0.36]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.withColumn("atanh_value", atanh(df["value"]))

Ausgabe

SMA gibt die EWI SPRKPY1015 über die Zeile zurück, in der atanh verwendet wird, so dass Sie damit feststellen können, wo Sie korrigieren müssen.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 0.14],
        ['V2', 0.32],
        ['V3', 0.4],
        ['V4', -0.36]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1015 => pyspark.sql.functions.atanh has a workaround, see documentation for more info
df_result = df.withColumn("atanh_value", atanh(df["value"]))

Empfohlene Korrektur

There is no direct „atanh“ implementation but „call_function“ can be used instead, using „atanh“ as the first parameter, and colName as the second one.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import call_function, col

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['V1', 0.14],
        ['V2', 0.32],
        ['V3', 0.4],
        ['V4', -0.36]]

columns = ['Paremeter', 'value']
df = spark.createDataFrame(data, columns)
df_result = df.select(call_function('atanh', col('value')))

Zusätzliche Empfehlungen

SPRKPY1016

Warnung

This issue code has been deprecated since Spark Conversion Core Version 0.11.7

Meldung: pyspark.sql.functions.collect_set hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.functions.collect_set which has a workaround.

Szenario

Eingabe

Using collect*set to get the elements of _colname* without duplicates:

col = collect_set(colName)

Ausgabe

SMA gibt die EWI SPRKPY1016 über die Zeile zurück, in der collect_set verwendet wird, so dass Sie damit feststellen können, wo Sie korrigieren müssen.

#EWI: SPRKPY1016 => pyspark.sql.functions.collect_set has a workaround, see documentation for more info
col = collect_set(colName)

Empfohlene Korrektur

Verwenden Sie die array_agg-Funktion, und fügen Sie ein zweites Argument mit dem True-Wert hinzu.

col = array_agg(col, True)

Zusätzliche Empfehlung

SPRKPY1017

Warnung

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

pyspark.sql.functions.date_add bietet eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.functions.date_add which has a workaround.

Szenario

Eingabe

In diesem Beispiel verwenden wir date_add, um das Datum 5 Tage nach dem aktuellen Datum für den Datenframe df zu berechnen.

col = df.select(date_add(df.colName, 5))

Ausgabe

SMA gibt die EWI SPRKPY1017 über die Zeile zurück, in der date_add verwendet wird, so dass Sie damit feststellen können, wo Sie korrigieren müssen.

#EWI: SPRKPY1017 => pyspark.sql.functions.date_add has a workaround, see documentation for more info
col = df.select(date_add(df.colName, 5))

Empfohlene Korrektur

Import snowflake.snowpark.functions, which contains an implementation for date_add (and alias dateAdd) function.

from snowflake.snowpark.functions import date_add

col = df.select(date_add(df.dt, 1))

Zusätzliche Empfehlung

SPRKPY1018

Warnung

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Meldung: pyspark.sql.functions.date_sub bietet eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.functions.date_sub which has a workaround.

Szenario

Eingabe

In diesem Beispiel verwenden wir date_add, um das Datum 5 Tage vor dem aktuellen Datum für den Datenframe df zu berechnen.

col = df.select(date_sub(df.colName, 5))

Ausgabe

SMA gibt die EWI SPRKPY1018 über die Zeile zurück, in der date_sub verwendet wird, so dass Sie damit feststellen können, wo Sie korrigieren müssen.

#EWI: SPRKPY1018 => pyspark.sql.functions.date_sub has a workaround, see documentation for more info
col = df.select(date_sub(df.colName, 5))

Empfohlene Korrektur

Import snowflake.snowpark.functions, which contains an implementation for date_sub function.

from pyspark.sql.functions import date_sub
df.withColumn("date", date_sub(df.colName, 5))

Zusätzliche Empfehlung

SPRKPY1019

Warnung

This issue code has been deprecated since Spark Conversion Core Version 4.8.0

Meldung: pyspark.sql.functions.datediff hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.functions.datediff which has a workaround.

Szenario

Eingabe

In diesem Beispiel verwenden wir datediff, um die Tagesdifferenz von „heute“ und anderen Daten zu berechnen.

contacts = (contacts
            #days since last event
            .withColumn('daysSinceLastEvent', datediff(lit(today),'lastEvent'))
            #days since deployment
            .withColumn('daysSinceLastDeployment', datediff(lit(today),'lastDeploymentEnd'))
            #days since online training
            .withColumn('daysSinceLastTraining', datediff(lit(today),'lastTraining'))
            #days since last RC login
            .withColumn('daysSinceLastRollCallLogin', datediff(lit(today),'adx_identity_lastsuccessfullogin'))
            #days since last EMS login
            .withColumn('daysSinceLastEMSLogin', datediff(lit(today),'vms_lastuserlogin'))
           )

Ausgabe

SMA gibt die EWI SPRKPY1019 über die Zeile zurück, in der datediff verwendet wird, so dass Sie damit feststellen können, wo Sie korrigieren müssen.

from pyspark.sql.functions import datediff
#EWI: SPRKPY1019 => pyspark.sql.functions.datediff has a workaround, see documentation for more info
contacts = (contacts
            #days since last event
            .withColumn('daysSinceLastEvent', datediff(lit(today),'lastEvent'))
            #days since deployment
            .withColumn('daysSinceLastDeployment', datediff(lit(today),'lastDeploymentEnd'))
            #days since online training
            .withColumn('daysSinceLastTraining', datediff(lit(today),'lastTraining'))
            #days since last RC login
            .withColumn('daysSinceLastRollCallLogin', datediff(lit(today),'adx_identity_lastsuccessfullogin'))
            #days since last EMS login
            .withColumn('daysSinceLastEMSLogin', datediff(lit(today),'vms_lastuserlogin'))
           )

SMA convert pyspark.sql.functions.datediff onto snowflake.snowpark.functions.daydiff that also calculates the diference in days between two dates.

Empfohlene Korrektur

datediff(part: string ,end: ColumnOrName, start: ColumnOrName)

Action: Import snowflake.snowpark.functions, which contains an implementation for datediff function that requires an extra parameter for date time part and allows more versatility on calculate differences between dates.

from snowflake.snowpark import Session
from snowflake.snowpark.functions import datediff
contacts = (contacts
            #days since last event
            .withColumn('daysSinceLastEvent', datediff('day', lit(today),'lastEvent'))
            #days since deployment
            .withColumn('daysSinceLastDeployment', datediff('day',lit(today),'lastDeploymentEnd'))
            #days since online training
            .withColumn('daysSinceLastTraining', datediff('day', lit(today),'lastTraining'))
            #days since last RC login
            .withColumn('daysSinceLastRollCallLogin', datediff('day', lit(today),'adx_identity_lastsuccessfullogin'))
            #days since last EMS login
            .withColumn('daysSinceLastEMSLogin', datediff('day', lit(today),'vms_lastuserlogin'))
           )

Empfehlung

SPRKPY1020

Meldung: pyspark.sql.functions.instr hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.functions.instr which has a workaround.

Szenario

Eingabe

Hier ist ein einfaches Beispiel für die Verwendung von pyspark instr:

from pyspark.sql import SparkSession
from pyspark.sql.functions import instr
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('abcd',)], ['test',])
df.select(instr(df.test, 'cd').alias('result')).collect()

Ausgabe:

SMA gibt die EWI SPRKPY1020 über die Zeile zurück, in der instr verwendet wird, so dass Sie damit feststellen können, wo Sie korrigieren müssen.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
df = spark.createDataFrame([('abcd',)], ['test',])
#EWI: SPRKPY1020 => pyspark.sql.functions.instr has a workaround, see documentation for more info
df.select(instr(df.test, 'cd').alias('result')).collect()

Empfohlene Korrektur

Requires a manual change by using the function charindex and changing the order of the first two parameters.

import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import charindex, lit

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
df = spark.createDataFrame([('abcd',)], ['test',])
df.select(charindex(lit('cd'), df.test).as_('result')).show()

Zusätzliche Empfehlung

SPRKPY1021

Warnung

Dieser Problemcode ist jetzt veraltet

Meldung: pyspark.sql.functions.last hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.functions.last function, which has a workaround.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.functions.last function that generates this EWI. In this example, the last function is used to get the last value for each name.

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
df_grouped = df.groupBy("name").agg(last("value").alias("last_value"))

Ausgabe

The SMA adds the EWI SPRKPY1021 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
#EWI: SPRKPY1021 => pyspark.sql.functions.last has a workaround, see documentation for more info
df_grouped = df.groupBy("name").agg(last("value").alias("last_value"))

Empfohlene Korrektur

As a workaround, you can use the Snowflake LAST_VALUE function. To invoke this function from Snowpark, use the snowflake.snowpark.functions.call_builtin function and pass the string last_value as the first argument and the corresponding column as the second argument. If you were using the name of the column in the last function, you should convert it into a column when calling the call_builtin function.

df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Alice", 4), ("Bob", 5)], ["name", "value"])
df_grouped = df.groupBy("name").agg(call_builtin("last_value", col("value")).alias("last_value"))

Zusätzliche Empfehlungen


description: >- The mode parameter in the methods of CSV, JSON and PARQUET is transformed to overwrite


SPRKPY1022

Meldung: pyspark.sql.functions.log10 hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.functions.log10 function, which has a workaround.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.functions.log10 function that generates this EWI. In this example, the log10 function is used to calculate the base-10 logarithm of the value column.

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
df_with_log10 = df.withColumn("log10_value", log10(df["value"]))

Ausgabe

The SMA adds the EWI SPRKPY1022 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
#EWI: SPRKPY1022 => pyspark.sql.functions.log10 has a workaround, see documentation for more info
df_with_log10 = df.withColumn("log10_value", log10(df["value"]))

Empfohlene Korrektur

As a workaround, you can use the snowflake.snowpark.functions.log function by passing the literal value 10 as the base.

df = spark.createDataFrame([(1,), (10,), (100,), (1000,), (10000,)], ["value"])
df_with_log10 = df.withColumn("log10_value", log(10, df["value"]))

Zusätzliche Empfehlungen

SPRKPY1023

Meldung: pyspark.sql.functions.log1p hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.functions.log1p function, which has a workaround.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.functions.log1p function that generates this EWI. In this example, the log1p function is used to calculate the natural logarithm of the value column.

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
df_with_log1p = df.withColumn("log1p_value", log1p(df["value"]))

Ausgabe

The SMA adds the EWI SPRKPY1023 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
#EWI: SPRKPY1023 => pyspark.sql.functions.log1p has a workaround, see documentation for more info
df_with_log1p = df.withColumn("log1p_value", log1p(df["value"]))

Empfohlene Korrektur

As a workaround, you can use the call_function function by passing the string ln as the first argument and by adding 1 to the second argument.

df = spark.createDataFrame([(0,), (1,), (10,), (100,)], ["value"])
df_with_log1p = df.withColumn("log1p_value", call_function("ln", lit(1) + df["value"]))

Zusätzliche Empfehlungen

SPRKPY1024

Meldung: pyspark.sql.functions.log2 hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.functions.log2 function, which has a workaround.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.functions.log2 function that generates this EWI. In this example, the log2 function is used to calculate the base-2 logarithm of the value column.

df = spark.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
df_with_log2 = df.withColumn("log2_value", log2(df["value"]))

Ausgabe

The SMA adds the EWI SPRKPY1024 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
#EWI: SPRKPY1024 => pyspark.sql.functions.log2 has a workaround, see documentation for more info
df_with_log2 = df.withColumn("log2_value", log2(df["value"]))

Empfohlene Korrektur

As a workaround, you can use the snowflake.snowpark.functions.log function by passing the literal value 2 as the base.

df = session.createDataFrame([(1,), (2,), (4,), (8,), (16,)], ["value"])
df_with_log2 = df.withColumn("log2_value", log(2, df["value"]))

Zusätzliche Empfehlungen

SPRKPY1025

Warnung

Dieser Problemcode ist jetzt veraltet

Meldung: pyspark.sql.functions.ntile hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.functions.ntile function, which has a workaround.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.functions.ntile function that generates this EWI. In this example, the ntile function is used to divide the rows into 3 buckets.

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
df_with_ntile = df.withColumn("bucket", ntile(3).over(windowSpec))

Ausgabe

The SMA adds the EWI SPRKPY1025 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
#EWI: SPRKPY1025 => pyspark.sql.functions.ntile has a workaround, see documentation for more info
df_with_ntile = df.withColumn("bucket", ntile(3).over(windowSpec))

Empfohlene Korrektur

Snowpark has an equivalent ntile function, however, the argument pass to it should be a column. As a workaround, you can convert the literal argument into a column using the snowflake.snowpark.functions.lit function.

df = spark.createDataFrame([("Alice", 50), ("Bob", 30), ("Charlie", 60), ("David", 90), ("Eve", 70), ("Frank", 40)], ["name", "score"])
windowSpec = Window.orderBy("score")
df_with_ntile = df.withColumn("bucket", ntile(lit(3)).over(windowSpec))

Zusätzliche Empfehlungen

SPRKPY1026

Warnung

This issue code has been deprecated since Spark Conversion Core 4.3.2

Meldung: pyspark.sql.readwriter.DataFrameReader. csv hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.csv function, which has a workaround.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.csv function that generates this EWI. In this example, the csv function is used to read multiple .csv files with a given schema and uses some extra options such as encoding, header and sep to fine-tune the behavior of reading the files.

file_paths = [
  "path/to/your/file1.csv",
  "path/to/your/file2.csv",
  "path/to/your/file3.csv",
]

df = session.read.csv(
  file_paths,
  schema=my_schema,
  encoding="UTF-8",
  header=True,
  sep=","
)

Ausgabe

The SMA adds the EWI SPRKPY1026 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

file_paths = [
  "path/to/your/file1.csv",
  "path/to/your/file2.csv",
  "path/to/your/file3.csv",
]

#EWI: SPRKPY1026 => pyspark.sql.readwriter.DataFrameReader.csv has a workaround, see documentation for more info
df = session.read.csv(
  file_paths,
  schema=my_schema,
  encoding="UTF-8",
  header=True,
  sep=","
)

Empfohlene Korrektur

In this section, we explain how to configure the path parameter, the schema parameter and some options to make them work in Snowpark.

1. Pfad Parameter

Snowpark requires the path parameter to be a stage location so, as a workaround, you can create a temporary stage and add each .csv file to that stage using the prefix file://.

2. Schema Parameter

Snowpark does not allow defining the schema as a parameter of the csv function. As a workaround, you can use the snowflake.snowpark.DataFrameReader.schema function.

3. Optionen Parameter

Snowpark does not allow defining the extra options as parameters of the csv function. As a workaround, for many of them you can use the snowflake.snowpark.DataFrameReader.option function to specify those parameters as options of the DataFrameReader.

Bemerkung

Die folgenden Optionen werden von Snowpark nicht unterstützt:

  • columnNameOfCorruptRecord

  • emptyValue

  • enforceSchema

  • Header

  • ignoreLeadingWhiteSpace

  • ignoreTrailingWhiteSpace

  • inferSchema

  • locale

  • maxCharsPerColumn

  • maxColumns

  • Modus (mode)

  • mehrzeilig

  • nanValue

  • negativInf

  • nullValue

  • positivInf

  • quoteAll

  • samplingRatio

  • timestampNTZFormat

  • unescapedQuoteHandling

Nachfolgend sehen Sie ein vollständiges Beispiel dafür, wie der Eingabecode nach Anwendung der oben genannten Vorschläge aussehen sollte, damit er in Snowpark funktioniert:

stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')

session.file.put(f"file:///path/to/your/file1.csv", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.csv", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.csv", f"@{stage}")

df = session.read.schema(my_schema).option("encoding", "UTF-8").option("sep", ",").csv(stage)

Zusätzliche Empfehlungen

SPRKPY1027

Warnung

This issue code has been deprecated since Spark Conversion Core 4.5.2

Meldung: pyspark.sql.readwriter.DataFrameReader. json hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.json function, which has a workaround.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.json function that generates this EWI. In this example, the json function is used to read multiple .json files with a given schema and uses some extra options such as primitiveAsString and dateFormat to fine-tune the behavior of reading the files.

file_paths = [
  "path/to/your/file1.json",
  "path/to/your/file2.json",
  "path/to/your/file3.json",
]

df = session.read.json(
  file_paths,
  schema=my_schema,
  primitiveAsString=True,
  dateFormat="2023-06-20"
)

Ausgabe

The SMA adds the EWI SPRKPY1027 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

file_paths = [
  "path/to/your/file1.json",
  "path/to/your/file2.json",
  "path/to/your/file3.json",
]

#EWI: SPRKPY1027 => pyspark.sql.readwriter.DataFrameReader.json has a workaround, see documentation for more info
df = session.read.json(
  file_paths,
  schema=my_schema,
  primitiveAsString=True,
  dateFormat="2023-06-20"
)

Empfohlene Korrektur

In this section, we explain how to configure the path parameter, the schema parameter and some options to make them work in Snowpark.

1. Pfad Parameter

Snowpark requires the path parameter to be a stage location so, as a workaround, you can create a temporary stage and add each .json file to that stage using the prefix file://.

2. Schema Parameter

Snowpark does not allow defining the schema as a parameter of the json function. As a workaround, you can use the snowflake.snowpark.DataFrameReader.schema function.

3. Optionen Parameter

Snowpark does not allow defining the extra options as parameters of the json function. As a workaround, for many of them you can use the snowflake.snowpark.DataFrameReader.option function to specify those parameters as options of the DataFrameReader.

Bemerkung

Die folgenden Optionen werden von Snowpark nicht unterstützt:

  • allowBackslashEscapingAnyCharacter

  • allowComments

  • allowNonNumericNumbers

  • allowNumericLeadingZero

  • allowSingleQuotes

  • allowUnquotedControlChars

  • allowUnquotedFieldNames

  • columnNameOfCorruptRecord

  • dropFiledIfAllNull

  • Codierung

  • ignoreNullFields

  • lineSep

  • locale

  • Modus (mode)

  • mehrzeilig

  • prefersDecimal

  • primitiveAsString

  • samplingRatio

  • timestampNTZFormat

  • timeZone

Nachfolgend sehen Sie ein vollständiges Beispiel dafür, wie der Eingabecode nach Anwendung der oben genannten Vorschläge aussehen sollte, damit er in Snowpark funktioniert:

stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')

session.file.put(f"file:///path/to/your/file1.json", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.json", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.json", f"@{stage}")

df = session.read.schema(my_schema).option("dateFormat", "2023-06-20").json(stage)

Zusätzliche Empfehlungen

SPRKPY1028

Meldung: pyspark.sql.readwriter.DataFrameReader.orc bietet eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.orc function, which has a workaround.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.orc function that generates this EWI. In this example, the orc function is used to read multiple .orc files and uses some extra options such as mergeSchema and recursiveFileLookup to fine-tune the behavior of reading the files.

file_paths = [
  "path/to/your/file1.orc",
  "path/to/your/file2.orc",
  "path/to/your/file3.orc",
]

df = session.read.orc(
  file_paths,
  mergeSchema="True",
  recursiveFileLookup="True"
)

Ausgabe

The SMA adds the EWI SPRKPY1028 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

file_paths = [
  "path/to/your/file1.orc",
  "path/to/your/file2.orc",
  "path/to/your/file3.orc",
]

#EWI: SPRKPY1028 => pyspark.sql.readwriter.DataFrameReader.orc has a workaround, see documentation for more info
df = session.read.orc(
  file_paths,
  mergeSchema="True",
  recursiveFileLookup="True"
)

Empfohlene Korrektur

In this section, we explain how to configure the path parameter and the extra options to make them work in Snowpark.

1. Pfad Parameter

Snowpark requires the path parameter to be a stage location so, as a workaround, you can create a temporary stage and add each .orc file to that stage using the prefix file://.

2. Optionen Parameter

Snowpark does not allow defining the extra options as parameters of the orc function. As a workaround, for many of them you can use the snowflake.snowpark.DataFrameReader.option function to specify those parameters as options of the DataFrameReader.

Bemerkung

Die folgenden Optionen werden von Snowpark nicht unterstützt:

  • Kompression

  • mergeSchema

Nachfolgend sehen Sie ein vollständiges Beispiel dafür, wie der Eingabecode nach Anwendung der oben genannten Vorschläge aussehen sollte, damit er in Snowpark funktioniert:

stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}')

session.file.put(f"file:///path/to/your/file1.orc", f"@{stage}")
session.file.put(f"file:///path/to/your/file2.orc", f"@{stage}")
session.file.put(f"file:///path/to/your/file3.orc", f"@{stage}")

df = session.read.option(recursiveFileLookup, "True").orc(stage)

Zusätzliche Empfehlungen

SPRKPY1029

Meldung: Dieses Problem tritt auf, wenn das Tool die Verwendung von pyspark.sql.readwriter.DataFrameReader.parquet erkennt Diese Funktion wird unterstützt, aber einige der Unterschiede zwischen Snowpark und Spark API erfordern möglicherweise einige manuelle Änderungen.

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.readwriter.DataFrameReader.parquet function. This function is supported by Snowpark, however, there are some differences that would require some manual changes.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.parquet function that generates this EWI.

file_paths = [
  "path/to/your/file1.parquet",
  "path/to/your/file2.parquet",
  "path/to/your/file3.parquet",
]

df = session.read.parquet(
  *file_paths,
  mergeSchema="true",
  pathGlobFilter="*file*",
  recursiveFileLookup="true",
  modifiedBefore="2024-12-31T00:00:00",
  modifiedAfter="2023-12-31T00:00:00"
)

Ausgabe

The SMA adds the EWI SPRKPY1029 to the output code to let you know that this function is supported by Snowpark, but it requires some manual adjustments. Please note that the options supported by Snowpark are transformed into option function calls and those that are not supported are removed. This is explained in more detail in the next sections.

file_paths = [
  "path/to/your/file1.parquet",
  "path/to/your/file2.parquet",
  "path/to/your/file3.parquet"
]

#EWI: SPRKPY1076 => Some of the included parameters are not supported in the parquet function, the supported ones will be added into a option method.
#EWI: SPRKPY1029 => This issue appears when the tool detects the usage of pyspark.sql.readwriter.DataFrameReader.parquet. This function is supported, but some of the differences between Snowpark and the Spark API might require making some manual changes.
df = session.read.option("PATTERN", "*file*").parquet(
  *file_paths
)

Empfohlene Korrektur

In this section, we explain how to configure the paths and options parameters to make them work in Snowpark.

1. paths-Parameter

In Spark, this parameter can be a local or cloud location. Snowpark only accepts cloud locations using a snowflake stage. So, you can create a temporal stage and add each file into it using the prefix file://.

2. Optionen Parameter

Snowpark does not allow defining the different options as parameters of the parquet function. As a workaround, you can use the option or options functions to specify those parameters as extra options of the DataFrameReader.

Bitte beachten Sie, dass die Snowpark options nicht genau mit den PySpark options übereinstimmen, so dass einige manuelle Änderungen erforderlich sein könnten. Nachfolgend finden Sie eine genauere Erklärung, wie Sie die gängigsten PySpark-Optionen in Snowpark konfigurieren.

2.1 mergeSchema Option

Parquet supports schema evolution, allowing users to start with a simple schema and gradually add more columns as needed. This can result in multiple parquet files with different but compatible schemas. In Snowflake, thanks to the infer_schema capabilities you don’t need to do that and therefore the mergeSchema option can just be removed.

2.2 pathGlobFilter Option

If you want to load only a subset of files from the stage, you can use the pattern option to specify a regular expression that matches the files you want to load. The SMA already automates this as you can see in the output of this scenario.

2.3 recursiveFileLookupstr Option

This option is not supported by Snowpark. The best recommendation is to use a regular expression like with the pathGlobFilter option to achieve something similar.

2.4 modifiedBefore / modifiedAfter-Option

You can achieve the same result in Snowflake by using the metadata columns.

Bemerkung

Die folgenden Optionen werden von Snowpark nicht unterstützt:

  • Kompression

  • datetimeRebaseMode

  • int96RebaseMode

  • mergeSchema

Nachfolgend finden Sie ein vollständiges Beispiel dafür, wie der Eingabecode umgewandelt werden sollte, damit er in Snowpark funktioniert:

from snowflake.snowpark.column import METADATA_FILE_LAST_MODIFIED, METADATA_FILENAME

temp_stage = f'{session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}')

session.file.put(f"file:///path/to/your/file1.parquet", f"@{temp_stage}")
session.file.put(f"file:///path/to/your/file2.parquet", f"@{temp_stage}")
session.file.put(f"file:///path/to/your/file3.parquet", f"@{temp_stage}")

df = session.read \
  .option("PATTERN", ".*file.*") \
  .with_metadata(METADATA_FILENAME, METADATA_FILE_LAST_MODIFIED) \
  .parquet(temp_stage) \
  .where(METADATA_FILE_LAST_MODIFIED < '2024-12-31T00:00:00') \
  .where(METADATA_FILE_LAST_MODIFIED > '2023-12-31T00:00:00')

Zusätzliche Empfehlungen

SPRKPY1030

Warnung

Dieser Problemcode ist jetzt veraltet

Meldung: pyspark.sql.session.SparkSession.Builder.appName hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.session.SparkSession.Builder.appName function, which has a workaround.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.session.SparkSession.Builder.appName function that generates this EWI. In this example, the appName function is used to set MyApp as the name of the application.

session = SparkSession.builder.appName("MyApp").getOrCreate()

Ausgabe

The SMA adds the EWI SPRKPY1030 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1030 => pyspark.sql.session.SparkSession.Builder.appName has a workaround, see documentation for more info
session = Session.builder.appName("MyApp").getOrCreate()

Empfohlene Korrektur

As a workaround, you can import the snowpark_extensions package which provides an extension for the appName function.

import snowpark_extensions
session = SessionBuilder.appName("MyApp").getOrCreate()

Zusätzliche Empfehlungen

SPRKPY1031

Warnung

This issue code has been deprecated since Spark Conversion Core 2.7.0

Meldung: pyspark.sql.column.Column.contains hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.column.Column.contains function, which has a workaround.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.column.Column.contains function that generates this EWI. In this example, the contains function is used to filter the rows where the ‚City‘ column contains the substring ‚New‘.

df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
df_filtered = df.filter(col("City").contains("New"))

Ausgabe

The SMA adds the EWI SPRKPY1031 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
#EWI: SPRKPY1031 => pyspark.sql.column.Column.contains has a workaround, see documentation for more info
df_filtered = df.filter(col("City").contains("New"))

Empfohlene Korrektur

As a workaround, you can use the snowflake.snowpark.functions.contains function by passing the column as the first argument and the element to search as the second argument. If the element to search is a literal value then it should be converted into a column expression using the lit function.

from snowflake.snowpark import functions as f
df = spark.createDataFrame([("Alice", "New York"), ("Bob", "Los Angeles"), ("Charlie", "Chicago")], ["Name", "City"])
df_filtered = df.filter(f.contains(col("City"), f.lit("New")))

Zusätzliche Empfehlungen

SPRKPY1032

Message: *spark element* is not defined

Kategorie: Konvertierungsfehler

Beschreibung

Dieses Problem tritt auf, wenn SMA keinen geeigneten Zuordnungsstatus für das angegebene Element ermitteln konnte. Das bedeutet, dass SMA noch nicht weiß, ob dieses Element von Snowpark unterstützt wird oder nicht. Bitte beachten Sie, dass dies ein allgemeiner Fehlercode ist, der von SMA für jedes nicht definierte Element verwendet wird.

Szenario

Eingabe

Below is an example of a function for which the SMA could not determine an appropriate mapping status. In this case, you should assume that not_defined_function() is a valid PySpark function and the code runs.

sc.parallelize(["a", "b", "c", "d", "e"], 3).not_defined_function().collect()

Ausgabe

The SMA adds the EWI SPRKPY1032 to the output code to let you know that this element is not defined.

#EWI: SPRKPY1032 => pyspark.rdd.RDD.not_defined_function is not defined
sc.parallelize(["a", "b", "c", "d", "e"], 3).not_defined_function().collect()

Empfohlene Korrektur

Um zu versuchen, das Problem zu identifizieren, können Sie die folgenden Überprüfungen durchführen:

  • Prüfen Sie, ob der Quellcode die richtige Syntax hat und richtig geschrieben ist.

  • Check if you are using a PySpark version supported by the SMA. To know which PySpark version is supported by the SMA at the moment of running the SMA, you can review the first page of the DetailedReport.docx file.

If this is a valid PySpark element, please report that you encountered a conversion error on that particular element using the Report an Issue option of the SMA and include any additional information that you think may be helpful.

Please note that if an element is not defined, it does not mean that it is not supported by Snowpark. You should check the Snowpark Documentation to verify if an equivalent element exist.

Zusätzliche Empfehlungen

SPRKPY1033

Warnung

Dieser Problemcode ist jetzt veraltet

Meldung: pyspark.sql.functions.asc hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.functions.asc function, which has a workaround.

Szenarien

The pyspark.sql.functions.asc function takes either a column object or the name of the column as a string as its parameter. Both scenarios are not supported by Snowpark so this EWI is generated.

Szenario 1

Eingabe

Below is an example of a use of the pyspark.sql.functions.asc function that takes a column object as parameter.

df.orderBy(asc(col))

Ausgabe

The SMA adds the EWI SPRKPY1033 to the output code to let you know that the asc function with a column object parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1033 => pyspark.sql.functions.asc has a workaround, see documentation for more info
df.orderBy(asc(col))

Empfohlene Korrektur

As a workaround, you can call the snowflake.snowpark.Column.asc function from the column parameter.

df.orderBy(col.asc())
Szenario 2

Eingabe

Below is an example of a use of the pyspark.sql.functions.asc function that takes the name of the column as parameter.

df.orderBy(asc("colName"))

Ausgabe

The SMA adds the EWI SPRKPY1033 to the output code to let you know that the asc function with a column name parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1033 => pyspark.sql.functions.asc has a workaround, see documentation for more info
df.orderBy(asc("colName"))

Empfohlene Korrektur

As a workaround, you can convert the string parameter into a column object using the snowflake.snowpark.functions.col function and then call the snowflake.snowpark.Column.asc function.

df.orderBy(col("colName").asc())

Zusätzliche Empfehlungen

SPRKPY1034

Warnung

Dieser Problemcode ist jetzt veraltet

Meldung: pyspark.sql.functions.desc hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.functions.desc function, which has a workaround.

Szenarien

The pyspark.sql.functions.desc function takes either a column object or the name of the column as a string as its parameter. Both scenarios are not supported by Snowpark so this EWI is generated.

Szenario 1

Eingabe

Below is an example of a use of the pyspark.sql.functions.desc function that takes a column object as parameter.

df.orderBy(desc(col))

Ausgabe

The SMA adds the EWI SPRKPY1034 to the output code to let you know that the desc function with a column object parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1034 => pyspark.sql.functions.desc has a workaround, see documentation for more info
df.orderBy(desc(col))

Empfohlene Korrektur

As a workaround, you can call the snowflake.snowpark.Column.desc function from the column parameter.

df.orderBy(col.desc())
Szenario 2

Eingabe

Below is an example of a use of the pyspark.sql.functions.desc function that takes the name of the column as parameter.

df.orderBy(desc("colName"))

Ausgabe

The SMA adds the EWI SPRKPY1034 to the output code to let you know that the desc function with a column name parameter is not directly supported by Snowpark, but it has a workaround.

#EWI: SPRKPY1034 => pyspark.sql.functions.desc has a workaround, see documentation for more info
df.orderBy(desc("colName"))

Empfohlene Korrektur

As a workaround, you can convert the string parameter into a column object using the snowflake.snowpark.functions.col function and then call the snowflake.snowpark.Column.desc function.

df.orderBy(col("colName").desc())

Zusätzliche Empfehlungen

SPRKPY1035

Warnung

Dieser Problemcode ist jetzt veraltet

Meldung: pyspark.sql.functions.reverse hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.functions.reverse function, which has a workaround.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.functions.reverse function that generates this EWI. In this example, the reverse function is used to reverse each string of the word column.

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
df_reversed = df.withColumn("reversed_word", reverse("word"))

Ausgabe

The SMA adds the EWI SPRKPY1035 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
#EWI: SPRKPY1035 => pyspark.sql.functions.reverse has a workaround, see documentation for more info
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
#EWI: SPRKPY1035 => pyspark.sql.functions.reverse has a workaround, see documentation for more info
df_reversed = df.withColumn("reversed_word", reverse("word"))

Empfohlene Korrektur

As a workaround, you can import the snowpark_extensions package which provides an extension for the reverse function.

import snowpark_extensions

df = spark.createDataFrame([("hello",), ("world",)], ["word"])
df_reversed = df.withColumn("reversed_word", reverse(df["word"]))
df_reversed = df.withColumn("reversed_word", reverse("word"))

Zusätzliche Empfehlungen

SPRKPY1036

Warnung

Dieser Problemcode ist jetzt veraltet

Meldung: pyspark.sql.column.Column.getField hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.column.Column.getField function, which has a workaround.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.column.Column.getField function that generates this EWI. In this example, the getField function is used to extract the name from the info column.

df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
df_with_name = df.withColumn("name", col("info").getField("name"))

Ausgabe

The SMA adds the EWI SPRKPY1036 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
#EWI: SPRKPY1036 => pyspark.sql.column.Column.getField has a workaround, see documentation for more info
df_with_name = df.withColumn("name", col("info").getField("name"))

Empfohlene Korrektur

As a workaround, you can use the Snowpark column indexer operator with the name of the field as the index.

df = spark.createDataFrame([(1, {"name": "John", "age": 30}), (2, {"name": "Jane", "age": 25})], ["id", "info"])
df_with_name = df.withColumn("name", col("info")["name"])

Zusätzliche Empfehlungen

SPRKPY1037

Warnung

Dieser Problemcode ist jetzt veraltet

Meldung: pyspark.sql.functions.sort_array hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.functions.sort_array function, which has a workaround.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.functions.sort_array function that generates this EWI. In this example, the sort_array function is used to sort the numbers array in ascending and descending order.

df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))

Ausgabe

The SMA adds the EWI SPRKPY1037 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
#EWI: SPRKPY1037 => pyspark.sql.functions.sort_array has a workaround, see documentation for more info
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
#EWI: SPRKPY1037 => pyspark.sql.functions.sort_array has a workaround, see documentation for more info
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))

Empfohlene Korrektur

As a workaround, you can import the snowpark_extensions package which provides an extension for the sort_array function.

import snowpark_extensions

df = spark.createDataFrame([(1, [3, 1, 2]), (2, [10, 5, 8]), (3, [6, 4, 7])], ["id", "numbers"])
df_sorted_asc = df.withColumn("sorted_numbers_asc", sort_array("numbers", asc=True))
df_sorted_desc = df.withColumn("sorted_numbers_desc", sort_array("numbers", asc=False))

Zusätzliche Empfehlungen

SPRKPY1038

Message: *spark element* is not yet recognized

Kategorie: Konvertierungsfehler

Beschreibung

Dieses Problem tritt auf, wenn es ein PySpark-Element in Ihrem Quellcode gibt, das von SMA nicht erkannt wurde. Dies kann verschiedene Gründe haben, wie zum Beispiel:

  • Ein Element, das in PySpark nicht existiert.

  • Ein Element, das in einer Version von PySpark hinzugefügt wurde, die von SMA noch nicht unterstützt wird.

  • Ein interner Fehler des SMA bei der Verarbeitung des Elements.

Dies ist ein allgemeiner Fehlercode, der von SMA für jedes nicht erkannte Element verwendet wird.

Szenario

Eingabe

Nachfolgend finden Sie ein Beispiel für die Verwendung einer Funktion, die von SMA nicht erkannt werden konnte, weil sie in PySpark nicht existiert.

from pyspark.sql import functions as F
F.unrecognized_function()

Ausgabe

The SMA adds the EWI SPRKPY1038 to the output code to let you know that this element could not be recognized.

from snowflake.snowpark import functions as F
#EWI: SPRKPY1038 => pyspark.sql.functions.non_existent_function is not yet recognized
F.unrecognized_function()

Empfohlene Korrektur

Um zu versuchen, das Problem zu identifizieren, können Sie die folgenden Überprüfungen durchführen:

  • Prüfen Sie, ob das Element in PySpark existiert.

  • Prüfen Sie, ob das Element richtig geschrieben ist.

  • Check if you are using a PySpark version supported by the SMA. To know which PySpark version is supported by the SMA at the moment of running the SMA, you can review the first page of the DetailedReport.docx file.

If it is a valid PySpark element, please report that you encountered a conversion error on that particular element using the Report an Issue option of the SMA and include any additional information that you think may be helpful.

Please note that if an element could not be recognized by the SMA, it does not mean that it is not supported by Snowpark. You should check the Snowpark Documentation to verify if an equivalent element exist.

Zusätzliche Empfehlungen

SPRKPY1039

Warnung

Dieser Problemcode ist jetzt veraltet

Meldung: pyspark.sql.column.Column.getItem hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.column.Column.getItem function, which has a workaround.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.column.Column.getItem function that generates this EWI. In this example, the getItem function is used to get an item by position and by key.

df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
df.withColumn("first_fruit", col("fruits").getItem(0))

df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
df.withColumn("apple_quantity", col("fruit_quantities").getItem("apple"))

Ausgabe

The SMA adds the EWI SPRKPY1039 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
#EWI: SPRKPY1039 => pyspark.sql.column.Column.getItem has a workaround, see documentation for more info
df.withColumn("first_fruit", col("fruits").getItem(0))

df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
#EWI: SPRKPY1039 => pyspark.sql.column.Column.getItem has a workaround, see documentation for more info
df.withColumn("apple_quantity", col("fruit_quantities").getItem("apple"))

Empfohlene Korrektur

Als Problemumgehung können Sie den Snowpark-Spaltenindexierungsoperator mit dem Namen oder der Position des Feldes als Index verwenden.

df = spark.createDataFrame([(1, ["apple", "banana", "orange"]), (2, ["carrot", "avocado", "banana"])], ["id", "fruits"])
df.withColumn("first_fruit", col("fruits")[0])

df = spark.createDataFrame([(1, {"apple": 10, "banana": 20}), (2, {"carrot": 15, "grape": 25}), (3, {"pear": 30, "apple": 35})], ["id", "fruit_quantities"])
df.withColumn("apple_quantity", col("fruit_quantities")["apple"])

Zusätzliche Empfehlungen

SPRKPY1040

Warnung

Dieser Problemcode ist jetzt veraltet

Meldung: pyspark.sql.functions.explode hat eine Problemumgehung, siehe Dokumentation für weitere Informationen

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects a use of the pyspark.sql.functions.explode function, which has a workaround.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.functions.explode function that generates this EWI. In this example, the explode function is used to generate one row per array item for the numbers column.

df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
exploded_df = df.select("name", explode(df.numbers).alias("number"))

Ausgabe

The SMA adds the EWI SPRKPY1040 to the output code to let you know that this function is not directly supported by Snowpark, but it has a workaround.

df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
#EWI: SPRKPY1040 => pyspark.sql.functions.explode has a workaround, see documentation for more info
exploded_df = df.select("name", explode(df.numbers).alias("number"))

Empfohlene Korrektur

As a workaround, you can import the snowpark_extensions package which provides an extension for the explode function.

import snowpark_extensions

df = spark.createDataFrame([("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])], ["name", "numbers"])
exploded_df = df.select("name", explode(df.numbers).alias("number"))

Zusätzliche Empfehlungen

SPRKPY1041

Warnung

This issue code has been deprecated since Spark Conversion Core Version 2.9.0

Meldung: pyspark.sql.functions.explode_outer hat eine Problemumgehung

Kategorie: Warnung

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.functions.explode_outer which has a workaround.

Szenario

Eingabe

Das Beispiel zeigt die Verwendung der explode_outer-Methode in einem Auswahl-Aufruf.

df = spark.createDataFrame(
    [(1, ["foo", "bar"], {"x": 1.0}),
     (2, [], {}),
     (3, None, None)],
    ("id", "an_array", "a_map")
)

df.select("id", "an_array", explode_outer("a_map")).show()

Ausgabe

The tool adds the EWI SPRKPY1041 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [(1, ["foo", "bar"], {"x": 1.0}),
     (2, [], {}),
     (3, None, None)],
    ("id", "an_array", "a_map")
)

#EWI: SPRKPY1041 => pyspark.sql.functions.explode_outer has a workaround, see documentation for more info
df.select("id", "an_array", explode_outer("a_map")).show()

Empfohlene Korrektur

As a workaround, you can import the snowpark_extensions package, which contains a helper for the explode_outer function.

import snowpark_extensions

df = spark.createDataFrame(
    [(1, ["foo", "bar"], {"x": 1.0}),
     (2, [], {}),
     (3, None, None)],
    ("id", "an_array", "a_map")
)

df.select("id", "an_array", explode_outer("a_map")).show()

Zusätzliche Empfehlungen

SPRKPY1042

Meldung: pyspark.sql.functions.posexplode hat eine Problemumgehung

Kategorie: Warnung

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.functions.posexplode which has a workaround.

Szenarien

There are a couple of scenarios that this method can handle depending on the type of column it is passed as a parameter, it can be a list of values or a map/directory (keys/values).

Szenario 1

Eingabe

Below is an example of the usage of posexplode passing as a parameter of a list of values.

df = spark.createDataFrame(
    [Row(a=1,
         intlist=[1, 2, 3])])

df.select(posexplode(df.intlist)).collect()

Ausgabe

The tool adds the EWI SPRKPY1042 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [Row(a=1,
         intlist=[100, 200, 300])])
#EWI: SPRKPY1042 => pyspark.sql.functions.posexplode has a workaround, see documentation for more info

df.select(posexplode(df.intlist)).show()

Empfohlene Korrektur

For having the same behavior, use the method functions.flatten, drop extra columns, and rename index and value column names.

df = spark.createDataFrame(
  [Row(a=1,
       intlist=[1, 2, 3])])

df.select(
    flatten(df.intlist))\
    .drop("DATA", "SEQ", "KEY", "PATH", "THIS")\
    .rename({"INDEX": "pos", "VALUE": "col"}).show()
Szenario 2

Eingabe

Below is another example of the usage of posexplode passing as a parameter a map/dictionary (keys/values)

df = spark.createDataFrame([
    [1, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
    [2, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
schema=["idx", "lists", "maps", "strs"])

df.select(posexplode(df.maps)).show()

Ausgabe

The tool adds the EWI SPRKPY1042 indicating that a workaround can be implemented.

df = spark.createDataFrame([
    [1, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
    [2, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
schema=["idx", "lists", "maps", "strs"])
#EWI: SPRKPY1042 => pyspark.sql.functions.posexplode has a workaround, see documentation for more info

df.select(posexplode(df.maps)).show()

Empfohlene Korrektur

As a workaround, you can use functions.row_number to get the position and functions.explode with the name of the field to get the value the key/value for dictionaries.

df = spark.createDataFrame([
    [10, [1, 2, 3], {"Ashi Garami": "Single Leg X"}, "Kimura"],
    [11, [11, 22], {"Sankaku": "Triangle"}, "Coffee"]
],
    schema=["idx", "lists", "maps", "strs"])

window = Window.orderBy(col("idx").asc())

df.select(
    row_number().over(window).alias("pos"),
    explode(df.maps).alias("key", "value")).show()

Hinweis: mit „row_number“ ist nicht vollständig äquivalent, da es mit 1 beginnt (nicht mit Null wie die Spark-Methode)

Zusätzliche Empfehlungen

SPRKPY1043

Meldung: pyspark.sql.functions.posexplode_outer hat eine Problemumgehung

Kategorie: Warnung

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.functions.posexplode_outer which has a workaround.

Szenarien

There are a couple of scenarios that this method can handle depending on the type of column it is passed as a parameter, it can be a list of values or a map/directory (keys/values).

Szenario 1

Eingabe

Below is an example that shows the usage of posexplode_outer passing a list of values.

df = spark.createDataFrame(
    [
        (1, ["foo", "bar"]),
        (2, []),
        (3, None)],
    ("id", "an_array"))

df.select("id", "an_array", posexplode_outer("an_array")).show()

Ausgabe

The tool adds the EWI SPRKPY1043 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [
        (1, ["foo", "bar"]),
        (2, []),
        (3, None)],
    ("id", "an_array"))
#EWI: SPRKPY1043 => pyspark.sql.functions.posexplode_outer has a workaround, see documentation for more info

df.select("id", "an_array", posexplode_outer("an_array")).show()

Empfohlene Korrektur

For having the same behavior, use the method functions.flatten sending the outer parameter in True, drop extra columns, and rename index and value column names.

df = spark.createDataFrame(
    [
        (1, ["foo", "bar"]),
        (2, []),
        (3, None)],
    ("id", "an_array"))

df.select(
    flatten(df.an_array, outer=True))\
    .drop("DATA", "SEQ", "KEY", "PATH", "THIS")\
    .rename({"INDEX": "pos", "VALUE": "col"}).show()
Szenario 2

Eingabe

Nachfolgend ein weiteres Beispiel für die Verwendung von posexplode_outer, das map/dictionary (keys/values) übergibt

df = spark.createDataFrame(
    [
        (1, {"x": 1.0}),
        (2, {}),
        (3, None)],
    ("id", "a_map"))

df.select(posexplode_outer(df.a_map)).show()

Ausgabe

The tool adds the EWI SPRKPY1043 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [
        (1, {"x": "Ashi Garami"}),
        (2, {}),
        (3, None)],
    ("id", "a_map"))
#EWI: SPRKPY1043 => pyspark.sql.functions.posexplode_outer has a workaround, see documentation for more info

df.select(posexplode_outer(df.a_map)).show()

Empfohlene Korrektur

As a workaround, you can use functions.row_number to get the position and functions.explode_outer with the name of the field to get the value of the key/value for dictionaries.

df = spark.createDataFrame(
    [
        (1, {"x": "Ashi Garami"}),
        (2,  {}),
        (3, None)],
    ("id", "a_map"))

window = Window.orderBy(col("id").asc())

df.select(
    row_number().over(window).alias("pos"),
          explode_outer(df.a_map)).show()

Hinweis: mit „row_number“ ist nicht vollständig äquivalent, da es mit 1 beginnt (nicht mit Null wie die Spark-Methode)

Zusätzliche Empfehlungen

SPRKPY1044

Warnung

This issue code has been deprecated since Spark Conversion Core Version 2.4.0

Meldung: pyspark.sql.functions.split hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.functions.split which has a workaround.

Szenarien

Es gibt mehrere Szenarien, die von der Anzahl der an die Methode übergebenen Parameter abhängen.

Szenario 1

Eingabe

Below is an example when the function split has just the str and pattern parameters

F.split('col', '\\|')

Ausgabe

The tool shows the EWI SPRKPY1044 indicating there is a workaround.

#EWI: SPRKPY1044 => pyspark.sql.functions.split has a workaround, see the documentation for more info
F.split('col', '\\|')

Empfohlene Korrektur

As a workaround, you can call the function snowflake.snowpark.functions.lit with the pattern parameter and send it into the split.

F.split('col', lit('\\|'))
## the result of lit will be sent to the split function

Szenario 2

Eingabe

Below is another example when the function split has the str, pattern, and limit parameters.

F.split('col', '\\|', 2)

Ausgabe

The tool shows the EWI SPRKPY1044 indicating there is a workaround.

#EWI: SPRKPY1044 => pyspark.sql.functions.split has a workaround, see the documentation for more info
F.split('col', '\\|', 2)

Empfohlene Korrektur

Dieses spezielle Szenario wird nicht unterstützt.

Zusätzliche Empfehlungen

SPRKPY1045

Meldung: pyspark.sql.functions.map_values hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

Diese Funktion wird verwendet, um die Liste der Werte aus einer Spalte zu extrahieren, die eine map/dictionary (keys/values) enthält.

The issue appears when the tool detects the usage of pyspark.sql.functions.map_values which has a workaround.

Szenario

Eingabe

Below is an example of the usage of the method map_values.

df = spark.createDataFrame(
    [(1, {'Apple': 'Fruit', 'Potato': 'Vegetable'})],
    ("id", "a_map"))

df.select(map_values("a_map")).show()

Ausgabe

The tool adds the EWI SPRKPY1045 indicating that a workaround can be implemented.

df = spark.createDataFrame(
    [(1, {'Apple': 'Fruit', 'Potato': 'Vegetable'})],
    ("id", "a_map"))
#EWI: SPRKPY1045 => pyspark.sql.functions.map_values has a workaround, see documentation for more info

df.select(map_values("a_map")).show()

Empfohlene Korrektur

As a workaround, you can create an udf to get the values for a column. The below example shows how to create the udf, then assign it to F.map_values, and then make use of it.

from snowflake.snowpark import functions as F
from snowflake.snowpark.types import ArrayType, MapType

map_values_udf=None

def map_values(map):
    global map_values_udf
    if not map_values_udf:
        def _map_values(map: dict)->list:
            return list(map.values())
        map_values_udf = F.udf(_map_values,return_type=ArrayType(),input_types=[MapType()],name="map_values",is_permanent=False,replace=True)
    return map_values_udf(map)

F.map_values = map_values

df.select(map_values(colDict))

Zusätzliche Empfehlungen

SPRKPY1046

Warnung

This issue code has been deprecated since Spark Conversion Core Version 2.1.22

Meldung: pyspark.sql.functions.monotonically_increasing_id hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.functions.monotonically_increasing_id which has a workaround.

Szenario

Eingabe

Below is an example of the usage of the method monotonically_increasing_id.

from pyspark.sql import functions as F

spark.range(0, 10, 1, 2).select(F.monotonically_increasing_id()).show()

Ausgabe

The tool adds the EWI SPRKPY1046 indicating that a workaround can be implemented.

from pyspark.sql import functions as F
#EWI: SPRKPY1046 => pyspark.sql.functions.monotonically_increasing_id has a workaround, see documentation for more info
spark.range(0, 10, 1, 2).select(F.monotonically_increasing_id()).show()

Empfohlene Korrektur

Aktualisieren Sie die Toolversion.

Zusätzliche Empfehlungen

SPRKPY1047

Warnung

This issue code has been deprecated since Spark Conversion Core Version 4.6.0

Beschreibung

This issue appears when the tool detects the usage of pyspark.context.SparkContext.setLogLevel which has a workaround.

Szenario

Eingabe

Below is an example of the usage of the method setLogLevel.

sparkSession.sparkContext.setLogLevel("WARN")

Ausgabe

The tool adds the EWI SPRKPY1047 indicating that a workaround can be implemented.

#EWI: SPRKPY1047 => pyspark.context.SparkContext.setLogLevel has a workaround, see documentation for more info
sparkSession.sparkContext.setLogLevel("WARN")

Empfohlene Korrektur

Replace the setLogLevel function usage with logging.basicConfig that provides a set of convenience functions for simple logging usage. In order to use it, we need to import two modules, „logging“ and „sys“, and the level constant should be replaced using the „Level equivalent table“:

import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.WARNING)
  • Ebenen-Äquivalent-Tabelle

Ebenen-Quellen-Parameter

Ebenen-Ziel-Parameter

„ALL“

<mark style=“color:red;“>**This has no equivalent**</mark>

„DEBUG“

Protokollieren.DEBUG

„ERROR“

Protokollieren.ERROR

„FATAL“

Protokollieren.CRITICAL

„INFO“

Protokollieren.INFO

„OFF“

Protokollieren.NOTSET

„TRACE“

<mark style=“color:red;“>**This has no equivalent**</mark>

„WARN“

Protokollieren.WARNING

Zusätzliche Empfehlungen

SPRKPY1048

Warnung

This issue code has been deprecated since Spark Conversion Core Version 2.4.0

Meldung: pyspark.sql.session.SparkSession. conf hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.session.SparkSession.conf which has a workaround.

Szenario

Eingabe

Below is an example of how to set a configuration into the property conf .

spark.conf.set("spark.sql.crossJoin.enabled", "true")

Ausgabe

The tool adds the EWI SPRKPY1048 indicating that a workaround can be implemented.

#EWI: SPRKPY1048 => pyspark.sql.session.SparkSession.conf has a workaround, see documentation for more info
spark.conf.set("spark.sql.crossJoin.enabled", "true")

Empfohlene Korrektur

SparkSession.conf wird verwendet, um einige spezielle Einstellungen zu übergeben, die nur von Pyspark verwendet werden und nicht für Snowpark gelten. Sie können den Code entfernen oder kommentieren

#spark.conf.set("spark.sql.crossJoin.enabled", "true")

Zusätzliche Empfehlungen

SPRKPY1049

Warnung

This issue code has been deprecated since Spark Conversion Core Version 2.1.9

Meldung: pyspark.sql.session.SparkSession.sparkContext hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.session.SparkSession.sparkContext which has a workaround.

Szenario

Eingabe

Below is an example that creates a spark session and then uses the SparkContext property to print the appName.

print("APP Name :"+spark.sparkContext.appName())

Ausgabe

The tool adds the EWI SPRKPY1049 indicating that a workaround can be implemented.

#EWI: SPRKPY1049 => pyspark.sql.session.SparkSession.sparkContext has a workaround, see documentation for more info
print("APP Name :"+spark.sparkContext.appName())

Empfohlene Korrektur

SparkContext wird in SnowPark nicht unterstützt, aber Sie können auf die Methoden und Eigenschaften von SparkContext direkt von der Sitzungsinstanz aus zugreifen.

## Pyspark
print("APP Name :"+spark.sparkContext.appName())
can be used in SnowPark removing the sparkContext as:
#Manual adjustment in SnowPark
print("APP Name :"+spark.appName());

Zusätzliche Empfehlungen

SPRKPY1050

Meldung: pyspark.conf.SparkConf. set hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.conf.SparkConf.set which has a workaround.

Szenario

Eingabe

Below is an example that sets a variable using conf.set.

conf = SparkConf().setAppName('my_app')

conf.set("spark.storage.memoryFraction", "0.5")

Ausgabe

The tool adds the EWI SPRKPY1050 indicating that a workaround can be implemented.

conf = SparkConf().setAppName('my_app')

#EWI: SPRKPY1050 => pyspark.conf.SparkConf.set has a workaround, see documentation for more info
conf.set("spark.storage.memoryFraction", "0.5")

Empfohlene Korrektur

SparkConf.set wird verwendet, um eine Konfigurationseinstellung zu setzen, die nur von Pyspark verwendet wird und nicht für Snowpark gilt. Sie können den Code entfernen oder kommentieren

#conf.set("spark.storage.memoryFraction", "0.5")

Zusätzliche Empfehlungen

SPRKPY1051

Warnung

This issue code has been deprecated since Spark Conversion Core Version 2.4.0

Meldung: pyspark.sql.session.SparkSession.Builder.master hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects pyspark.sql.session.SparkSession.Builder.master usage which has a workaround.

Szenario

Eingabe

Below is an example of the usage of the method builder.master to set the Spark Master URL to connect to local using 1 core.

spark = SparkSession.builder.master("local[1]")

Ausgabe

The tool adds the EWI SPRKPY1051 indicating that a workaround can be implemented.

#EWI: SPRKPY1051 => pyspark.sql.session.SparkSession.Builder.master has a workaround, see documentation for more info
spark = Session.builder.master("local[1]")

Empfohlene Korrektur

pyspark.sql.session.SparkSession.Builder.master is used to set up a Spark Cluster. Snowpark doesn’t use Spark Clusters so you can remove or comment the code.

## spark = Session.builder.master("local[1]")

Zusätzliche Empfehlungen

SPRKPY1052

Warnung

This issue code has been deprecated since Spark Conversion Core Version 2.8.0

Meldung: pyspark.sql.session.SparkSession.Builder.enableHiveSupport hat eine Problemumgehung

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.session.SparkSession.Builder.enableHiveSupport which has a workaround.

Szenario

Eingabe

Below is an example that configures the SparkSession and enables the hive support using the method enableHiveSupport.

spark = Session.builder.appName("Merge_target_table")\
        .config("spark.port.maxRetries","100") \
        .enableHiveSupport().getOrCreate()

Ausgabe

The tool adds the EWI SPRKPY1052 indicating that a workaround can be implemented.

#EWI: SPRKPY1052 => pyspark.sql.session.SparkSession.Builder.enableHiveSupport has a workaround, see documentation for more info
spark = Session.builder.appName("Merge_target_table")\
        .config("spark.port.maxRetries","100") \
        .enableHiveSupport().getOrCreate()

Empfohlene Korrektur

Remove the use of enableHiveSupport function because it is not needed in Snowpark.

spark = Session.builder.appName("Merge_target_table")\
        .config("spark.port.maxRetries","100") \
        .getOrCreate()

Zusätzliche Empfehlungen

SPRKPY1053

Meldung: Beim Extrahieren der dbc-Dateien ist ein Fehler aufgetreten.

Kategorie: Warnung.

Beschreibung

Dieses Problem tritt auf, wenn eine dbc-Datei nicht extrahiert werden kann. Diese Warnung könnte durch einen oder mehrere der folgenden Gründe verursacht werden: Zu schwer, unzugänglich, schreibgeschützt, usw.

Zusätzliche Empfehlungen

  • Als Problemumgehung können Sie die Größe der Datei überprüfen, wenn sie zu groß ist, um verarbeitet zu werden. Analysieren Sie auch, ob das Tool darauf zugreifen kann, um Zugriffsprobleme zu vermeiden.

  • Wenn Sie weitere Unterstützung benötigen, können Sie uns eine E-Mail an snowconvert-info@snowflake.com senden. Wenn Sie einen Supportvertrag mit Snowflake abgeschlossen haben, wenden Sie sich an Ihren Vertriebsingenieur, der Ihren Supportbedarf ermitteln kann.

SPRKPY1080

Meldung: Der Wert von SparkContext wird durch die Variable ‚session‘ ersetzt.

Kategorie: Warnung

Beschreibung

Der Spark-Kontext wird in einer Variablen namens „session“ gespeichert, die eine Snowpark-Sitzung erstellt.

Szenario

Eingabe

Dieser Codeausschnitt beschreibt ein SparkContext

## Input Code
from pyspark import SparkContext
from pyspark.sql import SparkSession

def example1():

    sc = SparkContext("local[*]", "TestApp")

    sc.setLogLevel("ALL")
    sc.setLogLevel("DEBUG")

Ausgabe

In diesem Ausgabecode hat SMA die PySpark.SparkContext durch eine SparkSession ersetzt. Beachten Sie, dass SMA auch eine Vorlage hinzufügt, um die Verbindung in der „connection.json“-Datei zu ersetzen und diese Konfiguration dann in die connection_parameter-Variable zu laden.

## Output Code
import logging
import sys
import json
from snowflake.snowpark import Session
from snowflake.snowpark import Session

def example1():
    jsonFile = open("connection.json")
    connection_parameter = json.load(jsonFile)
    jsonFile.close()
    #EWI: SPRKPY1080 => The value of SparkContext is replaced with 'session' variable.
    sc = Session.builder.configs(connection_parameter).getOrCreate()
    sc.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
    logging.basicConfig(stream = sys.stdout, level = logging.NOTSET)
    logging.basicConfig(stream = sys.stdout, level = logging.DEBUG)

Empfohlene Korrektur

Die „connection.json“-Konfigurationsdatei muss mit den erforderlichen Verbindungsinformationen aktualisiert werden:

{
  "user": "my_user",
  "password": "my_password",
  "account": "my_account",
  "role": "my_role",
  "warehouse": "my_warehouse",
  "database": "my_database",
  "schema": "my_schema"
}

Zusätzliche Empfehlungen

SPRKPY1054

Meldung: pyspark.sql.readwriter.DataFrameReader.format wird nicht unterstützt.

Kategorie: Warnung.

Beschreibung

This issue appears when the pyspark.sql.readwriter.DataFrameReader.format has an argument that is not supported by Snowpark.

Szenarien

There are some scenarios depending on the type of format you are trying to load. It can be a supported , or non-supported format.

Szenario 1

Eingabe

Das Tool analysiert den Typ des Formats, das Sie zu laden versuchen, die unterstützten Formate sind:

  • Csv

  • JSON

  • Parquet

  • Orc

The below example shows how the tool transforms the format method when passing a Csv value.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df1 = spark.read.format('csv').load('/path/to/file')

Ausgabe

The tool transforms the format method into a Csv method call.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

df1 = spark.read.csv('/path/to/file')

Empfohlene Korrektur

In diesem Fall zeigt das Tool die EWI nicht an, d. h. es ist keine Korrektur erforderlich.

Szenario 2

Eingabe

The below example shows how the tool transforms the format method when passing a Jdbc value.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

df2 = spark.read.format('jdbc') \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Ausgabe

The tool shows the EWI SPRKPY1054 indicating that the value „jdbc“ is not supported.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

#EWI: SPRKPY1054 => pyspark.sql.readwriter.DataFrameReader.format with argument value "jdbc" is not supported.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameReader.load is not supported

df2 = spark.read.format('jdbc') \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Empfohlene Korrektur

For the not supported scenarios, there is no specific fix since it depends on the files that are trying to be read.

Szenario 3

Eingabe

The below example shows how the tool transforms the format method when passing a CSV, but using a variable instead.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

myFormat = 'csv'
df3 = spark.read.format(myFormat).load('/path/to/file')

Ausgabe

Since the tool can not determine the value of the variable in runtime, shows the EWI SPRKPY1054 indicating that the value „“ is not supported.

from snowflake.snowpark import Session
spark = Session.builder.getOrCreate()

myFormat = 'csv'
#EWI: SPRKPY1054 => pyspark.sql.readwriter.DataFrameReader.format with argument value "" is not supported.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameReader.load is not supported
df3 = spark.read.format(myFormat).load('/path/to/file')

Empfohlene Korrektur

As a workaround, you can check the value of the variable and add it as a string to the format call.

Zusätzliche Empfehlungen

SPRKPY1055

Meldung: pyspark.sql.readwriter.DataFrameReader.option key value wird nicht unterstützt.

Kategorie: Warnung.

Beschreibung

This issue appears when the pyspark.sql.readwriter.DataFrameReader.option key value is not supported by SnowFlake.

Das Tool analysiert die Parameter des Optionsaufrufs, und je nach Methode (CSV oder JSON oder PARQUET) kann der Schlüsselwert eine Äquivalenz in Snowpark haben oder nicht. Wenn alle Parameter eine Äquivalenz haben, fügt das Tool die EWI nicht hinzu und ersetzt den Schlüsselwert durch sein Äquivalentes, andernfalls fügt das Tool die EWI hinzu.

Liste der Äquivalenzen:

  • Äquivalenzen für CSV:

Spark-Optionstasten

Snowpark-Äquivalenzen

sep

FIELD_DELIMITER

Header

PARSE_HEADER

lineSep

RECORD_DELIMITER

pathGlobFilter

PATTERN

Anführungszeichen

FIELD_OPTIONALLY_ENCLOSED_BY

nullValue

NULL_IF

dateFormat

DATE_FORMAT

timestampFormat

TIMESTAMP_FORMAT

inferSchema

INFER_SCHEMA

Trennzeichen

FIELD_DELIMITER

  • Äquivalenzen für JSON:

Spark-Optionstasten

Snowpark-Äquivalenzen

dateFormat

DATE_FORMAT

timestampFormat

TIMESTAMP_FORMAT

pathGlobFilter

PATTERN

  • Äquivalenzen für PARQUET:

Spark-Optionstasten

Snowpark-Äquivalenzen

pathGlobFilter

PATTERN

Alle anderen Schlüsseloptionen, die nicht in einer der oben genannten Tabellen enthalten sind, werden nicht unterstützt oder haben keine Äquivalenten in Snowpark. Wenn das der Fall ist, fügt das Tool die EWI mit den Parameterinformationen hinzu und entfernt sie aus der Kette.

Szenarien

Die folgenden Szenarien gelten für CSV, JSON und PARQUET.

There are a couple of scenarios depending on the value of the key used in the option method.

Szenario 1

Eingabe

Below is an example of a option call using a equivalent key.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## CSV example:
spark.read.option("header", True).csv(csv_file_path)

## Json example:
spark.read.option("dateFormat", "dd-MM-yyyy").json(json_file_path)

## Parquet example:
spark.read.option("pathGlobFilter", "*.parquet").parquet(parquet_file_path)

Ausgabe

Das Tool wandelt den Schlüssel in das richtige Äquivalent um.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()

## CSV example:
spark.read.option("PARSE_HEADER", True).csv(csv_file_path)

## Json example:
spark.read.option("DATE_FORMAT", "dd-MM-yyyy").json(json_file_path)

## Parquet example:
spark.read.option("PATTERN", "*.parquet").parquet(parquet_file_path)

Empfohlene Korrektur

Da das Tool den Wert des Schlüssels umwandelt, ist eine Korrektur nicht erforderlich.

Szenario 2

Eingabe

Below is an example of a option call using a non-equivalent key.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## CSV example:
spark.read.option("anotherKeyValue", "myVal").csv(csv_file_path)

## Json example:
spark.read.option("anotherKeyValue", "myVal").json(json_file_path)

## Parquet example:
spark.read.option("anotherKeyValue", "myVal").parquet(parquet_file_path)

Ausgabe

The tool adds the EWI SPRKPY1055 indicating the key is not supported and removes the option call.

from snowflake.snowpark import Session

spark = Session.builder.getOrCreate()

## CSV example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.csv(csv_file_path)

## Json example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.json(json_file_path)

## Parquet example:
#EWI: SPRKPY1055 => pyspark.sql.readwriter.DataFrameReader.option with key value "anotherKeyValue" is not supported.
spark.read.parquet(parquet_file_path)

Empfohlene Korrektur

Es wird empfohlen, die Verhaltensweise nach der Umwandlung zu überprüfen.

Zusätzliche Empfehlungen

  • Wenn nicht-äquivalente Parameter vorhanden sind, empfiehlt es sich, die Verhaltensweise nach der Umwandlung zu überprüfen.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1056

Warnung

Dieser Problemcode ist jetzt veraltet

Meldung: pyspark.sql.readwriter.DataFrameReader.option argument _ <argument_name> _ ist kein Literal und kann nicht ausgewertet werden

Kategorie: Warnung

Beschreibung

This issue appears when the argument’s key or value of the pyspark.sql.readwriter.DataFrameReader.option function is not a literal value (for example a variable). The SMA does a static analysis of your source code, and therefore it is not possible to evaluate the content of the argument.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.readwriter.DataFrameReader.option function that generates this EWI.

my_value = ...
my_option = ...

df1 = spark.read.option("dateFormat", my_value).format("csv").load('filename.csv')
df2 = spark.read.option(my_option, "false").format("csv").load('filename.csv')

Ausgabe

The SMA adds the EWI SPRKPY1056 to the output code to let you know that the argument of this function is not a literal value, and therefore it could not be evaluated by the SMA.

my_value = ...
my_option = ...

#EWI: SPRKPY1056 => pyspark.sql.readwriter.DataFrameReader.option argument "dateFormat" is not a literal and can't be evaluated
df1 = spark.read.option("dateFormat", my_value).format("csv").load('filename.csv')
#EWI: SPRKPY1056 => pyspark.sql.readwriter.DataFrameReader.option argument key is not a literal and can't be evaluated
df2 = spark.read.option(my_option, "false").format("csv").load('filename.csv')

Empfohlene Korrektur

Even though the SMA was unable to evaluate the argument, it does not mean that it is not supported by Snowpark. Please make sure that the value of the argument is valid and equivalent in Snowpark by checking the documentation.

Zusätzliche Empfehlungen

SPRKPY1057

Warnung

This Issue Code has been deprecated since Spark Conversion Core Version 4.8.0

Message: PySpark Dataframe Option argument contains a value that is not a literal, therefore cannot be evaluated

Kategorie: Warnung.

Beschreibung

Dieser Ausgabecode ist veraltet. Wenn Sie eine ältere Version verwenden, führen Sie ein Upgrade auf die neueste Version aus.

Zusätzliche Empfehlungen

SPRKPY1058

Meldung: < Methode > mit < Schlüssel > Plattformspezifischer Schlüssel wird nicht unterstützt.

Kategorie: ConversionError

Beschreibung

The get and set methods from pyspark.sql.conf.RuntimeConfig are not supported with a Platform specific key.

Szenarien

Not all usages of get or set methods are going to have an EWI in the output code. This EWI appears when the tool detects the usage of these methods with a Platform specific key which is not supported.

Szenario 1

Eingabe

Below is an example of the get or set methods with supported keys in Snowpark.

session.conf.set("use_constant_subquery_alias", False)
spark.conf.set("sql_simplifier_enabled", True)

session.conf.get("use_constant_subquery_alias")
session.conf.get("use_constant_subquery_alias")

Ausgabe

Da die Schlüssel in Snowpark unterstützt werden, fügt das Tool die EWI nicht in den Ausgabecode ein.

session.conf.set("use_constant_subquery_alias", True)
session.conf.set("sql_simplifier_enabled", False)

session.conf.get("use_constant_subquery_alias")
session.conf.get("sql_simplifier_enabled")

Empfohlene Korrektur

Für dieses Szenario gibt es keine empfohlene Lösung.

Szenario 2

Eingabe

Nachfolgend finden Sie ein Beispiel mit nicht unterstützten Schlüsseln.

data =
    [
      ("John", 30, "New York"),
      ("Jane", 25, "San Francisco")
    ]

session.conf.set("spark.sql.shuffle.partitions", "50")
spark.conf.set("spark.yarn.am.memory", "1g")

session.conf.get("spark.sql.shuffle.partitions")
session = spark.conf.get("spark.yarn.am.memory")

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

Ausgabe

The tool adds this EWI SPRKPY1058 on the output code to let you know that these methods are not supported with a Platform specific key.

data =
    [
      ("John", 30, "New York"),
      ("Jane", 25, "San Francisco")
    ]

#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.set method with this "spark.sql.shuffle.partitions" Platform specific key is not supported.
spark.conf.set("spark.sql.shuffle.partitions", "50")
#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.set method with this "spark.yarn.am.memory" Platform specific key is not supported.
spark.conf.set("spark.yarn.am.memory", "1g")

#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.get method with this "spark.sql.shuffle.partitions" Platform specific key is not supported.
spark.conf.get("spark.sql.shuffle.partitions")
#EWI: SPRKPY1058 => pyspark.sql.conf.RuntimeConfig.get method with this "spark.yarn.am.memory" Platform specific key is not supported.
spark.conf.get("spark.yarn.am.memory")

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

Empfohlene Korrektur

Die empfohlene Korrektur ist, diese Methoden zu entfernen.

data =
    [
      ("John", 30, "New York"),
      ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

Zusätzliche Empfehlungen

SPRKPY1059

Warnung

This issue code has been deprecated since Spark Conversion Core Version 2.45.1

Message: pyspark.storagelevel.StorageLevel has a workaround, see documentation.

Kategorie: Warnung

Beschreibung

Currently, the use of StorageLevel is not required in Snowpark since Snowflake controls the storage. For more information, you can refer to the EWI SPRKPY1072

Zusätzliche Empfehlungen

SPRKPY1060

Meldung: Der Authentifizierungsmechanismus ist connection.json (Vorlage vorhanden).

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.conf.SparkConf.

Szenario

Eingabe

Da der Authentifizierungsmechanismus in Snowpark anders ist, entfernt das Tool die Verwendungen und erstellt stattdessen eine Verbindungskonfigurationsdatei (connection.json).

from pyspark import SparkConf

my_conf = SparkConf(loadDefaults=True)

Ausgabe

The tool adds the EWI SPRKPY1060 indicating that the authentication mechanism is different.

#EWI: SPRKPY1002 => pyspark.conf.SparkConf is not supported
#EWI: SPRKPY1060 => The authentication mechanism is connection.json (template provided).
#my_conf = Session.builder.configs(connection_parameter).getOrCreate()

my_conf = None

Empfohlene Korrektur

To create a connection it is necessary that you fill in the information in the connection.json file.

{
  "user": "<USER>",
  "password": "<PASSWORD>",
  "account": "<ACCOUNT>",
  "role": "<ROLE>",
  "warehouse": "<WAREHOUSE>",
  "database": "<DATABASE>",
  "schema": "<SCHEMA>"
}

Zusätzliche Empfehlungen

SPRKPY1061

Meldung: Snowpark unterstützt keine unix_timestamp-Funktionen

Kategorie: Warnung

Beschreibung

In Snowpark, the first parameter is mandatory; the issue appears when the tool detects the usage of pyspark.sql.functions.unix_timestamp with no parameters.

Szenario

Eingabe

Below an example that calls the unix_timestamp method without parameters.

data = [["2015-04-08", "10"],["2015-04-10", "15"]]

df = spark.createDataFrame(data, ['dt', 'val'])
df.select(unix_timestamp()).show()

Ausgabe

The Snowpark signature for this function unix_timestamp(e: ColumnOrName, fmt: Optional["Column"] = None), as you can notice the first parameter it’s required.

The tool adds this EWI SPRKPY1061 to let you know that function unix_timestamp with no parameters it’s not supported in Snowpark.

data = [["2015-04-08", "10"],["2015-04-10", "15"]]

df = spark.createDataFrame(data, ['dt', 'val'])
#EWI: SPRKPY1061 => Snowpark does not support unix_timestamp functions with no parameters. See documentation for more info.
df.select(unix_timestamp()).show()

Empfohlene Korrektur

Als Problemumgehung können Sie zumindest den Namen oder die Spalte der Zeitstempelzeichenfolge hinzufügen.

data = [["2015-04-08", "10"],["2015-04-10", "15"]]

df = spark.createDataFrame(data, ["dt", "val"])
df.select(unix_timestamp("dt")).show()

Zusätzliche Empfehlungen

SPRKPY1062

Meldung: Snowpark unterstützt GroupedData.pivot nicht ohne den Parameter „values“.

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects the usage of the pyspark.sql.group.GroupedData.pivot function without the „values“ parameter (the list of values to pivot on).

Im Moment erfordert die Snowpark Python Pivot-Funktion, dass Sie explizit die Liste der unterschiedlichen Werte angeben, an denen ein Pivot ausgeführt werden soll werden soll.

Szenarien

Szenario 1

Eingabe

The SMA detects an expression that matches the pattern dataFrame.groupBy("columnX").pivot("columnY") and the pivot does not have the values parameter.

df.groupBy("date").pivot("category").sum("amount")

Ausgabe

Die SMA fügt eine EWI Meldung hinzu, die darauf hinweist, dass die Pivot-Funktion ohne den Parameter „values“ nicht unterstützt wird.

Außerdem fügt es als zweiten Parameter der Pivot-Funktion eine Listenkomprimierung hinzu, das die Liste der Werte berechnet, die in Spalten umgewandelt werden sollen. Denken Sie daran, dass dieser Vorgang bei großen Datensätzen nicht effizient ist, und es ist ratsam, die Werte explizit anzugeben.

#EWI: SPRKPY1062 => pyspark.sql.group.GroupedData.pivot without parameter 'values' is not supported. See documentation for more info.
df.groupBy("date").pivot("category", [v[0] for v in df.select("category").distinct().limit(10000).collect()]]).sum("amount")

Empfohlene Korrektur

Für dieses Szenario fügt SMA als zweiten Parameter der Pivot-Funktion eine Listenkomprimierung hinzu, das die Liste der Werte berechnet, die in Spalten umgewandelt werden, aber Sie können auch eine Liste eindeutiger Werte zur Pivot-Ausführung angeben. Gehen Sie wie folgt vor:

df = spark.createDataFrame([
      Row(category="Client_ID", date=2012, amount=10000),
      Row(category="Client_name",   date=2012, amount=20000)
  ])

df.groupBy("date").pivot("category", ["dotNET", "Java"]).sum("amount")
Szenario 2

Eingabe

The SMA couldn’t detect an expression that matches the pattern dataFrame.groupBy("columnX").pivot("columnY") and the pivot does not have the values parameter.

df1.union(df2).groupBy("date").pivot("category").sum("amount")

Ausgabe

Die SMA fügt eine EWI Meldung hinzu, die darauf hinweist, dass die Pivot-Funktion ohne den Parameter „values“ nicht unterstützt wird.

#EWI: SPRKPY1062 => pyspark.sql.group.GroupedData.pivot without parameter 'values' is not supported. See documentation for more info.
df1.union(df2).groupBy("date").pivot("category").sum("amount")

Empfohlene Korrektur

Fügen Sie eine Liste eindeutiger Werte hinzu, nach denen ein Pivot ausgeführt werden soll. Gehen Sie wie folgt vor:

df = spark.createDataFrame([
      Row(course="dotNET", year=2012, earnings=10000),
      Row(course="Java",   year=2012, earnings=20000)
  ])

df.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").show()

Zusätzliche Empfehlungen

  • Die Berechnung der Liste eindeutiger Werte für die Pivot-Funktion ist bei großen Datensätzen nicht effizient und kann zu einem blockierenden Aufruf werden. Bitte überlegen Sie, ob Sie die Liste der unterschiedlichen Werte, auf die Sie ein Pivot ausführen möchten, explizit angeben möchten.

  • Wenn Sie die Liste der eindeutigen Werte für die Pivot-Funktion nicht explizit angeben möchten (was nicht ratsam ist), können Sie den folgenden Code als zweites Argument der Pivot-Funktion hinzufügen, um die Werte zur Laufzeit zu ermitteln*

[v[0] for v in <df>.select(<column>).distinct().limit(<count>).collect()]]

****Replace*** :code:`<df>` with the corresponding DataFrame, with the column to pivot and with the number of rows to select.

SPRKPY1063

Meldung: pyspark.sql.pandas.functions.pandas_udf bietet eine Problemumgehung.

Kategorie: Warnung

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.pandas.functions.pandas_udf which has a workaround.

Szenario

Eingabe

Die Funktion pandas_udf wird verwendet, um eine benutzerdefinierte Funktion zu erstellen, die mit großen Datenmengen arbeitet.

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def modify_df(pdf):
    return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})
df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])
new_df = df.groupby().apply(modify_df)

Ausgabe

Die SMA fügt eine EWI-Meldung hinzu, die darauf hinweist, dass pandas_udf über eine Problemumgehung verfügt.

#EWI: SPRKPY1062 => pyspark.sql.pandas.functions.pandas_udf has a workaround, see documentation for more info
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)

def modify_df(pdf):
    return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})

df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])

new_df = df.groupby().apply(modify_df)

Empfohlene Korrektur

Specify explicitly the parameters types as a new parameter input_types, and remove functionType parameter if applies. Created function must be called inside a select statement.

@pandas_udf(
    return_type = schema,
    input_types = [PandasDataFrameType([IntegerType(), IntegerType()])]
)

def modify_df(pdf):
    return pd.DataFrame({'result': pdf['col1'] + pdf['col2'] + 1})

df = spark.createDataFrame([(1, 2), (3, 4), (1, 1)], ["col1", "col2"])

new_df = df.groupby().apply(modify_df) # You must modify function call to be a select and not an apply

Zusätzliche Empfehlungen

SPRKPY1064

Message: The *Spark element* does not apply since snowflake uses snowpipe mechanism instead.

Kategorie: Warnung

Beschreibung

Dieses Problem tritt auf, wenn das Tool die Verwendung eines Elements aus der pyspark.streaming-Bibliothek erkennt:

Szenario

Eingabe

Nachfolgend sehen Sie ein Beispiel mit einem der Elemente, die diesen EWI auslösen.

from pyspark.streaming.listener import StreamingListener

var = StreamingListener.Java
var.mro()

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()

Ausgabe

The SMA adds the EWI SPRKPY1064 on the output code to let you know that this function does not apply.

#EWI: SPRKPY1064 => The element does not apply since snowflake uses snowpipe mechanism instead.

var = StreamingListener.Java
var.mro()

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()

Empfohlene Korrektur

The SMA removes the import statement and adds the issue to the Issues.csv inventory, remove any usages of the Spark element.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])
df.show()

Zusätzliche Empfehlungen

SPRKPY1065

Meldung: Der pyspark.context.SparkContext.broadcast trifft nicht zu, da Snowflake einen Mechanismus für das Clustering von Daten verwendet, um die Daten zu berechnen.

Kategorie: Warnung

Beschreibung

This issue appears when the tool detects the usage of element pyspark.context.SparkContext.broadcast, which is not necessary due to the use of data-clustering of Snowflake.

Eingabecode

In diesem Beispiel wird eine Broadcast-Variable erstellt. Diese Variablen ermöglichen eine effizientere gemeinsame Nutzung von Daten durch alle Knotenpunkte.

sc = SparkContext(conf=conf_spark)

mapping = {1: 10001, 2: 10002}

bc = sc.broadcast(mapping)

Ausgabecode

Die SMA fügt eine EWI-Meldung hinzu, die besagt, dass die Übertragung nicht erforderlich ist.

sc = conf_spark

mapping = {1: 10001, 2: 10002}
#EWI: SPRKPY1065 => The element does not apply since snowflake use data-clustering mechanism to compute the data.

bc = sc.broadcast(mapping)

Empfohlene Korrektur

Entfernen Sie alle Verwendungen von pyspark.context.SparkContext.broadcast.

sc = conf_spark

mapping = {1: 10001, 2: 10002}

Zusätzliche Empfehlungen

SPRKPY1066

Meldung: Das Spark-Element trifft nicht zu, da Snowflake automatisch einen Mechanismus zur Mikropartitionierung verwendet.

Kategorie: Warnung

Beschreibung

Dieses Problem tritt auf, wenn das Tool die Verwendung von Elementen in Verbindung mit Partitionen erkennt:

Those elements do not apply due the use of micro-partitions of Snowflake.

Eingabecode

In this example sortWithinPartitions it’s used to create a partition in a DataFrame sorted by the specified column.

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.sortWithinPartitions("age", ascending=False)

Ausgabecode

Die SMA fügt eine EWI-Meldung hinzu, die darauf hinweist, dass das Spark-Element nicht erforderlich ist.

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
#EWI: SPRKPY1066 => The element does not apply since snowflake use micro-partitioning mechanism are created automatically.
df.sortWithinPartitions("age", ascending=False)

Empfohlene Korrektur

Entfernen Sie die Verwendung des Elements.

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])

Zusätzliche Empfehlungen

SPRKPY1067

Meldung: Die Funktion pyspark.sql.functions.split hat Parameter, die in Snowpark nicht unterstützt werden.

Kategorie: Warnung

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.functions.split with more than two parameters or a regex pattern as a parameter; both cases are not supported.

Szenarien

Szenario 1

Eingabecode

In diesem Beispiel hat die Split-Funktion mehr als zwei Parameter.

df.select(split(columnName, ",", 5))

Ausgabecode

Das Tool fügt diese EWI in den Ausgabecode ein, um Sie darauf hinzuweisen, dass diese Funktion nicht unterstützt wird, wenn sie mehr als zwei Parameter hat.

#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
df.select(split(columnName, ",", 5))

Empfohlene Korrektur

Behalten Sie die Split-Funktion mit nur zwei Parametern bei.

df.select(split(columnName, ","))
Szenario 2

Eingabecode

In diesem Beispiel hat die Split-Funktion ein Regex-Muster als Parameter.

df.select(split(columnName, "^([\d]+-[\d]+-[\d])"))

Ausgabecode

Das Tool fügt diese EWI in den Ausgabecode ein, um Sie darauf hinzuweisen, dass diese Funktion nicht unterstützt wird, wenn sie ein Regex-Muster als Parameter hat.

#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
df.select(split(columnName, "^([\d]+-[\d]+-[\d])"))

Empfohlene Korrektur

The spark signature for this method functions.split(str: ColumnOrName, pattern: str, limit: int = - 1) not exactly match with the method in Snowpark functions.split(str: Union[Column, str], pattern: Union[Column, str]) so for now the scenario using regular expression does not have a recommended fix.

Zusätzliche Empfehlungen

SPRKPY1068

Meldung: toPandas enthält Spalten vom Typ ArrayType, der nicht unterstützt wird und für den es eine Problemumgehung gibt.

Kategorie: Warnung

Beschreibung

pyspark.sql.DataFrame.toPandas doesn’t work properly If there are columns of type ArrayType. The workaround for these cases is converting those columns into a Python Dictionary by using json.loads method.

Szenario

Eingabe

ToPandas gibt die Daten der ursprünglichen DataFrame als Pandas DataFrame zurück.

sparkDF = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0))
])

pandasDF = sparkDF.toPandas()

Ausgabe

Das Tool fügt diese EWI hinzu, um Sie darauf hinzuweisen, dass toPandas nicht unterstützt wird, wenn es Spalten des Typs ArrayType gibt, bietet aber eine Problemumgehung.

sparkDF = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0))
])
#EWI: SPRKPY1068 => toPandas doesn't work properly If there are columns of type ArrayType. The workaround for these cases is converting those columns into a Python Dictionary by using json.loads method. example: df[colName] = json.loads(df[colName]).
pandasDF = sparkDF.toPandas()

Empfohlene Korrektur

pandas_df = sparkDF.toPandas()

## check/convert all resulting fields from calling toPandas when they are of
## type ArrayType,
## they will be reasigned by converting them into a Python Dictionary
## using json.loads method​

for field in pandas_df.schema.fields:
    if isinstance(field.datatype, ArrayType):
        pandas_df[field.name] = pandas_df[field.name].apply(lambda x: json.loads(x) if x is not None else x)

Zusätzliche Empfehlungen

SPRKPY1069

Meldung: Wenn der partitionBy-Parameter eine Liste ist, wird Snowpark einen Fehler ausgeben.

Kategorie: Warnung

Beschreibung

When there is a usage of pyspark.sql.readwriter.DataFrameWriter.parquet method where it comes to the parameter partitionBy, the tool shows the EWI.

This is because in Snowpark the DataFrameWriter.parquet only supports a ColumnOrSqlExpr as a partitionBy parameter.

Szenarien

Szenario 1

Eingabecode:

In diesem Szenario ist der partitionBy-Parameter keine Liste.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

df.write.parquet(file_path, partitionBy="age")

Ausgabecode:

The tool adds the EWI SPRKPY1069 to let you know that Snowpark throws an error if parameter is a list.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

#EWI: SPRKPY1069 => If partitionBy parameter is a list, Snowpark will throw and error.
df.write.parquet(file_path, partition_by = "age", format_type_options = dict(compression = "None"))

Empfohlene Korrektur

There is not a recommended fix for this scenario because the tool always adds this EWI just in case the partitionBy parameter is a list. Remember that in Snowpark, only accepts cloud locations using a snowflake stage.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
Session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {stage}').show()
Session.file.put(f"file:///path/to/data/file.parquet", f"@{stage}")

df.write.parquet(stage, partition_by = "age", format_type_options = dict(compression = "None"))
Szenario 2

Eingabecode:

In diesem Szenario ist der partitionBy-Parameter eine Liste.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

df.write.parquet(file_path, partitionBy=["age", "name"])

Ausgabecode:

The tool adds the EWI SPRKPY1069 to let you know that Snowpark throws an error if parameter is a list.

df = spark.createDataFrame([(25, "Alice", "150"), (30, "Bob", "350")], schema=["age", "name", "value"])

#EWI: SPRKPY1069 => If partitionBy parameter is a list, Snowpark will throw and error.
df.write.parquet(file_path, partition_by = ["age", "name"], format_type_options = dict(compression = "None"))

Empfohlene Korrektur

If the value of the parameter is a list, then replace it with a ColumnOrSqlExpr.

df.write.parquet(file_path, partition_by = sql_expr("age || name"), format_type_options = dict(compression = "None"))

Zusätzliche Empfehlungen

SPRKPY1070

Message: The mode argument is transformed to overwrite, check the variable value and set the corresponding bool value.

Kategorie: Warnung

Beschreibung

Wenn es eine Verwendung von gibt:

The tool analyzes the parameter mode to determinate if the value is overwrite.

Szenarien

Szenario 1

Eingabecode

Für dieses Szenario erkennt das Tool, dass der mode-Parameter den entsprechenden boolschen Wert setzen kann.

df.write.csv(file_path, mode="overwrite")

Ausgabecode:

The SMA tool analyzes the mode parameter, determinate that the value is overwrite and set the corresponding bool value

df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = True)

Empfohlene Korrektur

Für dieses Szenario gibt es keine empfohlene Korrektur, da das Tool die entsprechende Transformation durchgeführt hat.

Szenario 2:

Eingabecode

In this scenario the tool can not validate the value is overwrite.

df.write.csv(file_path, mode=myVal)

Ausgabecode:

Die SMA fügt eine EWI-Meldung hinzu, die darauf hinweist, dass der mode-Parameter in ‚overwrite‘ umgewandelt wurde. Sie soll Sie aber auch darauf hinweisen, dass es besser ist, den Variablenwert zu überprüfen und den richtigen boolschen Wert zu setzen.

#EWI: SPRKPY1070 => The 'mode' argument is transformed to 'overwrite', check the variable value and set the corresponding bool value.
df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = myVal)

Empfohlene Korrektur

Check for the value of the parameter mode and add the correct value for the parameter overwrite.

df.write.csv(file_path, format_type_options = dict(compression = "None"), overwrite = True)

Zusätzliche Empfehlungen

SPRKPY1071

Meldung: Die Funktion pyspark.rdd.RDD.getNumPartitions ist in Snowpark nicht erforderlich. Sie sollten also alle Referenzen entfernen.

Kategorie: Warnung

Beschreibung

This issue appears when the tool finds the use of the pyspark.rdd.RDD.getNumPartitions function. Snowflake uses micro-partitioning mechanism, so the use of this function is not required.

Szenario

Eingabe

Die getNumPartitions gibt die Anzahl der Partitionen auf einer RDD zurück.

df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])

print(df.getNumPartitions())

Ausgabe

Das Tool fügt diese EWI hinzu, um Sie darauf hinzuweisen, dass getNumPartitions nicht erforderlich ist.

df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])
#EWI: SPRKPY1071 => The getNumPartitions are not required in Snowpark. So, you should remove all references.

print(df.getNumPartitions())

Empfohlene Korrektur

Entfernen Sie alle Verwendungen dieser Funktion.

df = spark.createDataFrame([('2015-04-08',), ('5',), [Row(a=1, b="b")]], ['dt', 'num', 'row'])

Zusätzliche Empfehlungen

SPRKPY1072

Meldung: Die Verwendung von StorageLevel ist in Snowpark nicht erforderlich.

Kategorie: Warnung.

Beschreibung

This issue appears when the tool finds the use of the StorageLevel class, which works like „flags“ to set the storage level. Since Snowflake controls the storage, the use of this function is not required.

Zusätzliche Empfehlungen

SPRKPY1073

Meldung: pyspark.sql.functions.udf ohne Parameter oder Rückgabeparameter werden nicht unterstützt

Kategorie: Warnung.

Beschreibung

This issue appears when the tool detects the usage of pyspark.sql.functions.udf as function or decorator and is not supported in two specifics cases, when it has no parameters or return type parameter.

Szenarien

Szenario 1

Eingabe

In Pyspark können Sie eine benutzerdefinierte Funktion ohne Eingabe- oder Rückgabeparameter erstellen:

from pyspark.sql import SparkSession, DataFrameStatFunctions
from pyspark.sql.functions import col, udf

spark = SparkSession.builder.getOrCreate()
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

my_udf = udf(lambda s: len(s))
df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

Ausgabe

Snowpark benötigt die Eingabe- und Rückgabetypen für die Udf-Funktion. Denn sie werden nicht bereitgestellt, und SMA kann diese Parameter nicht bereitstellen.

from snowflake.snowpark import Session, DataFrameStatFunctions
from snowflake.snowpark.functions import col, udf

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)
#EWI: SPRKPY1073 => pyspark.sql.functions.udf function without the return type parameter is not supported. See documentation for more info.
my_udf = udf(lambda s: len(s))

df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

Empfohlene Korrektur

To fix this scenario is required to add the import for the returns types of the input and output, and then the parameters of return*type and input_types[] on the udf function _my_udf*.

from snowflake.snowpark import Session, DataFrameStatFunctions
from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import IntegerType, StringType

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

my_udf = udf(lambda s: len(s), return_type=IntegerType(), input_types=[StringType()])

df.with_column("result", my_udf(df.Value)).show()
Szenario 2

In PySpark können Sie einen @udf-Decorator ohne Parameter verwenden

Eingabe

from pyspark.sql.functions import col, udf

spark = SparkSession.builder.getOrCreate()
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

@udf()
def my_udf(str):
    return len(str)


df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

Ausgabe

In Snowpark all the parameters of a udf decorator are required.

from snowflake.snowpark.functions import col, udf

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

#EWI: SPRKPY1073 => pyspark.sql.functions.udf decorator without parameters is not supported. See documentation for more info.

@udf()
def my_udf(str):
    return len(str)

df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

Empfohlene Korrektur

To fix this scenario is required to add the import for the returns types of the input and output, and then the parameters of return_type and input_types[] on the udf @udf decorator.

from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import IntegerType, StringType

spark = Session.builder.getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})
data = [['Q1', 'Test 1'],
        ['Q2', 'Test 2'],
        ['Q3', 'Test 1'],
        ['Q4', 'Test 1']]

columns = ['Quadrant', 'Value']
df = spark.createDataFrame(data, columns)

@udf(return_type=IntegerType(), input_types=[StringType()])
def my_udf(str):
    return len(str)

df.withColumn('Len Value' ,my_udf(col('Value')) ).show()

Zusätzliche Empfehlungen

SPRKPY1074

Meldung: Datei hat gemischte Einrückung (Leerzeichen und Tabulatoren).

Kategorie: Parsing-Fehler.

Beschreibung

Dieses Problem tritt auf, wenn das Tool erkennt, dass die Datei eine gemischte Einrückung hat. Das bedeutet, dass die Datei eine Kombination aus Leerzeichen und Tabulatoren enthält, um Codezeilen einzurücken.

Szenario

Eingabe

In Pyspark können Sie Leerzeichen und Tabulatoren für die Identifikationsebene mischen.

def foo():
    x = 5 # spaces
    y = 6 # tab

Ausgabe

SMA kann nicht mit gemischten Einrückungsmarkierungen umgehen. Wenn dies in einer Python-Code-Datei erkannt wird, fügt SMA die EWI SPRKPY1074 in der ersten Zeile hinzu.

## EWI: SPRKPY1074 => File has mixed indentation (spaces and tabs).
## This file was not converted, so it is expected to still have references to the Spark API
def foo():
    x = 5 # spaces
    y = 6 # tabs

Empfohlene Korrektur

Die Lösung besteht darin, alle Einrückungssymbole anzugleichen.

def foo():
  x = 5 # tab
  y = 6 # tab

Zusätzliche Empfehlungen

SPRKPY1075

Kategorie

Warnung.

Beschreibung

Parse_json wendet keine Schema-Validierung an. Wenn Sie auf der Grundlage eines Schemas filtern/validieren möchten, müssen Sie möglicherweise eine Logik einführen.

Beispiel

Eingabe

df.select(from_json(df.value, Schema))
df.select(from_json(schema=Schema, col=df.value))
df.select(from_json(df.value, Schema, option))

Ausgabe

#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))
#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))
#EWI: SPRKPY1075 => The parse_json does not apply schema validation, if you need to filter/validate based on schema you might need to introduce some logic.
df.select(parse_json(df.value))

Bei der Funktion from_json wird das Schema nicht wirklich zur Inferenz übergeben, sondern zur Validierung verwendet. Siehe diese Beispiele:

data = [
    ('{"name": "John", "age": 30, "city": "New York"}',),
    ('{"name": "Jane", "age": "25", "city": "San Francisco"}',)
]

df = spark.createDataFrame(data, ["json_str"])

Beispiel 1: Datentypen erzwingen und Spaltennamen ändern:

## Parse JSON column with schema
parsed_df = df.withColumn("parsed_json", from_json(col("json_str"), schema))

parsed_df.show(truncate=False)

## +------------------------------------------------------+---------------------------+
## |json_str                                              |parsed_json                |
## +------------------------------------------------------+---------------------------+
## |{"name": "John", "age": 30, "city": "New York"}       |{John, 30, New York}       |
## |{"name": "Jane", "age": "25", "city": "San Francisco"}|{Jane, null, San Francisco}|
## +------------------------------------------------------+---------------------------+
## notice that values outside of the schema were dropped and columns not matched are returned as null

Beispiel 2: Bestimmte Spalten auswählen:

## Define a schema with only the columns we want to use
partial_schema = StructType([
    StructField("name", StringType(), True),
    StructField("city", StringType(), True)
])

## Parse JSON column with partial schema
partial_df = df.withColumn("parsed_json", from_json(col("json_str"), partial_schema))

partial_df.show(truncate=False)

## +------------------------------------------------------+---------------------+
## |json_str                                              |parsed_json          |
## +------------------------------------------------------+---------------------+
## |{"name": "John", "age": 30, "city": "New York"}       |{John, New York}     |
## |{"name": "Jane", "age": "25", "city": "San Francisco"}|{Jane, San Francisco}|
## +------------------------------------------------------+---------------------+
## there is also an automatic filtering

Empfehlungen

  • For more support, you can email us at sma-support@snowflake.com. If you have a contract for support with Snowflake, reach out to your sales engineer and they can direct your support needs.

  • Useful tools PEP-8 and Reindent.

SPRKPY1076

Message: Parameters in pyspark.sql.readwriter.DataFrameReader methods are not supported. This applies to CSV, JSON and PARQUET methods.

Kategorie: Warnung.

Beschreibung

For the CSV, JSON and PARQUET methods on the pyspark.sql.readwriter.DataFrameReader object, the tool will analyze the parameters and add a transformation according to each case:

  • Alle Parameter entsprechen ihrem äquivalenten Namen in Snowpark: In diesem Fall wandelt das Tool den Parameter in einen .option()-Aufruf um. In diesem Fall fügt der Parameter diese EWI nicht hinzu.

  • Einige Parameter stimmen nicht mit der Äquivalenz in Snowpark überein: In diesem Fall fügt das Tool diese EWI mit den Parameterinformationen hinzu und entfernt sie aus dem Methodenaufruf.

Liste der Äquivalenzen:

  • Äquivalenzen für CSV:

Spark-Tasten

Snowpark-Äquivalenzen

sep

FIELD_DELIMITER

Header

PARSE_HEADER

lineSep

RECORD_DELIMITER

pathGlobFilter

PATTERN

Anführungszeichen

FIELD_OPTIONALLY_ENCLOSED_BY

nullValue

NULL_IF

dateFormat

DATE_FORMAT

timestampFormat

TIMESTAMP_FORMAT

inferSchema

INFER_SCHEMA

Trennzeichen

FIELD_DELIMITER

  • Äquivalenzen für JSON:

Spark-Tasten

Snowpark-Äquivalenzen

dateFormat

DATE_FORMAT

timestampFormat

TIMESTAMP_FORMAT

pathGlobFilter

PATTERN

  • Äquivalenzen für PARQUET:

Spark-Tasten

Snowpark-Äquivalenzen

pathGlobFilter

PATTERN

Szenarien

Szenario 1

Eingabe

Unter CVS finden Sie einige Beispiele:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('myapp').getOrCreate()

spark.read.csv("path3", None,None,None,None,None,None,True).show()

Ausgabe

Im konvertierten Code werden die Parameter als einzelne Optionen zur cvs-Funktion hinzugefügt

from snowflake.snowpark import Session

spark = Session.builder.app_name('myapp', True).getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})

#EWI: SPRKPY1076 => Some of the included parameters are not supported in the csv function, the supported ones will be added into a option method.
spark.read.option("FIELD_DELIMITER", None).option("PARSE_HEADER", True).option("FIELD_OPTIONALLY_ENCLOSED_BY", None).csv("path3").show()

Szenario 2

Eingabe

Unter JSON finden Sie einige Beispiele:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myapp').getOrCreate()
spark.read.json("/myPath/jsonFile/", dateFormat='YYYY/MM/DD').show()

Ausgabe

Im konvertierten Code werden die Parameter als einzelne Optionen zur json-Funktion hinzugefügt

from snowflake.snowpark import Session
spark = Session.builder.app_name('myapp', True).getOrCreate()
#EWI: SPRKPY1076 => Some of the included parameters are not supported in the json function, the supported ones will be added into a option method.

spark.read.option("DATE_FORMAT", 'YYYY/MM/DD').json("/myPath/jsonFile/").show()
Szenario 3

Eingabe

Unter PARQUET finden Sie einige Beispiele:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myapp').getOrCreate()

spark.read.parquet("/path/to/my/file.parquet", pathGlobFilter="*.parquet").show()

Ausgabe

Im konvertierten Code werden die Parameter als individuelle Optionen zur Parkettfunktion hinzugefügt

from snowflake.snowpark import Session

spark = Session.builder.app_name('myapp', True).getOrCreate()
spark.update_query_tag({"origin":"sf_sit","name":"sma","version":{"major":0,"minor":0,"patch":0},"attributes":{"language":"Python"}})

#EWI: SPRKPY1076 => Some of the included parameters are not supported in the parquet function, the supported ones will be added into a option method.
#EWI: SPRKPY1029 => The parquet function require adjustments, in Snowpark the parquet files needs to be located in an stage. See the documentation for more info.

spark.read.option("PATTERN", "*.parquet").parquet("/path/to/my/file.parquet")

Zusätzliche Empfehlungen

SPRKPY1077

Meldung: SQL eingebetteter Code kann nicht verarbeitet werden.

Kategorie: Warnung.

Beschreibung

Dieses Problem tritt auf, wenn das Tool einen SQL-eingebetteten Code erkennt, der nicht in Snowpark konvertiert werden kann.

Weitere Informationen finden Sie im Abschnitt „SQL-eingebetteter Code“.

Szenario

Eingabe

In diesem Beispiel ist der SQL-Code in eine Variable namens „Abfrage“ (query) eingebettet, die als Parameter für die Pyspark.sql-Methode verwendet wird.

query = f"SELECT * from myTable"
spark.sql(query)

Ausgabe

SMA erkennt, dass es sich bei dem Parameter PySpark.sql um eine Variable und nicht um einen SQL-Code handelt, so dass die Meldung EWI SPRKPY1077 zur Zeile PySpark.sql hinzugefügt wird.

query = f"SELECT * myTable"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
spark.sql(query)

Zusätzliche Empfehlungen

  • Für die Transformation von SQL muss dieser Code direkt als Parameter der Methode nur als Zeichenfolgewerte und ohne Interpolation enthalten sein. Bitte prüfen Sie die SQL, die an PySpark.SQL gesendet wird, um die Funktionalität auf Snowflake zu validieren.

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1078

Meldung: Das Argument der Funktion pyspark.context.SparkContext.setLogLevel ist kein Literalwert und konnte daher nicht ausgewertet werden

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects the use of the pyspark.context.SparkContext.setLogLevel function with an argument that is not a literal value, for example, when the argument is a variable.

SMA führt eine statische Analyse Ihres Quellcodes durch. Daher ist es nicht möglich, den Inhalt dieses Arguments auszuwerten und eine Äquivalenz in Snowpark zu ermitteln.

Szenario

Eingabe

In diesem Beispiel wird der LogLevel in der Variablen my_log_level definiert und my_log_level als Parameter für die setLogLevel-Methode verwendet.

my_log_level = "WARN"
sparkSession.sparkContext.setLogLevel(my_log_level)

Ausgabe

SMA ist nicht in der Lage, das Argument für den Protokolliergrad-Parameter auszuwerten, daher wird die EWI SPRKPY1078 über der Zeile der transformierten Protokollierung hinzugefügt:

my_log_level = "WARN"
#EWI: SPRKPY1078 => my_log_level is not a literal value and therefore could not be evaluated. Make sure the value of my_log_level is a valid level in Snowpark. Valid log levels are: logging.CRITICAL, logging.DEBUG, logging.ERROR, logging.INFO, logging.NOTSET, logging.WARNING
logging.basicConfig(stream = sys.stdout, level = my_log_level)

Empfohlene Korrektur

Even though the SMA was unable to evaluate the argument, it will transform the pyspark.context.SparkContext.setLogLevel function into the Snowpark equivalent. Please make sure the value of the level argument in the generated output code is a valid and equivalent log level in Snowpark according to the table below:

PySpark-Protokolliergrad

Snowpark-Protokolliergrad äquivalent

ALL

Protokollieren.NOTSET

DEBUG

Protokollieren.DEBUG

ERROR

Protokollieren.ERROR

FATAL

Protokollieren.CRITICAL

INFO

Protokollieren.INFO

OFF

Protokollieren.WARNING

TRACE

Protokollieren.NOTSET

WARN

Protokollieren.WARNING

Die empfohlene Lösung sieht also so aus:

my_log_level = logging.WARNING
logging.basicConfig(stream = sys.stdout, level = my_log_level)

Zusätzliche Empfehlungen

SPRKPY1079

Meldung: Das Argument der Funktion pyspark.context.SparkContext.setLogLevel ist kein gültiger PySpark-Protokolliergrad

Kategorie: Warnung

Beschreibung

This issue appears when the SMA detects the use of the pyspark.context.SparkContext.setLogLevel function with an argument that is not a valid log level in PySpark, and therefore an equivalent could not be determined in Snowpark.

Szenario

Eingabe

hier verwendet der Protokolliergrad „INVALID_LOG_LEVEL “, der kein gültiger Pyspark-Protokolliergrad ist.

sparkSession.sparkContext.setLogLevel("INVALID_LOG_LEVEL")

Ausgabe

SMA den Protokolliergrad „INVALID_LOG_LEVEL“ nicht erkennen kann, obwohl SMA die Konvertierung vornimmt, wird EWI SPRKPY1079 hinzugefügt, um ein mögliches Problem anzuzeigen.

#EWI: SPRKPY1079 => INVALID_LOG_LEVEL is not a valid PySpark log level, therefore an equivalent could not be determined in Snowpark. Valid PySpark log levels are: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
logging.basicConfig(stream = sys.stdout, level = logging.INVALID_LOG_LEVEL)

Empfohlene Korrektur

Make sure that the log level used in the pyspark.context.SparkContext.setLogLevel function is a valid log level in PySpark or in Snowpark and try again.

logging.basicConfig(stream = sys.stdout, level = logging.DEBUG)

Zusätzliche Empfehlungen

SPRKPY1081

This issue code has been deprecated since Spark Conversion Core 4.12.0

Meldung: pyspark.sql.readwriter.DataFrameWriter.partitionBy hat eine Problemumgehung.

Kategorie: Warnung

Beschreibung

The Pyspark.sql.readwriter.DataFrameWriter.partitionBy function is not supported. The workaround is to use Snowpark’s copy_into_location instead. See the documentation for more info.

Szenario

Eingabe

This code will create a separate directories for each unique value in the FIRST_NAME column. The data is the same, but it’s going to be stored in different directories based on the column.

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.partitionBy("FIRST_NAME").csv("/home/data")

This code will create a separate directories for each unique value in the FIRST_NAME column. The data is the same, but it’s going to be stored in different directories based on the column.

Ausgabecode

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
#EWI: SPRKPY1081 => The partitionBy function is not supported, but you can instead use copy_into_location as workaround. See the documentation for more info.
df.write.partitionBy("FIRST_NAME").csv("/home/data", format_type_options = dict(compression = "None"))

Empfohlene Korrektur

In Snowpark, copy_into_location has a partition_by parameter that you can use instead of the partitionBy function, but it’s going to require some manual adjustments, as shown in the following example:

Spark-Code:

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.partitionBy("FIRST_NAME").csv("/home/data")

Snowpark-Code manuell angepasst:

df = session.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]], schema = ["FIRST_NAME", "LAST_NAME"])
df.write.copy_into_location(location=temp_stage, partition_by=col("FIRST_NAME"), file_format_type="csv", format_type_options={"COMPRESSION": "NONE"}, header=True)

copy_into_location hat die folgenden Parameter

  • location: The Snowpark location only accepts cloud locations using an snowflake stage.

  • _partition_by_: Es kann ein Spaltenname oder ein SQL-Ausdruck sein. Sie müssen also in eine Spalte oder SQL konvertieren, indem Sie col oder sql_expr verwenden.

Zusätzliche Empfehlungen

SPRKPY1082

Meldung: Die pyspark.sql.readwriter.DataFrameReader.load-Funktion wird nicht unterstützt. Eine Problemumgehung besteht darin, stattdessen die formatabhängige Snowpark DataFrameReader-Methode zu verwenden (avro csv, json, orc, parquet). Der Pfadparameter sollte ein Stagingbereich sein.

Kategorie: Warnung

Beschreibung

The pyspark.sql.readwriter.DataFrameReader.load function is not supported. The workaround is to use Snowpark DataFrameReader methods instead.

Szenarien

The spark signature for this method DataFrameReader.load(path, format, schema, **options) does not exist in Snowpark. Therefore, any usage of the load function is going to have an EWI in the output code.

Szenario 1

Eingabe

Below is an example that tries to load data from a CSV source.

path_csv_file = "/path/to/file.csv"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.load(path_csv_file, "csv").show()
my_session.read.load(path_csv_file, "csv", schema=schemaParam).show()
my_session.read.load(path_csv_file, "csv", schema=schemaParam, lineSep="\r\n", dateFormat="YYYY/MM/DD").show()

Ausgabe

The SMA adds the EWI SPRKPY1082 to let you know that this function is not supported by Snowpark, but it has a workaround.

path_csv_file = "/path/to/file.csv"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

my_session.read.load(path_csv_file, "csv").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_csv_file, "csv", schema=schemaParam).show()
#EWI: The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_csv_file, "csv", schema=schemaParam, lineSep="\r\n", dateFormat="YYYY/MM/DD").show()

Empfohlene Korrektur

As a workaround, you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with csv method.

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Below is an example that creates a temporal stage and puts the file into it, then calls the CSV method.

path_csv_file = "/path/to/file.csv"

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.csv", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.csv"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.csv(stage_file_path).show()
  • Fixing schema parameter:

    • The schema can be set by using the schema function as follows:

schemaParam = StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True)
    ])

df = my_session.read.schema(schemaParam).csv(temp_stage)
  • Fixing options parameter:

The options between spark and snowpark are not the same, in this case lineSep and dateFormat are replaced with RECORD_DELIMITER and DATE_FORMAT, the Additional recommendations section has a table with all the Equivalences.

Below is an example that creates a dictionary with RECORD_DELIMITER and DATE_FORMAT, and calls the options method with that dictionary.

optionsParam = {"RECORD_DELIMITER": "\r\n", "DATE_FORMAT": "YYYY/MM/DD"}
df = my_session.read.options(optionsParam).csv(stage)

Szenario 2

Eingabe

Below is an example that tries to load data from a JSON source.

path_json_file = "/path/to/file.json"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.load(path_json_file, "json").show()
my_session.read.load(path_json_file, "json", schema=schemaParam).show()
my_session.read.load(path_json_file, "json", schema=schemaParam, dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3").show()

Ausgabe

The SMA adds the EWI SPRKPY1082 to let you know that this function is not supported by Snowpark, but it has a workaround.

path_json_file = "/path/to/file.json"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

my_session.read.load(path_json_file, "json").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_json_file, "json", schema=schemaParam).show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_json_file, "json", schema=schemaParam, dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3").show()

Empfohlene Korrektur

As a workaround, you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with json method

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Below is an example that creates a temporal stage and puts the file into it, then calls the JSON method.

path_json_file = "/path/to/file.json"

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.json", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.json"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.json(stage_file_path).show()
  • Fixing schema parameter:

    • The schema can be set by using the schema function as follows:

schemaParam = StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True)
    ])

df = my_session.read.schema(schemaParam).json(temp_stage)
  • Fixing options parameter:

The options between Spark and snowpark are not the same, in this case dateFormat and timestampFormat are replaced with DATE_FORMAT and TIMESTAMP_FORMAT, the Additional recommendations section has a table with all the Equivalences.

Below is an example that creates a dictionary with DATE_FORMAT and TIMESTAMP_FORMAT, and calls the options method with that dictionary.

optionsParam = {"DATE_FORMAT": "YYYY/MM/DD", "TIMESTAMP_FORMAT": "YYYY-MM-DD HH24:MI:SS.FF3"}
df = Session.read.options(optionsParam).json(stage)

Szenario 3

Eingabe

Below is an example that tries to load data from a PARQUET source.

path_parquet_file = "/path/to/file.parquet"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.load(path_parquet_file, "parquet").show()
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam).show()
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam, pathGlobFilter="*.parquet").show()

Ausgabe

The SMA adds the EWI SPRKPY1082 to let you know that this function is not supported by Snowpark, but it has a workaround.

path_parquet_file = "/path/to/file.parquet"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.

my_session.read.load(path_parquet_file, "parquet").show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam).show()
#EWI: SPRKPY1082 => The pyspark.sql.readwriter.DataFrameReader.load function is not supported. A workaround is to use Snowpark DataFrameReader format specific method instead (avro csv, json, orc, parquet). The path parameter should be a stage location.
my_session.read.load(path_parquet_file, "parquet", schema=schemaParam, pathGlobFilter="*.parquet").show()

Empfohlene Korrektur

As a workaround, you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with parquet method

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Below is an example that creates a temporal stage and puts the file into it, then calls the PARQUET method.

path_parquet_file = "/path/to/file.parquet"

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.parquet", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.parquet"

schemaParam = StructType([
        StructField("Name", StringType(), True),
        StructField("Superhero", StringType(), True)
    ])

my_session.read.parquet(stage_file_path).show()
  • Fixing schema parameter:

    • The schema can be set by using the schema function as follows:

schemaParam = StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True)
    ])

df = my_session.read.schema(schemaParam).parquet(temp_stage)
  • Fixing options parameter:

The options between Spark and snowpark are not the same, in this case pathGlobFilter is replaced with PATTERN, the Additional recommendations section has a table with all the Equivalences.

Below is an example that creates a dictionary with PATTERN, and calls the options method with that dictionary.

optionsParam = {"PATTERN": "*.parquet"}
df = Session.read.options(optionsParam).parquet(stage)

Zusätzliche Empfehlungen

  • Berücksichtigen Sie, dass die Optionen zwischen Spark und Snowpark nicht die gleichen sind, aber sie können zugeordnet werden:

Spark-Optionen

Möglicher Wert

Snowpark-Äquivalent

Beschreibung

Header

„True“ oder „False“

SKIP_HEADER = 1 / SKIP_HEADER = 0

Um die erste Zeile einer Datei als Spaltennamen zu verwenden.

Trennzeichen

Beliebiges ein-/mehrstelliges Feldtrennzeichen

FIELD_DELIMITER

Um einzelne/mehrere Zeichen als Trennzeichen für jede Spalte/jedes Feld festzulegen.

sep

Ein beliebiges einstelliges Feldtrennzeichen

FIELD_DELIMITER

Um ein einzelnes Zeichen als Trennzeichen für jede Spalte/jedes Feld anzugeben.

Codierung

UTF-8, UTF-16 usw…

ENCODING

Zum Decodieren der CSV-Dateien nach dem angegebenen Codierungstyp. Die Standardcodierung ist UTF-8

lineSep

Ein beliebiges einzelnes Zeilentrennzeichen

RECORD_DELIMITER

Um das Zeilentrennzeichen zu definieren, das für das Parsen von Dateien verwendet werden soll.

pathGlobFilter

Dateimuster

PATTERN

So definieren Sie ein Muster, um nur Dateien zu lesen, deren Dateinamen dem Muster entsprechen.

recursiveFileLookup

„True“ oder „False“

N/A

Zum rekursiven Durchsuchen eines Verzeichnisses, um Dateien zu lesen. Der Standardwert für diese Option ist „False“.

Anführungszeichen

Einzelnes Zeichen, das in Anführungszeichen gesetzt werden soll

FIELD_OPTIONALLY_ENCLOSED_BY

Um Felder/Spalte, die Felder enthalten, in Anführungszeichen zu setzen, und das Trennzeichen dabei Teil des Wertes sein kann. Dieses Zeichen „To quote all fields“, wenn es mit quoteAll-Option verwendet wird. Der Standardwert dieser Option ist das doppelte Anführungszeichen(„).

nullValue

Zeichenfolge zum Ersetzen von „null“

NULL_IF

Ersetzen von Nullwerten durch die Zeichenfolge beim Lesen und Schreiben von Datenframe.

dateFormat

Gültiges Datumsformat

DATE_FORMAT

Um eine Zeichenfolge zu definieren, die ein Datumsformat angibt. Das Standardformat ist jjjj-MM-tt.

timestampFormat

Gültiges Format des Zeitstempels

TIMESTAMP_FORMAT

Zur Definition einer Zeichenfolge, die ein Zeitstempelformat angibt. Das Standardformat ist jjjj-MM-tt ‚T’HH: mm:ss.

Escape-Zeichen

Jedes einzelne Zeichen

ESCAPE

So legen Sie ein einzelnes Zeichen als Escape-Zeichen fest, um das Standard-Escape-Zeichen (\) zu überschreiben.

inferSchema

„True“ oder „False“

INFER_SCHEMA

Erkennt automatisch das Dateischema

mergeSchema

„True“ oder „False“

N/A

Wird in Snowflake nicht benötigt, da dies immer dann geschieht, wenn das infer_schema die Parquet-Dateistruktur bestimmt

  • For modifiedBefore / modifiedAfter option you can achieve the same result in Snowflake by using the metadata columns and then adding a filter like: df.filter(METADATA_FILE_LAST_MODIFIED > ‘some_date’).

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1083

Meldung: Die pyspark.sql.readwriter.DataFrameWriter.save-Funktion wird nicht unterstützt. Eine Problemumgehung besteht darin, stattdessen die Snowpark DataFrameWriter copy_into_location-Methode zu verwenden.

Kategorie: Warnung

Beschreibung

The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. The workaround is to use Snowpark DataFrameWriter methods instead.

Szenarien

The spark signature for this method DataFrameWriter.save(path, format, mode, partitionBy, **options) does not exists in Snowpark. Therefore, any usage of the load function it’s going to have an EWI in the output code.

Szenario 1

Eingabecode

Below is an example that tries to save data with CSV format.

path_csv_file = "/path/to/file.csv"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = my_session.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_csv_file, format="csv")
df.write.save(path_csv_file, format="csv", mode="overwrite")
df.write.save(path_csv_file, format="csv", mode="overwrite", lineSep="\r\n", dateFormat="YYYY/MM/DD")
df.write.save(path_csv_file, format="csv", mode="overwrite", partitionBy="City", lineSep="\r\n", dateFormat="YYYY/MM/DD")

Ausgabecode

The tool adds this EWI SPRKPY1083 on the output code to let you know that this function is not supported by Snowpark, but it has a workaround.

path_csv_file = "/path/to/file.csv"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = my_session.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite", lineSep="\r\n", dateFormat="YYYY/MM/DD")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_csv_file, format="csv", mode="overwrite", partitionBy="City", lineSep="\r\n", dateFormat="YYYY/MM/DD")

Empfohlene Korrektur

As a workaround you can use Snowpark DataFrameWriter methods instead.

  • Fixing path and format parameters:

    • Replace the load method with csv or copy_into_location method.

    • If you are using copy_into_location method, you need to specify the format with the file_format_type parameter.

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Nachfolgend finden Sie ein Beispiel, in dem ein zeitlicher Stagingbereich erstellt und die Datei darin abgelegt wird. Anschließend wird eine der oben genannten Methoden aufgerufen.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.csv", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.csv"

## Using csv method
df.write.csv(stage_file_path)

## Using copy_into_location method
df.write.copy_into_location(stage_file_path, file_format_type="csv")

Below is an example that adds into the daisy chain the mode method with overwrite as a parameter.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using csv method
df.write.mode("overwrite").csv(temp_stage)

## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(temp_stage, file_format_type="csv")
  • Fixing partitionBy parameter:

    • Use the partition_by parameter from the CSV method, as follows:

Below is an example that used the partition_by parameter from the CSV method.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using csv method
df.write.csv(temp_stage, partition_by="City")

## Using copy_into_location method
df.write.copy_into_location(temp_stage, file_format_type="csv", partition_by="City")
  • Fixing options parameter:

The options between spark and snowpark are not the same, in this case lineSep and dateFormat are replaced with RECORD_DELIMITER and DATE_FORMAT, the Additional recommendations section has table with all the Equivalences.

Below is an example that creates a dictionary with RECORD_DELIMITER and DATE_FORMAT, and calls the options method with that dictionary.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"RECORD_DELIMITER": "\r\n", "DATE_FORMAT": "YYYY/MM/DD"}

## Using csv method
df.write.csv(stage, format_type_options=optionsParam)

## Using copy_into_location method
df.write.csv(stage, file_format_type="csv", format_type_options=optionsParam)

Szenario 2

Eingabecode

Below is an example that tries to save data with JSON format.

path_json_file = "/path/to/file.json"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_json_file, format="json")
df.write.save(path_json_file, format="json", mode="overwrite")
df.write.save(path_json_file, format="json", mode="overwrite", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
df.write.save(path_json_file, format="json", mode="overwrite", partitionBy="City", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")

Ausgabecode

The tool adds this EWI SPRKPY1083 on the output code to let you know that this function is not supported by Snowpark, but it has a workaround.

path_json_file = "/path/to/file.json"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_json_file, format="json", mode="overwrite", partitionBy="City", dateFormat="YYYY/MM/DD", timestampFormat="YYYY-MM-DD HH24:MI:SS.FF3")

Empfohlene Korrektur

As a workaround you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with json or copy_into_location method

    • If you are using copy_into_location method, you need to specify the format with the file_format_type parameter.

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Nachfolgend finden Sie ein Beispiel, in dem ein zeitlicher Stagingbereich erstellt und die Datei darin abgelegt wird. Anschließend wird eine der oben genannten Methoden aufgerufen.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.json", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.json"

## Using json method
df.write.json(stage_file_path)

## Using copy_into_location method
df.write.copy_into_location(stage_file_path, file_format_type="json")

Below is an example that adds into the daisy chain the mode method with overwrite as a parameter.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using json method
df.write.mode("overwrite").json(temp_stage)

## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(temp_stage, file_format_type="json")
  • Fixing partitionBy parameter:

    • Use the partition_by parameter from the CSV method, as follows:

Below is an example that used the partition_by parameter from the CSV method.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using json method
df.write.json(temp_stage, partition_by="City")

## Using copy_into_location method
df.write.copy_into_location(temp_stage, file_format_type="json", partition_by="City")
  • Fixing options parameter:

The options between spark and snowpark are not the same, in this case dateFormat and timestampFormat are replaced with DATE_FORMAT and TIMESTAMP_FORMAT, the Additional recommendations section has table with all the Equivalences.

Below is an example that creates a dictionary with DATE_FORMAT and TIMESTAMP_FORMAT, and calls the options method with that dictionary.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"DATE_FORMAT": "YYYY/MM/DD", "TIMESTAMP_FORMAT": "YYYY-MM-DD HH24:MI:SS.FF3"}

## Using json method
df.write.json(stage, format_type_options=optionsParam)

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="json", format_type_options=optionsParam)

Szenario 3

Eingabecode

Below is an example that tries to save data with PARQUET format.

path_parquet_file = "/path/to/file.parquet"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.save(path_parquet_file, format="parquet")
df.write.save(path_parquet_file, format="parquet", mode="overwrite")
df.write.save(path_parquet_file, format="parquet", mode="overwrite", pathGlobFilter="*.parquet")
df.write.save(path_parquet_file, format="parquet", mode="overwrite", partitionBy="City", pathGlobFilter="*.parquet")

Ausgabecode

The tool adds this EWI SPRKPY1083 on the output code to let you know that this function is not supported by Snowpark, but it has a workaround.

path_parquet_file = "/path/to/file.parquet"

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite", pathGlobFilter="*.parquet")
#EWI: SPRKPY1083 => The pyspark.sql.readwriter.DataFrameWriter.save function is not supported. A workaround is to use Snowpark DataFrameWriter copy_into_location method instead.
df.write.save(path_parquet_file, format="parquet", mode="overwrite", partitionBy="City", pathGlobFilter="*.parquet")

Empfohlene Korrektur

As a workaround you can use Snowpark DataFrameReader methods instead.

  • Fixing path and format parameters:

    • Replace the load method with parquet or copy_into_location method.

    • If you are using copy_into_location method, you need to specify the format with the file_format_type parameter.

    • The first parameter path must be in a stage to make an equivalence with Snowpark.

Nachfolgend finden Sie ein Beispiel, in dem ein zeitlicher Stagingbereich erstellt und die Datei darin abgelegt wird. Anschließend wird eine der oben genannten Methoden aufgerufen.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Stage creation

temp_stage = f'{Session.get_fully_qualified_current_schema()}.{_generate_prefix("TEMP_STAGE")}'
my_session.sql(f'CREATE TEMPORARY STAGE IF NOT EXISTS {temp_stage}').show()
my_session.file.put(f"file:///path/to/file.parquet", f"@{temp_stage}")
stage_file_path = f"{temp_stage}file.parquet"

## Using parquet method
df.write.parquet(stage_file_path)

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="parquet")

Below is an example that adds into the daisy chain the mode method with overwrite as a parameter.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using parquet method
df.write.mode("overwrite").parquet(temp_stage)

## Using copy_into_location method
df.write.mode("overwrite").copy_into_location(stage, file_format_type="parquet")
  • Fixing partitionBy parameter:

    • Use the partition_by parameter from the CSV method, as follows:

Below is an example that used the partition_by parameter from the parquet method.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

## Using parquet method
df.write.parquet(temp_stage, partition_by="City")

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="parquet", partition_by="City")
  • Fixing options parameter:

The options between spark and snowpark are not the same, in this case pathGlobFilter is replaced with PATTERN, the Additional recommendations section has table with all the Equivalences.

Below is an example that creates a dictionary with PATTERN, and calls the options method with that dictionary.

data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]
df = spark.createDataFrame(data, schema=["Name", "Age", "City"])
optionsParam = {"PATTERN": "*.parquet"}

## Using parquet method
df.write.parquet(stage, format_type_options=optionsParam)

## Using copy_into_location method
df.write.copy_into_location(stage, file_format_type="parquet", format_type_options=optionsParam)

Zusätzliche Empfehlungen

  • Beachten Sie, dass die Optionen zwischen Spark und Snowpark nicht die gleichen sind, aber sie können zugeordnet werden:

Spark-Optionen

Möglicher Wert

Snowpark-Äquivalent

Beschreibung

Header

„True“ oder „False“

SKIP_HEADER = 1 / SKIP_HEADER = 0

Um die erste Zeile einer Datei als Spaltennamen zu verwenden.

Trennzeichen

Beliebiges ein-/mehrstelliges Feldtrennzeichen

FIELD_DELIMITER

Um einzelne/mehrere Zeichen als Trennzeichen für jede Spalte/jedes Feld festzulegen.

sep

Ein beliebiges einstelliges Feldtrennzeichen

FIELD_DELIMITER

Um ein einzelnes Zeichen als Trennzeichen für jede Spalte/jedes Feld anzugeben.

Codierung

UTF-8, UTF-16 usw…

ENCODING

Zum Decodieren der CSV-Dateien nach dem angegebenen Codierungstyp. Die Standardcodierung ist UTF-8

lineSep

Ein beliebiges einzelnes Zeilentrennzeichen

RECORD_DELIMITER

Um das Zeilentrennzeichen zu definieren, das für das Parsen von Dateien verwendet werden soll.

pathGlobFilter

Dateimuster

PATTERN

So definieren Sie ein Muster, um nur Dateien zu lesen, deren Dateinamen dem Muster entsprechen.

recursiveFileLookup

„True“ oder „False“

N/A

Zum rekursiven Durchsuchen eines Verzeichnisses, um Dateien zu lesen. Der Standardwert für diese Option ist „False“.

Anführungszeichen

Einzelnes Zeichen, das in Anführungszeichen gesetzt werden soll

FIELD_OPTIONALLY_ENCLOSED_BY

Um Felder/Spalte, die Felder enthalten, in Anführungszeichen zu setzen, und das Trennzeichen dabei Teil des Wertes sein kann. Dieses Zeichen „To quote all fields“, wenn es mit quoteAll-Option verwendet wird. Der Standardwert dieser Option ist das doppelte Anführungszeichen(„).

nullValue

Zeichenfolge zum Ersetzen von „null“

NULL_IF

Ersetzen von Nullwerten durch die Zeichenfolge beim Lesen und Schreiben von Datenframe.

dateFormat

Gültiges Datumsformat

DATE_FORMAT

Um eine Zeichenfolge zu definieren, die ein Datumsformat angibt. Das Standardformat ist jjjj-MM-tt.

timestampFormat

Gültiges Format des Zeitstempels

TIMESTAMP_FORMAT

Zur Definition einer Zeichenfolge, die ein Zeitstempelformat angibt. Das Standardformat ist jjjj-MM-tt ‚T’HH: mm:ss.

Escape-Zeichen

Jedes einzelne Zeichen

ESCAPE

So legen Sie ein einzelnes Zeichen als Escape-Zeichen fest, um das Standard-Escape-Zeichen (\) zu überschreiben.

inferSchema

„True“ oder „False“

INFER_SCHEMA

Erkennt automatisch das Dateischema

mergeSchema

„True“ oder „False“

N/A

Wird in Snowflake nicht benötigt, da dies immer dann geschieht, wenn das infer_schema die Parquet-Dateistruktur bestimmt

  • For modifiedBefore / modifiedAfter option you can achieve the same result in Snowflake by using the metadata columns and then add a filter like: df.filter(METADATA_FILE_LAST_MODIFIED > ‘some_date’).

  • For more support, you can email us at sma-support@snowflake.com or post an issue in the SMA.

SPRKPY1084

This issue code has been deprecated since Spark Conversion Core 4.12.0

Meldung: pyspark.sql.readwriter.DataFrameWriter.option wird nicht unterstützt.

Kategorie: Warnung

Beschreibung

The pyspark.sql.readwriter.DataFrameWriter.option function is not supported.

Szenario

Eingabecode

Below is an example using the option method, this method is used to add additional configurations when writing the data of a DataFrame.

path_csv_file = "/path/to/file.csv"
data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

df.write.option("header", True).csv(csv_file_path)
df.write.option("sep", ";").option("lineSep","-").csv(csv_file_path)

Ausgabecode

The tool adds this EWI SPRKPY1084 on the output code to let you know that this function is not supported by Snowpark.

path_csv_file = "/path/to/file.csv"
data = [
        ("John", 30, "New York"),
        ("Jane", 25, "San Francisco")
    ]

df = spark.createDataFrame(data, schema=["Name", "Age", "City"])

#EWI: SPRKPY1084 => The pyspark.sql.readwriter.DataFrameWriter.option function is not supported.

df.write.option("header", True).csv(csv_file_path)
#EWI: SPRKPY1084 => The pyspark.sql.readwriter.DataFrameWriter.option function is not supported.
df.write.option("sep", ";").option("lineSep","-").csv(csv_file_path)

Empfohlene Korrektur

Für die pyspark.sql.readwriter.DataFrameWriter.option-Methode gibt es keine empfohlene Lösung.

Zusätzliche Empfehlungen

SPRKPY1085

Meldung: pyspark.ml.feature.VectorAssembler wird nicht unterstützt.

Kategorie: Warnung

Beschreibung

The pyspark.ml.feature.VectorAssembler is not supported.

Szenario

Eingabecode

VectorAssembler wird verwendet, um mehrere Spalten zu einem einzigen Vektor zusammenzufassen.

data = [
        (1, 10.0, 20.0),
        (2, 25.0, 30.0),
        (3, 50.0, 60.0)
    ]

df = SparkSession.createDataFrame(data, schema=["Id", "col1", "col2"])
vector = VectorAssembler(inputCols=["col1", "col2"], output="cols")

Ausgabecode

The tool adds this EWI SPRKPY1085 on the output code to let you know that this class is not supported by Snowpark.

data = [
        (1, 10.0, 20.0),
        (2, 25.0, 30.0),
        (3, 50.0, 60.0)
    ]

df = spark.createDataFrame(data, schema=["Id", "col1", "col2"])
#EWI: SPRKPY1085 => The pyspark.ml.feature.VectorAssembler function is not supported.

vector = VectorAssembler(inputCols=["col1", "col2"], output="cols")

Empfohlene Korrektur

Für das pyspark.ml.feature.VectorAssembler gibt es keine empfohlene Korrektur.

Zusätzliche Empfehlungen

SPRKPY1086

Meldung: pyspark.ml.linalg.VectorUDT wird nicht unterstützt.

Kategorie: Warnung

Beschreibung

The pyspark.ml.linalg.VectorUDT is not supported.

Szenario

Eingabecode

VectorUDT ist ein Datentyp zur Darstellung von Vektorspalten in einem DataFrame.

data = [
        (1, Vectors.dense([10.0, 20.0])),
        (2, Vectors.dense([25.0, 30.0])),
        (3, Vectors.dense([50.0, 60.0]))
    ]

schema = StructType([
        StructField("Id", IntegerType(), True),
        StructField("VectorCol", VectorUDT(), True),
    ])

df = SparkSession.createDataFrame(data, schema=schema)

Ausgabecode

The tool adds this EWI SPRKPY1086 on the output code to let you know that this function is not supported by Snowpark.

data = [
        (1, Vectors.dense([10.0, 20.0])),
        (2, Vectors.dense([25.0, 30.0])),
        (3, Vectors.dense([50.0, 60.0]))
    ]

#EWI: SPRKPY1086 => The pyspark.ml.linalg.VectorUDT function is not supported.
schema = StructType([
        StructField("Id", IntegerType(), True),
        StructField("VectorCol", VectorUDT(), True),
    ])

df = spark.createDataFrame(data, schema=schema)

Empfohlene Korrektur

Für die pyspark.ml.linalg.VectorUDT gibt es keine empfohlene Lösung.

Zusätzliche Empfehlungen

SPRKPY1087

Meldung: Die pyspark.sql.dataframe.DataFrame.writeTo-Funktion wird nicht unterstützt, aber es eine Problemumgehung gibt.

Kategorie: Warnung.

Beschreibung

The pyspark.sql.dataframe.DataFrame.writeTo function is not supported. The workaround is to use Snowpark DataFrameWriter SaveAsTable method instead.

Szenario

Eingabe

Below is an example of a use of the pyspark.sql.dataframe.DataFrame.writeTo function, the dataframe df is written into a table name Personal_info.

df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
                                 schema=["FIRST_NAME", "LAST_NAME"])

df.writeTo("Personal_info")

Ausgabe

The SMA adds the EWI SPRKPY1087 to the output code to let you know that this function is not supported, but has a workaround.

df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
                                 schema=["FIRST_NAME", "LAST_NAME"])

#EWI: SPRKPY1087 => pyspark.sql.dataframe.DataFrame.writeTo is not supported, but it has a workaround.
df.writeTo("Personal_info")

Empfohlene Korrektur

Die Problemumgehung besteht darin, stattdessen die Snowpark DataFrameWriter SaveAsTable-Methode zu verwenden.

df = spark.createDataFrame([["John", "Berry"], ["Rick", "Berry"], ["Anthony", "Davis"]],
                                 schema=["FIRST_NAME", "LAST_NAME"])

df.write.saveAsTable("Personal_info")

Zusätzliche Empfehlungen

SPRKPY1088

Meldung: Die Werte von pyspark.sql.readwriter.DataFrameWriter.option in Snowpark können unterschiedlich sein, so dass eine Validierung erforderlich sein könnte.

Kategorie: Warnung

Beschreibung

The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so validation might be needed to ensure that the behavior is correct.

Szenarien

Es gibt einige Szenarien, je nachdem, ob die Option unterstützt wird oder nicht oder welches Format zum Schreiben der Datei verwendet wird.

Szenario 1

Eingabe

Below is an example of the usage of the method option, adding a sep option, which is currently supported.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])

df.write.option("sep", ",").csv("some_path")

Ausgabe

The tool adds the EWI SPRKPY1088 indicating that it is required validation.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").csv("some_path")

Empfohlene Korrektur

Die Snowpark API unterstützt diesen Parameter, so dass die einzige Maßnahme darin bestehen kann, die Verhaltensweise nach der Migration zu überprüfen. Die unterstützten Parameter entnehmen Sie bitte der Äquivalenzen-Tabelle.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").csv("some_path")
Szenario 2

Eingabe

Here the scenario shows the usage of option, but adds a header option, which is not supported.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])

df.write.option("header", True).csv("some_path")

Ausgabe

The tool adds the EWI SPRKPY1088 indicating that it is required validation is needed.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("header", True).csv("some_path")

Empfohlene Korrektur

For this scenario it is recommended to evaluate the Snowpark format type options to see if it is possible to change it according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.csv("some_path")
Szenario 3

Eingabe

This scenario adds a sep option, which is supported and uses the JSON method.

  • Note: this scenario also applies for PARQUET.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])

df.write.option("sep", ",").json("some_path")

Ausgabe

The tool adds the EWI SPRKPY1088 indicating that it is required validation is needed.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.option("sep", ",").json("some_path")

Empfohlene Korrektur

The file format JSON does not support the parameter sep, so it is recommended to evaluate the snowpark format type options to see if it is possible to change it according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(100, "myVal")], ["ID", "Value"])
#EWI: SPRKPY1088 => The pyspark.sql.readwriter.DataFrameWriter.option values in Snowpark may be different, so required validation might be needed.
df.write.json("some_path")

Zusätzliche Empfehlungen

  • Since there are some not supported parameters, it is recommended to check the table of equivalences and check the behavior after the transformation.

  • Äquivalenzen-Tabelle:

PySpark-Option

SnowFlake-Option

Unterstützte Dateiformate

Beschreibung

SEP

FIELD_DELIMITER

CSV

Ein oder mehrere einzelne Byte- oder Multibyte-Zeichen, die Felder in einer Eingabedatei trennen.

LINESEP

RECORD_DELIMITER

CSV

Ein oder mehrere Zeichen, die Datensätze in einer Eingabedatei trennen.

QUOTE

FIELD_OPTIONALLY_ENCLOSED_BY

CSV

Zeichen, das verwendet wird, um Zeichenfolgen einzuschließen.

NULLVALUE

NULL_IF

CSV

Zeichenfolge, die zum Konvertieren in und von SQL NULL verwendet wird.

DATEFORMAT

DATE_FORMAT

CSV

Zeichenfolge, die das Format der Datumswerte in den zu ladenden Datendateien definiert.

TIMESTAMPFORMAT

TIMESTAMP_FORMAT

CSV

Zeichenfolge, die das Format der Zeitstempelwerte in den zu ladenden Datendateien definiert.

Wenn der verwendete Parameter nicht in der Liste enthalten ist, gibt API einen Fehler aus.

SPRKPY1089

Meldung: Die Werte von pyspark.sql.readwriter.DataFrameWriter. options in Snowpark können unterschiedlich sein, so dass eine Validierung erforderlich sein könnte.

Kategorie: Warnung

Beschreibung

The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so validation might be needed to ensure that the behavior is correct.

Szenarien

Es gibt einige Szenarien, je nachdem, ob die Optionen unterstützt werden oder nicht oder welches Format zum Schreiben der Datei verwendet wird.

Szenario 1

Eingabe

Below is an example of the usage of the method options, adding the options sep and nullValue, which are currently supported.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])

df.write.options(nullValue="myVal", sep=",").csv("some_path")

Ausgabe

The tool adds the EWI SPRKPY1089 indicating that it is required validation.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").csv("some_path")

Empfohlene Korrektur

Snowpark API unterstützt diese Parameter, so dass die einzige Maßnahme darin bestehen kann, die Verhaltensweise nach der Migration zu überprüfen. Die unterstützten Parameter entnehmen Sie bitte der Äquivalenzen-Tabelle.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").csv("some_path")
Szenario 2

Eingabe

Here the scenario shows the usage of options, but adds a header option, which is not supported.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])

df.write.options(header=True, sep=",").csv("some_path")

Ausgabe

The tool adds the EWI SPRKPY1089 indicating that it is required validation is needed.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(header=True, sep=",").csv("some_path")

Empfohlene Korrektur

For this scenario it is recommended to evaluate the Snowpark format type options to see if it is possible to change it according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.csv("some_path")
Szenario 3

Eingabe

This scenario adds a sep option, which is supported and uses the JSON method.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])

df.write.options(nullValue="myVal", sep=",").json("some_path")

Ausgabe

The tool adds the EWI SPRKPY1089 indicating that it is required validation is needed.

  • Note: this scenario also applies for PARQUET.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.options(nullValue="myVal", sep=",").json("some_path")

Empfohlene Korrektur

The file format JSON does not support the parameter sep, so it is recommended to evaluate the snowpark format type options to see if it is possible to change it according to your needs. Also, check the behavior after the change.

df = spark.createDataFrame([(1, "myVal")], [2, "myVal2"], [None, "myVal3" ])
#EWI: SPRKPY1089 => The pyspark.sql.readwriter.DataFrameWriter.options values in Snowpark may be different, so required validation might be needed.
df.write.json("some_path")

Zusätzliche Empfehlungen

  • Since there are some not supported parameters, it is recommended to check the table of equivalences and check the behavior after the transformation.

  • Äquivalenzen-Tabelle:

Snowpark kann eine Liste von Äquivalenzen für einige Parameter unterstützen:

PySpark-Option

SnowFlake-Option

Unterstützte Dateiformate

Beschreibung

SEP

FIELD_DELIMITER

CSV

Ein oder mehrere einzelne Byte- oder Multibyte-Zeichen, die Felder in einer Eingabedatei trennen.

LINESEP

RECORD_DELIMITER

CSV

Ein oder mehrere Zeichen, die Datensätze in einer Eingabedatei trennen.

QUOTE

FIELD_OPTIONALLY_ENCLOSED_BY

CSV

Zeichen, das verwendet wird, um Zeichenfolgen einzuschließen.

NULLVALUE

NULL_IF

CSV

Zeichenfolge, die zum Konvertieren in und von SQL NULL verwendet wird.

DATEFORMAT

DATE_FORMAT

CSV

Zeichenfolge, die das Format der Datumswerte in den zu ladenden Datendateien definiert.

TIMESTAMPFORMAT

TIMESTAMP_FORMAT

CSV

Zeichenfolge, die das Format der Zeitstempelwerte in den zu ladenden Datendateien definiert.

Wenn der verwendete Parameter nicht in der Liste enthalten ist, gibt API einen Fehler aus.

SPRKPY1101

Kategorie

Parsing-Fehler.

Beschreibung

Wenn das Tool einen Parsing-Fehler erkennt, versucht es, ihn zu beheben und setzt den Prozess in der nächsten Zeile fort. In diesen Fällen werden der Fehler und die Kommentare in der Zeile angezeigt.

Dieses Beispiel zeigt, wie ein Fehler bei der Abweichung zwischen Leerzeichen und Tabulatoren behandelt wird.

Eingabecode

def foo():
    x = 5 # Spaces
     y = 6 # Tab

def foo2():
    x=6
    y=7

Ausgabecode

def foo():
    x = 5 # Spaces
## EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(3, 2). Last valid token was '5' @(2, 9), failed token 'y' @(3, 2)
## y = 6 # Tab

def foo2():
    x=6
    y=7

Empfehlungen

  • Versuchen Sie, die kommentierte Zeile zu korrigieren.

  • For more support, email us at sma-support@snowflake.com. If you have a support contract with Snowflake, reach out to your sales engineer, who can direct your support needs.